2013-06-04 72 views
10

我想寫一個拓撲執行以下操作:在一個簡單的聚集風暴拓撲分組

  1. 訂閱了一個Twitter的飼料的管口(基於關鍵字)
  2. 聚集螺栓是聚集了許多鳴叫的集合中(比如N),並將它們發送打印機螺栓
  3. 一個簡單的螺栓,在打印一次收集到控制檯。

在現實中,我想要做的一些收集更多的處理。

我本地測試它,看起來像它的工作。但是,我不確定是否正確設置了螺栓上的分組,以及在實際風暴集羣上部署時是否可以正常工作。我很感激,如果有人可以幫助審查這種拓撲結構,並建議任何錯誤,更改或改進。

謝謝。

這是我的拓撲結構是什麼樣子。

builder.setSpout("spout", new TwitterFilterSpout("pittsburgh")); 
    builder.setBolt("sampleaggregate", new SampleAggregatorBolt()) 
       .shuffleGrouping("spout"); 
    builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate"); 

聚集博爾特

public class SampleAggregatorBolt implements IRichBolt { 

    protected OutputCollector collector; 
    protected Tuple currentTuple; 
    protected Logger log; 
    /** 
    * Holds the messages in the bolt till you are ready to send them out 
    */ 
    protected List<Status> statusCache; 

    @Override 
    public void prepare(Map stormConf, TopologyContext context, 
         OutputCollector collector) { 
     this.collector = collector; 

     log = Logger.getLogger(getClass().getName()); 
     statusCache = new ArrayList<Status>(); 
    } 

    @Override 
    public void execute(Tuple tuple) { 
     currentTuple = tuple; 

     Status currentStatus = null; 
     try { 
      currentStatus = (Status) tuple.getValue(0); 
     } catch (ClassCastException e) { 
     } 
     if (currentStatus != null) { 

      //add it to the status cache 
      statusCache.add(currentStatus); 
      collector.ack(tuple); 


      //check the size of the status cache and pass it to the next stage if you have enough messages to emit 
      if (statusCache.size() > 10) { 
       collector.emit(new Values(statusCache)); 
      } 

     } 
    } 

    @Override 
    public void cleanup() { 


    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("tweets")); 

    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     return null; //To change body of implemented methods use File | Settings | File Templates. 
    } 


    protected void setupNonSerializableAttributes() { 

    } 

} 

打印機博爾特

public class PrinterBolt extends BaseBasicBolt { 

    @Override 
    public void execute(Tuple tuple, BasicOutputCollector collector) { 
     System.out.println(tuple.size() + " " + tuple); 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer ofd) { 
    } 

} 

回答

4

從我可以看到它看起來不錯。雖然,魔鬼的細節。我不確定你的聚合器螺栓是做什麼的,但是如果它對傳遞給它的值做出任何假設,那麼你應該考慮適當的字段分組。當你使用默認的並行性提示1時,這可能沒有什麼太大的區別,但是如果你決定使用多個聚集實例進行擴展,你所做的隱式邏輯假設可能需要一個非隨機分組。

+0

我已經提供了上述聚集器螺栓的代碼(請參閱execute方法)。現在它正在等待,直到它累積了N個(在上面的例子中爲10個)消息並且一旦它有10個消息就將它們分開。順便說一句,我剛剛發現了一個我會修復的錯誤。一旦我發佈值,我需要清除緩存。因此,如果我需要使用多個聚合器,那麼需要進行哪些更改。 –

0

嗨,只要你想訂閱你會遇到的問題不止一個關鍵字。我建議你的噴口也發出用來過濾的原始關鍵字。

然後,而不是做shuffleGrouping我會做一個fieldsGrouping

builder.setBolt("sampleaggregate", new SampleAggregatorBolt()) 
      .shuffleGrouping("spout", new Fields("keyword")); 

這樣,你要確保一個關鍵字的結果落得同一螺栓每次。這樣你可以正確地計算聚合。如果您省略字段組Storm可以實例化任何數量的聚合螺栓,並將任何消息從噴口發送到聚合螺栓的任何實例,最終會出現錯誤的結果。