我在Storm中實現了一個接收來自RabbitMQ噴口消息(https://github.com/ppat/storm-rabbitmq)的消息。Apache Storm加入模式 - 至少一次
我必須在Storm中處理的每個事件都會作爲來自Rabbit的兩條消息到達,因此我在bolt上有一個fieldsGrouping,以便兩個消息到達同一個螺栓。
我的第一個我想辦法:
- 收到第一個元組,並在內存中保存的信息
- 確認的第一個元組
- 當第二元組到達從存儲器中取第一,併發出新的記錄從噴口錨定到第二個。
這個工作,但我可以放鬆消息,如果一名工人死亡,因爲我會確認第一個元組,然後纔得到第二個和處理。
我改變這對:
- 接收所述第一元組並將其保存在存儲器
- 當第二元組到達從存儲器中取出的第一,發射錨固到兩個輸入元組一個新的元組和ACK兩個輸入元組。
內存中高速緩存是一個Guava高速緩存,有時間到期,當Tuple因超時而被驅逐時,我將在拓撲中失敗(),以便它在後面進行重新處理。
這似乎工作,但是當我做了一些測試後,我得到了一個系統停止從Rabbit Queue獲取消息的情況。
隊列上的預取設置爲5,並且在7處用setMaxSpoutPending噴出。在Rabbit接口中,我看到5個Unacked消息。
在風暴日誌中,我看到相同的元組一次又一次從緩存中被逐出。
據我所知,問題是壺嘴只能取出5個消息,這些消息都是一對的第一部分。我可以增加預取,但不保證這不會在生產中發生。
所以我的問題是:如何在Storm中處理這些問題時實現聯接?