我有一個是從一個MQTT經紀人接收數據的拓撲結構,我希望有一個噴口這樣的表現:創建一個Apache風暴口發出的元組每X秒
發出批次元組(或每x秒一個字符串列表)。我如何實現這一目標?我讀了一些關於Storm Trident的文章,但是它的
IBatchSpout
似乎不允許我在特定的時間間隔內批量發送元組。如果沒有新數據進來,噴嘴應該做什麼?它不能阻止線程,因爲它是Storm的主線程,對吧?
我有一個是從一個MQTT經紀人接收數據的拓撲結構,我希望有一個噴口這樣的表現:創建一個Apache風暴口發出的元組每X秒
發出批次元組(或每x秒一個字符串列表)。我如何實現這一目標?我讀了一些關於Storm Trident的文章,但是它的IBatchSpout
似乎不允許我在特定的時間間隔內批量發送元組。
如果沒有新數據進來,噴嘴應該做什麼?它不能阻止線程,因爲它是Storm的主線程,對吧?
您可以實現自己的MQTT噴口。舉個例子,看看MongoSpout。
重要的部分是nextTuple
方法。
當這種方法被調用時,Storm正在請求Spout向輸出採集器發出 元組。 這種方法應該是非阻塞的,所以 如果Spout沒有元組發出,應該返回這個方法。 nextTuple,ack和fail都在噴口任務中的單個線程中緊密循環調用。當沒有元組發出時,它很有禮貌地在下一次短暫的時間內進行三次睡眠(如單個毫秒),以免浪費太多的CPU。
你不能一次等待指定的時間,但你可以實現nextTuple
,以便它偶爾發出一個元組。
private static final EMISSION_PERIOD = 2000; // 2 seconds
private long lastEmission;
@Override
public void nextTuple() {
if (lastEmission == null ||
lastEmission + EMISSION_PERIOD >= System.currentMillis()) {
List<Object> tuple = pollMQTT();
if (tuple != null) {
this.collector.emit(tuple);
return;
}
}
Utils.sleep(50);
}
請注意,我找到了一個開源的MQTT spout。它看起來沒有生產準備,但可以用它作爲起點。
除了Christian,我發現this implementation是Storm的MQTT客戶端。前面提到的鏈接還沒有開發。