10
我已經開始使用風暴,所以我創建使用this tutorial風暴脫粒機沒有得到確認
當我跑我的拓撲LocalCluster
和都顯得精緻簡單的拓撲結構, 我的問題是,我不是在得到ACK元組,這意味着我的噴口ack
從未被調用過。
我的代碼在下面 - 你知道爲什麼ack
沒有被調用嗎?
所以我的拓撲這個樣子的
public StormTopology build() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(HelloWorldSpout.class.getSimpleName(),
helloWorldSpout, spoutParallelism);
HelloWorldBolt bolt = new HelloWorldBolt();
builder.setBolt(HelloWorldBolt.class.getSimpleName(),
bolt, boltParallelism)
.shuffleGrouping(HelloWorldSpout.class.getSimpleName());
}
我的噴嘴像這樣
public class HelloWorldSpout extends BaseRichSpout implements ISpout {
private SpoutOutputCollector collector;
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("int"));
}
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}
private static Boolean flag = false;
public void nextTuple() {
Utils.sleep(5000);
//emit only 1 tuple - for testing
if (!flag){
this.collector.emit(new Values(6));
flag = true;
}
}
@Override
public void ack(Object msgId) {
System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}
public void fail(Object msgId){
System.out.println("[HelloWorldSpout] fail on msgId" + msgId);
}
}
和我的螺栓看起來像在這
@SuppressWarnings("serial")
public class HelloWorldBolt extends BaseRichBolt{
private OutputCollector collector;
public void prepare(Map conf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
logger.info("preparing HelloWorldBolt");
}
public void execute(Tuple tuple) {
System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0));
this.collector.ack(tuple);
}
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
}
+1我真的很喜歡在洗牌分組「()HelloWorldSpout.class.getSimpleName」你的格局。我不明白爲什麼這麼多java apis依賴於魔術字符串和魔術數字(而不是枚舉),但是你的模式是一個不被燒燬的好方法。 –