2015-12-05 69 views
0

我想在Scala/Spark中實現一個函數,該函數可以接受多個reducer/aggregators並在一次執行中執行它們。所以基本上我給了reduce函數和初始值,並且它應該在一次傳遞中創建一個複合reduce操作。如何在Scala中實現未知類型和未知數的參數

這裏的邏輯是什麼樣子的Python

from functools import reduce 

def reduce_at_once(data, reducer_funcs_inits): 
    reducer_funcs, inits = zip(*reducer_funcs_inits) 

    complete_reducer_func = lambda acc, y: tuple(rf(a_x, y) for a_x, rf in zip(acc, reducer_funcs)) 

    return list(reduce(complete_reducer_func, data, inits)) 

data = list(range(1, 20)) 
reducer_funcs_inits = [(lambda acc, y: acc + y, 0), # sum 
         (lambda acc, y: acc * y, 1) # product 
         ] 
print(list(reduce_at_once(data, reducer_funcs_inits))) 
# [190, 121645100408832000] 

我怎樣才能做這樣的事情在斯卡拉(星火)?這個問題似乎有一個列表,其長度我只知道什麼時候調用,而且列表中的元素可能有不同的類型(減少初始累加器),具體取決於我想包含哪個reducer(不一定只有這裏的數字)。

+0

您應該添加'python'標籤 –

回答

2

您可以隨時使用

def reduce_at_once(data: Any, reducer_funcs_inits: Any*) 

但這是很少你想要什麼。特別是,在這裏你真正需要

case class ReducerInit[A, B](f: (B, A) => B, init: B) 

def reduce_at_once[A](data: Seq[A], rfis: ReducerInit[A, _]*): Seq[_] 

不幸的是,實施reduce_at_once將是很醜陋:

def reduce_at_once[A](data: Seq[A], rfis: ReducerInit[A, _]*): Seq[_] = { 
    val rfs = rfis.map(_.f.asInstanceOf[(Any, A) => Any]) 
    val inits = rfis.map(_.init.asInstanceOf[Any]) 

    val crf = (acc: Seq[Any], y: A) => acc.zip(rfs).map { case (a_x, rf) => rf(a_x, y) } 

    data.foldLeft(inits)(crf) 
} 

檢查:

val data = 1 to 20 

val rf1 = ReducerInit[Int, Int](_ + _, 0) 
val rf2 = ReducerInit[Int, Int](_ * _, 1) 

println(reduce_at_once(data, rf1, rf2)) 

givesArrayBuffer(210, -2102132736)(注溢出)。