2017-10-18 103 views
0

我知道SpoutConfig有retryLimit來設置消息可以重新處理的次數。如何知道Kafka-Storm中的重試次數

關於retryLimit,這是我在SpoutConfig.class找到的消息:

指數回退重試設置。這些被 ExponentialBackoffMsgRetryManager用於在螺栓 調用OutputCollector.fail()後重試消息。

我想知道是否有任何方法可以知道當我的代碼中的任何給定螺栓處理Tuple時重試的確切數量。

例如,如果我設置retryLimit=5和失敗(調用OutputCollector.fail())第一次,當它被重新處理,第二次我想知道這個元組已經失敗1次。

我將不勝感激您對此的幫助。

謝謝。

回答

1

這裏沒有內置的支持。由卡夫卡記錄生成的元組僅由https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java取決於卡夫卡記錄,而不取決於重放次數。

默認的RecordTranslator會將主題,分區和偏移量作爲元組的一部分發出,因此您可能能夠使用它們來檢查您的螺栓是否曾經看過元組(假設您有某種狀態存儲) 。爲什麼這些螺栓需要知道元組失敗了多少次?

編輯:
我認爲我們沒有添加失敗計數作爲發射元組選項的原因之一是它不可靠。由於元組失敗的次數只存在於內存中,所以可能會遇到元組失敗,噴口崩潰,以及您從不會看到失敗次數超過0的元組的情況。

即使我們有噴口中的持久狀態存儲仍然會有失敗的元組沒有標記的情況,例如如果噴口首先崩潰並且以前發射的元組失敗。重新啓動的噴口無法識別之前發出的元組,因此不會將其標記爲失敗。

在我看來,你實際需要追蹤的是噴口是否不止一次地發射了一個元組,而不是噴口是否認爲它以前失敗了。

您可能可以使用https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.javaonEmit來跟蹤哪些偏移量已被多次發射。由於它是作爲噴口的一部分運行的,因此當元組被佔用時清理狀態應該非常簡單。仍有可能遺漏失敗的元組,因爲onEmit在噴口發出元組之後運行,所以如果噴口在噴射後立即崩潰,則可能會錯過失敗。也許想想你是否可以先以某種方式設計這個需求。

+0

斯蒂格,謝謝你的回覆。在我的情況下,這些螺栓需要知道這個數字,因爲業務人員想要給出那些失敗的元組進行特殊處理(插入錯誤表中以重新處理它們)。 – cricardo84

+0

編輯的答案,因爲評論太大 –

+0

感謝您的回答Stig,我會試一試,看看會發生什麼。 – cricardo84