我想運行WordCount示例與保證消息處理。WordCount與保證消息處理
有一個噴口
- WSpout - 發射具有MSGID隨機句子。
和兩個螺栓
SplitSentence - 在字劈裂句和與錨定
字計數發射 - 打印字計數。
我想用下面的代碼實現的是,當一個句子的所有單詞都會完成。必須確認與該句子對應的噴口。
我承認與_collector.ack(元組)最後螺栓WordCount只。我看到奇怪的是,儘管ack()正在調用WordCount.execute(),對應WSpout.ack()沒有被調用。它在默認超時後總是失敗。
我真的不明白代碼有什麼問題。請幫我理解這個問題。 任何幫助表示讚賞。
下面是完整的代碼。
public class TestTopology {
public static class WSpout implements IRichSpout {
SpoutOutputCollector _collector;
Integer msgID = 0;
@Override
public void nextTuple() {
Random _rand = new Random();
String[] sentences = new String[] { "There two things benefit",
" from Storms reliability capabilities",
"Specifying a link in the",
" tuple tree is " + "called anchoring",
" Anchoring is done at ",
"the same time you emit a " + "new tuple" };
String message = sentences[_rand.nextInt(sentences.length)];
_collector.emit(new Values(message), msgID);
System.out.println(msgID + " " + message);
msgID++;
}
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
System.out.println("open");
_collector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("LINE"));
}
@Override
public void ack(Object msgID) {
System.out.println("ack ------------------- " + msgID);
}
@Override
public void fail(Object msgID) {
System.out.println("fail ----------------- " + msgID);
}
@Override
public void activate() {
// TODO Auto-generated method stub
}
@Override
public void close() {
}
@Override
public void deactivate() {
// TODO Auto-generated method stub
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
public static class SplitSentence extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context,
OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for (String word : sentence.split(" ")) {
System.out.println(word);
_collector.emit(tuple, new Values(word));
}
//_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println("WordCount MSGID : " + tuple.getMessageId());
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null)
count = 0;
count++;
System.out.println(word + " ===> " + count);
counts.put(word, count);
collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new WSpout(), 2);
builder.setBolt("split", new SplitSentence(), 2).shuffleGrouping(
"spout");
builder.setBolt("count", new WordCount(), 2).fieldsGrouping("split",
new Fields("word"));
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(1);
StormSubmitter.submitTopology(args[0], conf,
builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
}
但是,您不是在SplitSentence和WordCount中的任何地方。在這個例子中,你不想錯過什麼嗎?此外,WordCount發佈了一個元組,但在拓撲結構中WordCount螺栓之後沒有任何東西。 – macias
嗨Micias,Code有最後一個螺栓WordCount,它基本上擴展了BaseBasicBolt類。每個touple將在execute()方法結束時自動得到確認。我的理解是正確的BaseBasicBolt .... – Sanjiv
這是真的。說實話我沒有意識到它,從來沒有使用BaseBasicBolt。但是,SplitSentence擴展了BaseRichBolt,而不是BaseBasicBolt ...查看我的答案。 – macias