2015-09-27 108 views
3

在我的拓撲結構中,我從Kafka隊列中讀取觸發器消息。在收到觸發消息時,我需要向螺栓發出大約4096條消息。在螺栓中,經過一些處理後,它將發佈到另一個Kafka隊列(另一個拓撲將在以後使用)。如何設置TOPOLOGY_MAX_SPOUT_PENDING參數

我試圖設置TOPOLOGY_MAX_SPOUT_PENDING參數來限制消息進入螺栓的數量。但我認爲它沒有效果。是因爲我在一個nextTuple()方法中發射所有元組嗎?如果是這樣,那麼應該怎麼辦?

+0

你試過了什麼代碼? – enigma

+0

我編輯了您的帖子以包含格式標籤,並修復了一些拼寫錯誤。問題越清楚,答案就越好! –

+0

謝謝Wes Foster! – user2989124

回答

1

要使TOPOLOGY_MAX_SPOUT_PENDING您需要啓用容錯機制(即分配Spouts中的消息ID和Bolts中的錨定和確認)。此外,如果您每次調用Spout.nextTuple()時發出多個元組,TOPOLOGY_MAX_SPOUT_PENDING將無法按預期工作。

實際上,由於其他原因,每個Spout.nextTuple()調用發出的單個元組都不止一個元組(更多詳細信息,請參閱Why should I not loop or block in Spout.nextTuple()))。

+0

感謝Matthias的回覆。我在這裏看到了這個問題,因爲我正在發送一個消息的所有4096元組。但是,這是我的用例要求我做的事情。 – user2989124

+0

也許你可以將TOPOLOGY_MAX_SPOUT_PENDING設置爲1。這應該觸發'nextTuple()'的單個調用,並且不會發出第二個調用,直到您發出的所有4096個元組都被處理。 –

+0

感謝Matthais的回覆。我會重新考慮我的拓撲設計。 – user2989124

2

如果你正在閱讀卡夫卡,你應該使用風靡一時的KafkaSpout。不要試圖實施你自己的噴壺,相信我,我在生產中使用KafkaSpout,並且它工作得非常順利。每個Kafka消息只生成一個元組。

正如你可以this nice page from the manual看到,你可以設置topology.max.spout.pending這樣的:

Config conf = new Config(); 
conf.setMaxSpoutPending(5000); 
StormSubmitter.submitTopology("mytopology", conf, topology); 

topology.max.spout.pending每口設置,如果你有四個噴口,你會最大非完整的元組有內您的拓撲結構等於噴口數量* topology.max.spout.pending。

另一個提示是,你應該使用暴風雨UI來查看topology.max.spout.pending是否設置正確。


記住topology.max.spout.pending僅拓撲內沒有未處理的元組的數量,拓撲永遠不會停止使用來自卡夫卡的消息,至少在生產系統上。如果你想消費的4096批次您需要在你的螺栓上實現緩存邏輯,或者使用風暴以外的東西(微型批處理)。

+0

感謝您的回覆。 – user2989124