2014-04-16 11 views
2

我想運行WordCount示例與保證消息處理。WordCount與保證消息處理

有一個噴口

  1. WSpout - 發射具有MSGID隨機句子。

和兩個螺栓

  1. SplitSentence - 在字劈裂句和與錨定

  2. 字計數發射 - 打印字計數。

我想用下面的代碼實現的是,當一個句子的所有單詞都會完成。必須確認與該句子對應的噴口。

我承認與_collector.ack(元組)最後螺栓WordCount只。我看到奇怪的是,儘管ack()正在調用WordCount.execute(),對應WSpout.ack()沒有被調用。它在默認超時後總是失敗。

我真的不明白代碼有什麼問題。請幫我理解這個問題。 任何幫助表示讚賞。

下面是完整的代碼。

public class TestTopology { 

    public static class WSpout implements IRichSpout { 
     SpoutOutputCollector _collector; 
    Integer msgID = 0; 
    @Override 
    public void nextTuple() { 
     Random _rand = new Random(); 
     String[] sentences = new String[] { "There two things benefit", 
       " from Storms reliability capabilities", 
       "Specifying a link in the", 
       " tuple tree is " + "called anchoring", 
       " Anchoring is done at ", 
       "the same time you emit a " + "new tuple" }; 

     String message = sentences[_rand.nextInt(sentences.length)]; 
     _collector.emit(new Values(message), msgID); 
     System.out.println(msgID + " " + message); 

     msgID++; 
    } 
    @Override 
    public void open(Map conf, TopologyContext context, 
      SpoutOutputCollector collector) { 
     System.out.println("open"); 
     _collector = collector; 
    } 
    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("LINE")); 
    } 
    @Override 
    public void ack(Object msgID) { 
     System.out.println("ack ------------------- " + msgID); 

    } 
    @Override 
    public void fail(Object msgID) { 
     System.out.println("fail ----------------- " + msgID); 

    } 
    @Override 
    public void activate() { 
     // TODO Auto-generated method stub 
    } 
    @Override 
    public void close() { 

    } 
    @Override 
    public void deactivate() { 
     // TODO Auto-generated method stub 
    } 
    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     // TODO Auto-generated method stub 
     return null; 
    } 
} 

public static class SplitSentence extends BaseRichBolt { 
    OutputCollector _collector; 
    public void prepare(Map conf, TopologyContext context, 
      OutputCollector collector) { 
     _collector = collector; 
    } 

    public void execute(Tuple tuple) { 
     String sentence = tuple.getString(0); 
     for (String word : sentence.split(" ")) { 
      System.out.println(word); 
      _collector.emit(tuple, new Values(word)); 
     } 
     //_collector.ack(tuple); 
    } 

    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("word")); 
    } 
} 

public static class WordCount extends BaseBasicBolt { 
    Map<String, Integer> counts = new HashMap<String, Integer>(); 

    @Override 
    public void execute(Tuple tuple, BasicOutputCollector collector) { 
     System.out.println("WordCount MSGID : " + tuple.getMessageId()); 
     String word = tuple.getString(0); 
     Integer count = counts.get(word); 
     if (count == null) 
      count = 0; 
     count++; 
     System.out.println(word + " ===> " + count); 
     counts.put(word, count); 
     collector.emit(new Values(word, count)); 
    } 
    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("word", "count")); 
    } 

} 

public static void main(String[] args) throws Exception { 
    TopologyBuilder builder = new TopologyBuilder(); 
    builder.setSpout("spout", new WSpout(), 2); 
    builder.setBolt("split", new SplitSentence(), 2).shuffleGrouping(
      "spout"); 
    builder.setBolt("count", new WordCount(), 2).fieldsGrouping("split", 
      new Fields("word")); 
    Config conf = new Config(); 
    conf.setDebug(true); 

    if (args != null && args.length > 0) { 
     conf.setNumWorkers(1); 
     StormSubmitter.submitTopology(args[0], conf, 
       builder.createTopology()); 
    } else { 
     conf.setMaxTaskParallelism(3); 

     LocalCluster cluster = new LocalCluster(); 
     cluster.submitTopology("word-count", conf, builder.createTopology()); 
     Thread.sleep(10000); 
     cluster.shutdown(); 
    } 
} 
} 
+0

但是,您不是在SplitSentence和WordCount中的任何地方。在這個例子中,你不想錯過什麼嗎?此外,WordCount發佈了一個元組,但在拓撲結構中WordCount螺栓之後沒有任何東西。 – macias

+0

嗨Micias,Code有最後一個螺栓WordCount,它基本上擴展了BaseBasicBolt類。每個touple將在execute()方法結束時自動得到確認。我的理解是正確的BaseBasicBolt .... – Sanjiv

+0

這是真的。說實話我沒有意識到它,從來沒有使用BaseBasicBolt。但是,SplitSentence擴展了BaseRichBolt,而不是BaseBasicBolt ...查看我的答案。 – macias

回答

1

字計數延伸BaseBasicBolt保證了元組在螺栓自動ACKED,就像你在您的評論說。但是,SplitSentence擴展了BaseRichBolt,它要求您手動確認元組。你不是acking,所以元組超時。

+0

非常感謝...它的工作。正如我在問題陳述中所說的,我最後只是在確認WordCount。我曾經想過,如果你正在錨定每一層,那麼只需確認生成的所有元組樹就可以在最後一層進行確認。添加_ack spliterBolt解決了這個問題。 – Sanjiv

+1

...是的,錨定並不意味着所有錨定的元組都立即被佔用。這意味着所有錨定元組都會在其中一個失敗時重複。 – macias