2017-02-11 259 views
1

我是相當新的風暴,最近改變了我的螺栓從IRichBolt 而不是BaseBasicBolt繼承,這意味着我現在負責ACKING的和失敗 一個元組根據我自己的邏輯。風暴拓撲:正確的方法ACK當兩個螺栓有相同的源螺栓

My topology looks like this: 螺栓A向螺栓B和C發出相同的元組,每個堅持數據到卡桑德拉。 這些操作不是冪等的,並且包含對兩個不同計數器列族的更新。 我只對失敗的元組感興趣,並在Cassandra的某些異常(不是讀/寫超時,只有QueryConsistency或Validation異常)中重播它。 問題是,如果螺栓B發生故障,相同的元組將從噴口重播並再次發射到螺栓C,螺栓C已經成功地保留其數據,從而創建錯誤的數據。

我試着瞭解acking是如何完成的(來自閱讀:http://www.slideshare.net/andreaiacono/storm-44638254),但未能理解 在上述情況中發生了什麼。

我想要正確解決這個問題的唯一方法是用相同的輸入源創建另一個噴口:噴口1 - >螺栓A - >螺栓B,噴口1' - >螺栓A' - >螺栓C'或者將兩個列族的數據保存在Bolts B和C中完成的同一個批處理語句中,方法是將它們合併爲一個。

這是正確的還是我錯過了什麼?還有另一種可能的解決方案來正確地確認這些元組嗎?

謝謝。

回答

0

你沒有說你想等待多長時間來重試一次螺栓B或C中的失敗更新,但不是完全失敗螺栓B中的元組,你可以添加更多的流。將螺栓B的scorpion-tail輸出流添加回同一螺栓B.如果螺栓B中的更新失敗,則將該元組寫入scorpion-tail輸出流,以便它再次作爲輸入返回到螺栓B,從第二流。您可以豐富元組來保存一個時間戳,這樣您的新流的螺栓B上的處理邏輯可以查看上次嘗試的時間,如果沒有足夠的時間,可以再次將它寫出到蠍尾流。當然,你也可以爲螺栓C做同樣的事情。

如果你想等很長時間重試元組(長時間處於Storm風格),你可以用Kafka主題替換那些scorpion-tail流以及必要的噴口。

+0

感謝您的回覆。喜歡你的想法,但我想我寧願讓噴嘴(在我的情況下是KafkaSpout)用它的ExponentialBackOff重試嘗試來處理失敗,而不是我必須在我的數據庫persister螺栓中實現這些功能。我將簡化拓撲樹,以便沒有兩個數據庫的螺栓具有相同的「父」螺栓,將上述示例中的螺栓連接到具有批處理語句的一個螺栓中,該批處理語句寫入兩個列族。 – fncontroloptioncommand