2015-11-02 44 views
2

我在Storm中實現了一個接收來自RabbitMQ噴口消息(https://github.com/ppat/storm-rabbitmq)的消息。Apache Storm加入模式 - 至少一次

我必須在Storm中處理的每個事件都會作爲來自Rabbit的兩條消息到達,因此我在bolt上有一個fieldsGrouping,以便兩個消息到達同一個螺栓。

我的第一個我想辦法:

  1. 收到第一個元組,並在內存中保存的信息
  2. 確認的第一個元組
  3. 當第二元組到達從存儲器中取第一,併發出新的記錄從噴口錨定到第二個。

這個工作,但我可以放鬆消息,如果一名工人死亡,因爲我會確認第一個元組,然後纔得到第二個和處理。

我改變這對:

  1. 接收所述第一元組並將其保存在存儲器
  2. 當第二元組到達從存儲器中取出的第一,發射錨固到兩個輸入元組一個新的元組和ACK兩個輸入元組。

內存中高速緩存是一個Guava高速緩存,有時間到期,當Tuple因超時而被驅逐時,我將在拓撲中失敗(),以便它在後面進行重新處理。

這似乎工作,但是當我做了一些測試後,我得到了一個系統停止從Rabbit Queue獲取消息的情況。

隊列上的預取設置爲5,並且在7處用setMaxSpoutPending噴出。在Rabbit接口中,我看到5個Unacked消息。

在風暴日誌中,我看到相同的元組一次又一次從緩存中被逐出。

據我所知,問題是壺嘴只能取出5個消息,這些消息都是一對的第一部分。我可以增加預取,但不保證這不會在生產中發生。

所以我的問題是:如何在Storm中處理這些問題時實現聯接?

回答

1

風暴並沒有提供一個很好的解決方案......什麼,你會需要的是一個可靠的存儲該緩衝第一元組(即,有狀態的操作)。因此,您可以立即確認第一個元組,並在發生故障後恢復狀態。

  1. 據我所知,Trident支持一些狀態處理。但我從來沒有用過它。
  2. 作爲第二種選擇,您可以使用分佈式鍵值存儲(如Casandra)作爲緩衝區。當然,這將是一個手寫解決方案,即您需要自己編寫所有Casandra交互。
  3. 最後但並非最不重要的一點,您可以切換到支持Apache Flink等有狀態運算符的流處理系統。(免責聲明:我是Flink的提交者)
相關問題