2016-11-20 77 views
0

我需要找到每個RDD的最大密鑰,但使用reduce()時,我能得到的是整個Dstream中最大的一個。例如,在這個流中,我想要返回的是(2,「b」),(2,「d」),(3,「f」),但是我只能得到(3,「f」 )通過reduce(max) 我怎樣才能得到(2,「b」),(2,「d」),(3,「f」)?我可以獲得DStream中每個RDD的最大密鑰嗎?

sc = SparkContext(appName="PythonStreamingQueueStream") 
ssc = StreamingContext(sc, 1) 
stream = ssc.queueStream([sc.parallelize([(1,"a"), (2,"b"),(1,"c"),(2,"d"), 
(1,"e"),(3,"f")],3)]) 

stream.reduce(max).pprint() 
ssc.start() 
ssc.stop(stopSparkContext=True, stopGraceFully=True) 
+0

只有一個'RDD'流中.... – 2016-11-20 13:20:51

+0

我很抱歉,但我分割數據成3部分由'sc.parallelize' –

+0

否...請閱讀'minPartitions'正在做什麼:)通過3個RDD,每個批次都得到'max'。 – 2016-11-20 13:28:00

回答

0

此:

stream = ssc.queueStream([sc.parallelize([(1,"a"), (2,"b"),(1,"c"),(2,"d"), 
    (1,"e"),(3,"f")],3)]) 

只有一個批次,其中第一和唯一的一批具有(最小)3個分區創建一個流。我想你想:

stream = ssc.queueStream([ 
    sc.parallelize([(1,"a"), (2,"b")]), 
    sc.parallelize([(1,"c"), (2,"d")]), 
    sc.parallelize([(1,"e"), (3,"f")]), 
]) 

,這將給你預期的結果有:

stream.reduce(max).pprint() 
+0

非常感謝,我還有其他問題。在我的代碼中,我將數據分成3個分區,如果我使用'1'而不是'3',結果將是相同的。那麼轉型的「並行化」意味着什麼?節省我們的處理時間或其他東西? –

+0

用'max'減少''的結果將不取決於分區的數量。 'parallelize'創建RDD。 'minPartitions'設置此RDD的分區數量。 – 2016-11-21 08:58:39

+0

抱歉,我仍然對分區的含義感到困惑。更多的分區可以節省更多的操作時間或者只是將大數據分成小數據以幫助分配更多的計算機? –

相關問題