在我的拓撲結構中,我從Kafka隊列中讀取觸發器消息。在收到觸發消息時,我需要向螺栓發出大約4096條消息。在螺栓中,經過一些處理後,它將發佈到另一個Kafka隊列(另一個拓撲將在以後使用)。如何設置TOPOLOGY_MAX_SPOUT_PENDING參數
我試圖設置TOPOLOGY_MAX_SPOUT_PENDING
參數來限制消息進入螺栓的數量。但我認爲它沒有效果。是因爲我在一個nextTuple()
方法中發射所有元組嗎?如果是這樣,那麼應該怎麼辦?
在我的拓撲結構中,我從Kafka隊列中讀取觸發器消息。在收到觸發消息時,我需要向螺栓發出大約4096條消息。在螺栓中,經過一些處理後,它將發佈到另一個Kafka隊列(另一個拓撲將在以後使用)。如何設置TOPOLOGY_MAX_SPOUT_PENDING參數
我試圖設置TOPOLOGY_MAX_SPOUT_PENDING
參數來限制消息進入螺栓的數量。但我認爲它沒有效果。是因爲我在一個nextTuple()
方法中發射所有元組嗎?如果是這樣,那麼應該怎麼辦?
要使TOPOLOGY_MAX_SPOUT_PENDING您需要啓用容錯機制(即分配Spouts中的消息ID和Bolts中的錨定和確認)。此外,如果您每次調用Spout.nextTuple()
時發出多個元組,TOPOLOGY_MAX_SPOUT_PENDING將無法按預期工作。
實際上,由於其他原因,每個Spout.nextTuple()
調用發出的單個元組都不止一個元組(更多詳細信息,請參閱Why should I not loop or block in Spout.nextTuple()))。
感謝Matthias的回覆。我在這裏看到了這個問題,因爲我正在發送一個消息的所有4096元組。但是,這是我的用例要求我做的事情。 – user2989124
也許你可以將TOPOLOGY_MAX_SPOUT_PENDING設置爲1。這應該觸發'nextTuple()'的單個調用,並且不會發出第二個調用,直到您發出的所有4096個元組都被處理。 –
感謝Matthais的回覆。我會重新考慮我的拓撲設計。 – user2989124
如果你正在閱讀卡夫卡,你應該使用風靡一時的KafkaSpout
。不要試圖實施你自己的噴壺,相信我,我在生產中使用KafkaSpout,並且它工作得非常順利。每個Kafka消息只生成一個元組。
正如你可以this nice page from the manual看到,你可以設置topology.max.spout.pending
這樣的:
Config conf = new Config();
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("mytopology", conf, topology);
的topology.max.spout.pending
每口設置,如果你有四個噴口,你會最大非完整的元組有內您的拓撲結構等於噴口數量* topology.max.spout.pending。
另一個提示是,你應該使用暴風雨UI來查看topology.max.spout.pending
是否設置正確。
記住topology.max.spout.pending
僅拓撲內沒有未處理的元組的數量,拓撲永遠不會停止使用來自卡夫卡的消息,至少在生產系統上。如果你想消費的4096批次您需要在你的螺栓上實現緩存邏輯,或者使用風暴以外的東西(微型批處理)。
感謝您的回覆。 – user2989124
你試過了什麼代碼? – enigma
我編輯了您的帖子以包含格式標籤,並修復了一些拼寫錯誤。問題越清楚,答案就越好! –
謝謝Wes Foster! – user2989124