2014-03-05 43 views
1

我正在Storm上開發一些數據分析算法,並對Storm的內部設計有一些疑問。我想模擬Storm中的傳感器數據產生和處理,因此我使用Spout通過在Spout的nextTuple方法中設置睡眠方法,以恆定的時間間隔將傳感器數據推入後續的螺栓。但是從實驗結果來看,似乎噴口沒有按照指定的速率推送數據。在實驗中,系統中沒有瓶頸螺栓。關於風暴流處理噴口中nextTuple方法的問題

然後我檢查了一些關於Storm的ack和nextTuple方法的資料。現在我的疑問是,只有當前面的元組在ack方法中被完全處理和確認後才調用nextTuple方法?

如果這是真的,是否意味着我不能設置固定的時間間隔來發射數據?

Thx很多!

+0

我喜歡這個問題,但你可能有更好的運氣,要求它在暴風雨用戶郵件列表。不過,我在下面看到我可以給你的一些信息。 –

回答

0

我的經驗是,你不應該期望Storm做出任何實時保證,包括在你的情況下處理元組的速度。你當然可以編寫一個只在某個時間表上發出元組的噴口,但Storm並不能保證它總是隨時調用噴口。

請注意,只要有空間可用於拓撲中的更多掛起元組,就應該調用nextTuple。如果拓撲結構具有空閒容量,那麼我希望Storm可以儘可能地用它來填充它。

0

我也有類似的用例,和我的方式來實現它是通過使用TICK_TUPLE

Config tickConfig = new Config();          
tickConfig.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 15); 
... 
... 
builder.setBolt("storage_bolt", new S3Bolt(), 4).fieldsGrouping("shuffle_bolt", new Fields("hash")).addConfigurations(tickConfig); 

然後我storage_bolt(注意:這是寫在python,但你會得到一個想法),我檢查消息tick_tuple如果是則執行我的代碼:

def process(self, tup): 
    if tup.stream == '__tick':            
     # Your logic that need to be executed every 15 seconds, 
     # or what ever you specified in tickConfig. 
     # NOTE: the maximum time is 600 s.            
     storm.ack(tup)              
     return