2015-06-17 44 views
1

火花流中是否還有數據保存排序後的dstream中的多個微批次的數據,其中流是使用時間戳排序的? (假設是單調到達的數據) 任何人都可以提出如何在迭代中保存數據的建議,其中每個迭代都是在JavaDStream中處理的RDD?在Spark Streaming中使用Java對有序Spark流進行迭代編程?

迭代是什麼意思?

我首先使用時間戳對流進行排序,假設數據已經到達單調增加的時間戳(沒有亂序)。

我需要一個全局HashMap X,我希望使用時間戳爲「t1」的值更新,然後是「t1 + 1」。由於X本身的狀態會影響計算,因此需要進行線性運算。因此,「t1 + 1」處的操作取決於HashMap X,它依賴於「t1」之前和之前的數據。

應用

這尤其是當一個人試圖更新模型或比較RDD的兩套,或保留某些事件等,這將在未來的迭代影響操作的全球歷史的情況下?

我想保留一些積累的歷史來計算..不是整個數據集,但堅持可以在未來的DStream RDD中使用的某些事件?

+0

你問的是如何堅持HDFS嗎? – jaco0646

+0

@ jaco0646我想保持它處於內存狀態,我可以在將來的迭代中使用它做出一些決定。例如,它可能使用foreachRDD做到這一點,但我不知道如何 – tsar2512

回答

1

UpdateStateByKey的確如此:它使您能夠定義某些狀態,並根據流中的每個RDD更新它。這是積累歷史計算的典型方法。

從DOC:

的updateStateByKey操作可以保持任意狀態,而源源不斷的新信息更新它。要使用此功能,您必須執行兩個步驟。

  1. 定義狀態 - 狀態可以是任意數據類型。
  2. 定義狀態更新函數 - 用函數指定如何使用前一狀態更新狀態以及輸入流中的新值。

此處瞭解詳情: https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#updatestatebykey-operation

如果不削減或你需要更大的靈活性,可以隨時存儲到一個key-value存儲明確像卡桑德拉(CF卡珊德拉連接器:https://github.com/datastax/spark-cassandra-connector) ,儘管這個選項通常比較慢,因爲它在每次查找時系統地涉及網絡傳輸。

+0

謝謝cassandra的建議。請看問題的迭代部分。我想迭代遍歷排序流中的對象,我可以在多個迭代中保存一些數據「狀態」。 – tsar2512

+0

請看我最新的問題。如果您有任何建議,請告知我們。 – tsar2512

+0

嗨。'UpdateStateByKey' _is_,在某種意義上,是一個全局的HashMap。全局HashMap的關鍵是updateStateByKey的關鍵。你所要做的就是根據該決定找出RDD的哪一行將影響「全局HashMap」=>調用'keyBy()'的哪個(key,value),然後調用updateStateByKey,同時提供更新函數基於之前內容的地圖內容和當前RDD中的數據。這種方法適合你的問題嗎? – Svend