2014-10-27 86 views
4

我有一個是從一個MQTT經紀人接收數據的拓撲結構,我希望有一個噴口這樣的表現:創建一個Apache風暴口發出的元組每X秒

  1. 發出批次元組(或每x秒一個字符串列表)。我如何實現這一目標?我讀了一些關於Storm Trident的文章,但是它的IBatchSpout似乎不允許我在特定的時間間隔內批量發送元組。

  2. 如果沒有新數據進來,噴嘴應該做什麼?它不能阻止線程,因爲它是Storm的主線程,對吧?

回答

2

您可以實現自己的MQTT噴口。舉個例子,看看MongoSpout

重要的部分是nextTuple方法。

當這種方法被調用時,Storm正在請求Spout向輸出採集器發出 元組。 這種方法應該是非阻塞的,所以 如果Spout沒有元組發出,應該返回這個方法。 nextTuple,ack和fail都在噴口任務中的單個線程中緊密循環調用。當沒有元組發出時,它很有禮貌地在下一次短暫的時間內進行三次睡眠(如單個毫秒),以免浪費太多的CPU。

你不能一次等待指定的時間,但你可以實現nextTuple,以便它偶爾發出一個元組。

private static final EMISSION_PERIOD = 2000; // 2 seconds 
private long lastEmission; 

@Override 
public void nextTuple() { 
    if (lastEmission == null || 
      lastEmission + EMISSION_PERIOD >= System.currentMillis()) { 
     List<Object> tuple = pollMQTT(); 
     if (tuple != null) { 
      this.collector.emit(tuple); 
      return; 
     } 
    } 
    Utils.sleep(50); 
} 

請注意,我找到了一個開源的MQTT spout。它看起來沒有生產準備,但可以用它作爲起點。

1

除了Christian,我發現this implementation是Storm的MQTT客戶端。前面提到的鏈接還沒有開發。