我想寫一個拓撲執行以下操作:在一個簡單的聚集風暴拓撲分組
- 訂閱了一個Twitter的飼料的管口(基於關鍵字)
- 聚集螺栓是聚集了許多鳴叫的集合中(比如N),並將它們發送打印機螺栓
- 一個簡單的螺栓,在打印一次收集到控制檯。
在現實中,我想要做的一些收集更多的處理。
我本地測試它,看起來像它的工作。但是,我不確定是否正確設置了螺栓上的分組,以及在實際風暴集羣上部署時是否可以正常工作。我很感激,如果有人可以幫助審查這種拓撲結構,並建議任何錯誤,更改或改進。
謝謝。
這是我的拓撲結構是什麼樣子。
builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
.shuffleGrouping("spout");
builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate");
聚集博爾特
public class SampleAggregatorBolt implements IRichBolt {
protected OutputCollector collector;
protected Tuple currentTuple;
protected Logger log;
/**
* Holds the messages in the bolt till you are ready to send them out
*/
protected List<Status> statusCache;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
log = Logger.getLogger(getClass().getName());
statusCache = new ArrayList<Status>();
}
@Override
public void execute(Tuple tuple) {
currentTuple = tuple;
Status currentStatus = null;
try {
currentStatus = (Status) tuple.getValue(0);
} catch (ClassCastException e) {
}
if (currentStatus != null) {
//add it to the status cache
statusCache.add(currentStatus);
collector.ack(tuple);
//check the size of the status cache and pass it to the next stage if you have enough messages to emit
if (statusCache.size() > 10) {
collector.emit(new Values(statusCache));
}
}
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tweets"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null; //To change body of implemented methods use File | Settings | File Templates.
}
protected void setupNonSerializableAttributes() {
}
}
打印機博爾特
public class PrinterBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple.size() + " " + tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
}
}
我已經提供了上述聚集器螺栓的代碼(請參閱execute方法)。現在它正在等待,直到它累積了N個(在上面的例子中爲10個)消息並且一旦它有10個消息就將它們分開。順便說一句,我剛剛發現了一個我會修復的錯誤。一旦我發佈值,我需要清除緩存。因此,如果我需要使用多個聚合器,那麼需要進行哪些更改。 –