2017-07-11 54 views
0

我已經問過類似的問題,並有很好的迴應..但在這裏我的要求似乎足夠不同,足以分開問。創建一個自定義原子批量聚合器

駱駝聚合器,就像它真棒一樣,不會爲我剪下它。我需要彙總交換數據,當我達到一定的大小時,將其轉發到隊列中。當發生這種情況時,我可以將原始源消息從隊列中確認出來。基於環境原因,聚合器的持久性選擇不是一個真正的選擇。沒有rdms,其他選項將是本地管理狀態。如果路線發生故障或箱子,我需要能夠繼續處理,如果我在那個數據庫中有垃圾,那麼這是一個恢復工作。謝謝ZK和駱駝的整合!

我基本上認爲我需要實現一個處理器/或一個bean(有什麼細微的差別?),這將需要交換並將它們放在地圖中。當我在連接交換機上向前端點發送大小時,然後以某種方式確認所有消息。

我想知道的是我用什麼api來控制交換,以便有效地停止交易,並拉出我以後需要的答案。

任何人都可以提供一些指導並指向我感興趣的對象的相關功能?

我對此有一個很好的簡單想法。我打算擴展Rabbit *類,特別是RabbitConsumer doHandleDelivery,並且這些都是我的點頭聚合。一旦聚合完成,這將稱爲Exchange exchange = consumer.getEndpoint().createRabbitExchange(envelope, properties, body);。根據consumer.getProcessor().process(exchange);的結果,它會查看或重新發送所有消息。表面上看,我會說這一切都會很好。好吧,我需要在RabbitConsumer同步。

+0

擴展的問題與此建議的解決方案 – user2871054

回答

0

只是爲了窺視更新,我建立了我自己的批處理rmq消費者。

真的很簡單,但只是必須確保我建立onXXX函數,以便路徑可以暫停/恢復停止和啓動。