2014-02-25 48 views
0

我很難理解如何爲風暴提供數值,因爲我是風暴的新手。如何爲風暴計算提供數值

我從入門套件開始。我通過TestWordSpout並在下面的代碼提供了新的價值

public void nextTuple() { 
    Utils.sleep(100); 
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; 
    final Random rand = new Random(); 
    final String word = words[rand.nextInt(words.length)]; 
    _collector.emit(new Values(word)); 
} 

所以我看到它採取一個字在同一時間_collector.emit(new Values(word));

如何,我可以提供的話directly.Is這可能集合去?

TestWordSpout.java

我的意思時nextTuple被稱爲新詞是隨機從列表中選擇和發射。隨機列表可以這個樣子一定的時間間隔後

@100ms: nathan 
@200ms: golda 
@300ms: golda 
@400ms: jackson 
@500ms: mike 
@600ms: nathan 
@700ms: bertels 

如果我已經有這個名單的集合,只是把它喂風暴。

+0

你是什麼意思? – Chiron

+0

@Chiron:我更新了... –

回答

0

「值」類型接受任何類型的對象和任何數字。

所以,你可以簡單地傳遞一個List例如,從博爾特的執行方法或從脫粒機的nextTuple方法:

List<String> words = new ArrayList<>(); 
words.add("one word"); 
words.add("another word"); 
_collector.emit(new Values(words)); 

您可以添加一個新的領域也是如此,只是一定要其聲明在declareOutputFields方法

_collector.emit(new Values(words, "a new field value!"); 

而在你declareOutputFields方法

@Override 
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) { 
    outputFieldsDeclarer.declare(new Fields("collection", "newField")); 
} 

你可以在接下來的博爾特字段由execute方法給出的元組對象的拓撲結構:

List<String> collection = (List<String>) tuple.getValueByField("collection"); 
String newFieldValue = tuple.getStringByField("newField"); 
+0

在哪裏把這個代碼...在nextTuple或執行方法。 –

+0

nextTuple方法是根據您的需要發出值的地方...執行方法位於螺栓中,您可以實際處理通過噴口供給的數據 – user2720864

1

風暴的設計和製造,以處理數據的連續流。請參閱Rationale for the Storm。輸入數據不太可能輸入風暴集羣。一般來說,暴風雨的輸入數據來自JMS隊列,Apache Kafka或Twitter feeds等。我想,你想通過一些配置。在這種情況下,以下情況適用。考慮Storm的設計目的,可以將非常有限的配置細節傳遞給Storm,例如RDMBS連接細節(Oracle/DB2/MySQL等),JMS提供者詳細信息(IBM MQ/RabbitMQ等)或Apache Kafka細節/ Hbase等

對於特定的問題或提供上述產品的詳細配置信息,有我能想到

1三種方式。設置在噴水口或螺栓

的實例例如配置細節:聲​​明實例變量並分配值作爲噴出/螺栓構造如下

public class TestWordSpout extends BaseRichSpout { 
     List<String> listOfValues; 

    public TestWordSpout(List<String> listOfValues) { 
     this.listOfValues=listOfValues; 
    } 

} 

在拓撲提交類的一部分,創建脫粒機的實例與值的列表

 List<String> listOfValues=new ArrayList<String>(); 
     listOfValues.add("nathan"); 
     listOfValues.add("golda"); 
     listOfValues.add("mike"); 

     builder.setSpout("word", new TestWordSpout(listOfValues), 3); 

這些值可作爲nextTuple()方法實例變量

請在Storm contrib上查看關於RDBMS/Kafka等設置的風暴集成

2.在getComponentConfiguration()中設置配置。這種方法被用來覆蓋拓撲的配置,然而,可以在幾細節通過如下

@Override 
    public Map<String, Object> getComponentConfiguration() { 
     Map<String, Object> ret = new HashMap<String, Object>(); 
     if(!_isDistributed) { 
      ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1); 
      return ret; 
     } else { 
      List<String> listOfValues=new ArrayList<String>(); 
      listOfValues.add("nathan"); 
      listOfValues.add("golda"); 
      listOfValues.add("mike"); 
      ret.put("listOfValues", listOfValues); 
     } 
     return ret; 
    }  

和配置的細節中分別噴/螺栓的open() or prepare()方法可用。

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 
     _collector = collector; 
     this.listOfValues=(List<String>)conf.get("listOfValues");   
     } 

3.聲明屬性文件中的配置並將它作爲提交到Storm羣集的jar文件的一部分進行jar。 Nimbus節點將jar文件複製到工作節點,並使其可用於執行程序線程。 open()/ prepare()方法可以讀取屬性文件並分配給實例變量。