2016-10-26 45 views
0

現在我正在學習風暴的保證消息處理,並且被這部分中的一些概念弄糊塗了。風暴愛好者的混淆和保證的消息處理

爲了保證由噴口發出的消息得到完全處理,Storm使用acker來實現此目的。每當噴口發出一個元組時,acker會將初始化爲「0」的「ack val」賦值,以存儲元組樹的狀態。每當此元組的下游螺栓發出新元組或確認一個「舊」元組時,元組ID將與「ack val」異或。 acker只需要檢查「ack val」是否爲0或不知道元組已被完全處理。讓我們看看下面的代碼:

public class WordReader implements IRichSpout { 
    ... ... 
while((str = reader.readLine()) != null){ 
    this.collector.emit(new Values(str), str); 
    ... ... 
} 

上面的代碼片是從「風暴入門」教程的字數統計程序中的噴口。在發射方法中,第二個參數「str」是messageId。我對這個參數感到困惑: 1)據我所知,無論噴嘴或螺栓是否發出元組(即消息),Storm都有責任爲該消息分配64位messageId。那是對的嗎?或者這裏「str」只是這個消息的人類可讀的別名? 2)無論1)的答案如何,這裏「str」在兩個不同的消息中是相同的詞,因爲在文本文件中應該有許多重複的詞。如果這是真的,那麼Storm如何區分不同的消息?這個參數的含義是什麼? 3)在一些代碼塊,我看到一些噴口使用以下代碼來設置消息ID在噴發射方法:

public class RandomIntegerSpout extends BaseRichSpout { 
    private long msgId = 0; 
    collector.emit(new Values(..., ++msgId), msgId); 
} 

這非常接近我認爲應該是:消息ID應完全不同的消息。但是對於這個代碼片段,另一個困惑是:跨不同執行者的私有字段「msgId」會發生什麼?由於每個執行程序都將自己的msgId初始化爲0,因此不同執行程序中的消息將從0,1,2等依次命名。那麼Storm如何區分這些消息?

我是Storm的新手,所以也許這些問題太天真了。希望有人能幫我弄清楚。謝謝!

回答

0

關於消息ID是一般的:在內部它可能是一個64位值,但是這個64位值被計算爲來自在Spout內的emit()中提供的msgID對象的散列值。因此,您可以將任何對象作爲消息標識(兩個對象散列到相同值的概率接近於零)。

關於使用str:我認爲在這個例子中,str包含一行(而不是一個單詞),它很可能不是文檔包含兩次完全相同的行(如果沒有空行可能很多)。

關於作爲消息ID的計數器:對於你的觀察你是絕對正確的 - 如果多個噴嘴並行運行,這會導致消息ID衝突,並會破壞容錯。

如果你想「修復」計數器方法,每個計數器應該以不同的方式初始化(​​最好從1...#SpoutTasks)。您可以使用taskID(這是唯一的,可通過Spout.open()中提供的TopologyContext訪問)。基本上,您可以獲得所有並行噴口任務的所有taskID,對它們進行排序,併爲每個噴口任務分配其訂購號。此外,您需要通過「並行噴口數」而不是1來增加。

+0

謝謝。但我仍然對你答案中的前兩段感到困惑。 – acekiller

+0

對於第一段,它的64位值是從msgID計算出來的,那麼Bolt發出的消息怎麼樣?由於Bolt不會分配帶有msgID的消息。正如我想的那樣,Spout和Bolt中的64位消息值是隨機產生的。 – acekiller

+0

對於第二段,str確實是一行,但在這裏每一行都是一個單詞,所以跨行的這些單詞中有很多重複。因此,如果64位值從str散列(如第1段所述),則不同的消息可能具有相同的msgID以及64位值。我對嗎?謝謝! – acekiller