我是Storm的新手。我正在將它用於大學項目。如何停止Storm中的元組處理並執行其他代碼
我創建了我的拓撲,連接到MySql數據庫的Spout和兩個Bolts。與噴口相連的第一個螺栓準備並去除元組中不需要的信息;第二,做元組的過濾。
我在本地模式下工作。
我的問題是: 爲什麼運行拓撲後,在我的控制檯中,我看到輸出像下面的行?
38211 [Thread-14-movie-SPOUT] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30] 67846 [Thread-10-__acker] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 67846 [Thread-8-cleaning-genre-bolt] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 67852 [Thread-10-__acker] INFO backtype.storm.daemon.task - Emitting: __acker __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=0, write_pos=1, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {}]>]] 67853 [Thread-8-cleaning-genre-bolt] INFO backtype.storm.daemon.task - Emitting: cleaning-genre-bolt __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {default=1680}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1621, write_pos=1622, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {default=1680}]> #<DataPoint [__execute-latency = {movie-SPOUT:default=0.15476190476190477}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=1680, write_pos=1680, capacity=1024, population=0}]> #<DataPoint [__execute-count = {movie-SPOUT:default=1680}]>]] 67854 [Thread-13-filtering-genre-BOLT] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 67855 [Thread-13-filtering-genre-BOLT] INFO backtype.storm.daemon.task - Emitting: filtering-genre-BOLT __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1681, write_pos=1682, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {cleaning-genre-bolt:default=0.08333333333333333}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {cleaning-genre-bolt:default=1680}]>]]
,我讀了這些行處理的最後一個元組後,被認爲是正常的。不是嗎?
如何在提交拓撲後運行其他代碼?例如,我想打印在第二個螺栓中完成的過濾結果,保存在HashMap中。 如果我將代碼放在包含submitTopology()方法的行之後,代碼將在元組完成之前運行。
第二個和最後一個問題是:爲什麼在風暴的每一個例子,我看到在噴
「的Thread.sleep(1000)」?
也許它與我的第一個問題有關。
我希望我的問題很清楚。 提前謝謝!
謝謝你的詳盡回覆,馬提亞! –
如果這回答你的問題,隨時接受和/或upvote我的答案。 –