2016-09-29 26 views
0

我有數據已經​​通過鍵入我的Spark Streaming分區由於Kafka,即在一個節點上找到的鍵沒有在任何其他節點上找到。如何僅在Spark Streaming中的分區內「減少」,可能使用combineByKey?

我想用的Redis及其incrby(由增量)命令作爲一個國家的發動機和減少發送到Redis的請求的數量,我想通過這樣一個詞來部分地減少我的數據的每個工作節點指望通過它自己。 (關鍵是標記+時間戳來從字數獲得我的功能)。 我想避免混洗,並讓redis負責在工作節點間添加數據。

即使我已籤數據乾淨地工作節點,.reduce(_ + _)(斯卡拉語法)中拆分需要很長的時間(幾秒鐘與亞秒級的地圖任務),作爲HashPartitioner似乎打亂我的數據到隨機節點添加它。

如何在不使用Spark Streaming的Scala中觸發混洗步驟的情況下,如何在每個分區器上編寫簡單的字數統計減少?

注意DStream對象缺少一些RDD方法,它們只能通過transform方法獲得。

看來我可能能夠使用combineByKey。我想跳過mergeCombiners()這一步,而是把它們所在的集合元組留在那裏。 書「學習星火」神祕地說:

我們可以在combineByKey()禁用地圖端的聚合,如果我們知道,我們的數據將不會從中受益。例如,groupByKey()禁用地圖側聚合,因爲聚合函數(附加到列表)不保存任何空間。如果我們想禁用地圖邊組合,我們需要指定分區器;現在你可以通過傳遞rdd.partitioner來使用源RDD上的分區器。

https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

該書然後繼續提供沒有語法如何做到這一點,也沒有我曾與谷歌的任何運氣至今。

更糟糕的是,據我所知,在Spark Streaming中沒有爲DStream RDD設置分區器,所以我不知道如何提供一個分區器來組合並且不會結束數據混洗。

另外,「地圖方」究竟是什麼意思,mapSideCombine = false究竟有什麼後果呢?

combineByKey Scala實現可以在 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 查找combineByKeyWithClassTag找到。

如果解決方案涉及自定義分區程序,請包含代碼示例以瞭解如何將該分區程序應用於傳入DStream。

+0

當從Kafka讀取一個'DStream'時,卡夫卡分區和Spark分區之間有一個1:1的記者。但是,並不能保證同一個工作節點總是會讀取同一個分區,這意味着不能保證所有的密鑰最終都會在一個Spark工作節點上。因此,你確實需要洗牌數據。 –

+0

感謝您的評論。我計劃使用redis和它的增量命令'incrby'作爲狀態引擎,所以這不會成爲問題,如果我只能減少每個工作節點上的內容,它仍然有助於減少發送到redis的消息。我怎樣才能做到這一點? – Andreas

回答

0

這可以通過使用mapPartitions來完成,該函數將一個分區上的輸入RDD的迭代器映射到輸出RDD上的迭代器。

要實現一個字計數,我映射到_._2以除去卡夫卡鍵,然後執行使用foldLeft,初始化mutable.hashMap,然後把它轉換成一個Iterator以形成輸出RDD快速迭代字數。

val myDstream = messages 
    .mapPartitions(it => 
    it.map(_._2) 
    .foldLeft(new mutable.HashMap[String, Int])(
     (count, key) => count += (key -> (count.getOrElse(key, 0) + 1)) 
    ).toIterator 
) 
相關問題