2014-01-21 17 views
10

我已經開始使用風暴,所以我創建使用this tutorial風暴脫粒機沒有得到確認

當我跑我的拓撲LocalCluster和都顯得精緻簡單的拓撲結構, 我的問題是,我不是在得到ACK元組,這意味着我的噴口ack從未被調用過。

我的代碼在下面 - 你知道爲什麼ack沒有被調用嗎?

所以我的拓撲這個樣子的

public StormTopology build() { 
     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout(HelloWorldSpout.class.getSimpleName(), 
      helloWorldSpout, spoutParallelism); 

     HelloWorldBolt bolt = new HelloWorldBolt(); 

     builder.setBolt(HelloWorldBolt.class.getSimpleName(), 
        bolt, boltParallelism) 
       .shuffleGrouping(HelloWorldSpout.class.getSimpleName()); 
} 

我的噴嘴像這樣

public class HelloWorldSpout extends BaseRichSpout implements ISpout { 
    private SpoutOutputCollector collector; 

    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("int")); 
    } 

    public void open(Map conf, TopologyContext context, 
      SpoutOutputCollector collector) { 
     this.collector = collector; 
    } 

    private static Boolean flag = false; 
    public void nextTuple() { 
     Utils.sleep(5000); 

      //emit only 1 tuple - for testing 
     if (!flag){ 
      this.collector.emit(new Values(6)); 
      flag = true; 
     } 
    } 

    @Override 
    public void ack(Object msgId) { 
     System.out.println("[HelloWorldSpout] ack on msgId" + msgId); 
    } 

    public void fail(Object msgId){ 
     System.out.println("[HelloWorldSpout] fail on msgId" + msgId); 
    } 
} 

和我的螺栓看起來像在這

@SuppressWarnings("serial") 
public class HelloWorldBolt extends BaseRichBolt{ 
    private OutputCollector collector; 

    public void prepare(Map conf, TopologyContext context, 
        OutputCollector collector) { 
     this.collector = collector; 
     logger.info("preparing HelloWorldBolt"); 
    } 

    public void execute(Tuple tuple) { 
     System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0)); 
     this.collector.ack(tuple); 
    } 

    public void cleanup() { 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     // TODO Auto-generated method stub 

    } 
} 
+2

+1我真的很喜歡在洗牌分組「()HelloWorldSpout.class.getSimpleName」你的格局。我不明白爲什麼這麼多java apis依賴於魔術字符串和魔術數字(而不是枚舉),但是你的模式是一個不被燒燬的好方法。 –

回答

15

你EMIT()方法噴口只有一個參數,所以元組不會被錨定。這就是爲什麼即使你正在使用螺栓中的元組,也不會回調ack()方法。

爲了使這個工作,你需要修改你的噴口發出第二個參數,這是消息ID。正是這種ID則傳遞迴ACK()方法在噴口:

public void nextTuple() { 
    Utils.sleep(5000); 

     //emit only 1 tuple - for testing 
    if (!flag){ 
     Object msgId = "ID 6"; // this can be any object 
     this.collector.emit(new Values(6), msgId); 
     flag = true; 
    } 
} 


@Override 
public void ack(Object msgId) { 
    // msgId should be "ID 6" 
    System.out.println("[HelloWorldSpout] ack on msgId" + msgId); 
} 
+0

工作正常! 10倍 – Mzf