2016-07-15 120 views
2

我正在嘗試使用pyspark將我的reduceByKey函數的輸出作爲相對於該鍵傳遞的整數的範圍。使用spark在一個操作中找到使用spark的reduceByKey的值範圍

我嘗試做一個自定義函數:

def _range(x,y): 
    return [max(x,y), min(x,y)] 


data2 = data_.map(lambda x: (x[u'driverId'] + ',' + x[u'afh'], int(x['timestamp']))) 
     .reduceByKey(lambda x,y: _range(x,y)) 
當然

輸出出來的名單列表中列出

我知道內部的解決辦法是

.reduceByKey(max) 

其次

.reduceByKey(min) 

^^^^然後結合他們,但我不想執行兩個操作

但我想這樣做在一個傳遞所以應用程序並不低效。我也想避免首先填充整數列表。 有什麼想法?數據在RDD中。 感謝

回答

0

這裏combineByKey定義爲一個正確的做法如下:

def seq_op(acc, x): 
    return (min(x, acc[0]), max(x, acc[1])) 

def comb_op(acc1, acc2): 
    return (min(acc1[0], acc2[0]), max(acc1[1], acc2[1])) 

(pairs 
    .aggregateByKey((sys.float_info.max, sys.float_info.min), seq_op, comb_op) 
    .mapValues(lambda minmax: abs(minmax[0] - minmax[1]))) 

其中pairs是的結果:

pairs = data_.map(
    lambda x: (x[u'driverId'] + ',' + x[u'afh'], int(x['timestamp'] 
) 

由於關鍵是動態生成的,你無法避免初始map這是因爲關鍵應預先知道任何*byKey操作。值類型轉換可以在combineByKey之內執行,但基本上不會影響數據必須訪問的次數。