2014-02-24 98 views
3

我有一個拓撲問題。我嘗試解釋工作流... 我有一個每2分鐘發出500k個元組的源代碼,這些元組必須通過噴口讀取,並且像一個對象(我認爲是三叉戟中的一批)一樣進行處理。 之後,一個螺栓/功能/還有什麼?...必須附加一個時間戳並將元組保存到Redis中。在Redis上寫入的Trident或Storm拓撲結構

我試圖實現一個Trident拓撲結構,該函數使用Jedis對象(Redis庫for Java)將所有元組保存到Redis中,但是當我部署時,我在此對象上收到NotSerializable Exception。

我的問題是。我該如何實現一個在Redis上寫這批元組的函數?閱讀在網絡上我不能發現,從函數寫入任何例子或者Redis的使用三叉戟狀態對象的任何實例(可能是我使用它...)

我簡單的拓撲結構:

TridentTopology topology = new TridentTopology(); 
topology.newStream("myStream", new mySpout()).each(new Fields("field1", "field2"), new myFunction("redis_ip", "6379")); 

在此先感謝

+0

你的意思是,你試圖序列化一個Jedis實例?你不能那樣做,你應該使用JedisPool的特殊單例模式(Jedis真的不被推薦,如果你失去連接,它將永遠丟失,池會解決這個問題)。 – zenbeni

+0

好吧,我用池解決了Jedis序列化的問題,但我不明白我怎麼能用三叉戟來實現這樣的拓撲(真正的問題是缺少文檔和類似的例子)。 今天我改變了基本風暴拓撲類型(沒有三叉戟)的拓撲,它運行良好,但並不保證處理所有的元組像一個(按某種方式訂購)批次... – Eddyman

+1

無論如何感謝您的提示,一步一步我開始看到解決方案! :-P – Eddyman

回答

7

(回答有關一般狀態,因爲涉及到Redis的具體問題在其他意見似乎解決了),當我們記住風暴從分散(或「分區的讀取

在風暴數據庫更新的概念變得更加清晰「)數據源(通過Storm」spouts「),並行處理多個節點上的數據流,可選擇對這些數據流執行計算(稱爲」聚合「),並將結果保存到分佈式數據存儲區(稱爲」狀態「) 。聚集是一個非常廣泛的術語,意思就是「計算內容」:例如計算流中的最小值在Storm中可以看作是先前已知的最小值與當前在羣集某個節點中處理的新值的聚合。考慮到聚合和分區的概念,我們可以看一下Storm中的兩個主要基元,它們允許將某些東西保存在一個狀態:partitionPersist和persistentAggregate,第一個在每個集羣節點的級別上運行沒有與其他分區協調,感覺有點像通過DAO與數據庫交談,而第二個涉及到「重新分區」元組(即通過集羣重新分配它們,通常是一些groupby邏輯),做一些計算(一個「聚合」),然後再讀取/保存某些數據到數據庫,感覺有點像與HashMap而不是數據庫交談(Storm在此情況下將數據庫稱爲「MapState」,如果只有一個密鑰,則稱爲「快照」地圖)。

一兩件事心裏有就是恰好一次語義風暴不會被處理每個元組恰好一次實現:由於在定義每個元組的潛在的幾個讀/寫操作,這將是太脆了拓撲結構中,我們希望避免出於可伸縮性原因的兩階段提交,並且在大規模情況下,網絡分區變得更可能。相反,Storm通常會繼續重播這些元組,直到他確信它們已經完全成功處理至少一次。這與狀態更新的重要關係是,Storm爲我們提供了允許冪等狀態更新的原始(OpaqueMap),以便那些重放不會破壞以前存儲的數據。例如,如果我們正在總結數字[1,2,3,4,5],那麼保存在數據庫中的結果將始終爲15,即使它們在「總和」操作中重播和處理了幾次,這是由於某些原因暫時失敗。 OpaqueMap對用於在數據庫中保存數據的格式有輕微的影響。請注意,如果我們告訴Storm如此行事,那麼這些重播和不透明邏輯就會出現,但我們通常會這樣做。

如果您有興趣閱讀更多內容,我在這裏發佈了2篇博客文章。

http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/

http://svendvanderveken.wordpress.com/2014/02/05/error-handling-in-storm-trident-topologies/

最後一兩件事:通過上面的重播的東西,暗示,風暴在本質上是一個非常異步機制:我們通常有生產商在排隊系統中的事件後一些數據(E ,卡夫卡或0MQ)和Storm從那裏讀取。因此,根據問題中的建議從風暴中分配時間戳可能會或可能不會產生所需的效果:此時間戳將反映「最近成功的處理時間」,而不是數據攝取時間,當然它不會相同在重播元組的情況下。

+0

感謝您的詳細解釋,我嘗試在您的文章中閱讀更多內容。 – Eddyman

1

您是否嘗試過redis的三叉戟狀態。在github上有一個代碼,它已經做到了: https://github.com/kstyrc/trident-redis

讓我知道這是否回答你的問題。

+0

我可以試試,謝謝你的回答。 – Eddyman

相關問題