我正在使用KafkaSpout。請在下面找到測試程序。使用Kafka Spout的卡夫卡風暴整合
我正在使用Storm 0.8.1。 Multischeme類在Storm 0.8.2中出現。我將使用它。我只是想知道早期版本是如何通過實例化StringScheme()類來工作的?我在哪裏可以下載Kafka Spout的早期版本?但是我懷疑這會是一個正確的選擇,而不是Storm 0.8.2的工作。 ??? (困惑)
當我運行風暴集羣(即當我推我的拓撲)的代碼(如下所示)時,我得到以下錯誤(這發生在Scheme部分被評論否則當然我會得到編譯器錯誤爲類是不存在於0.8.1):
java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme
at storm.kafka.TestTopology.main(TestTopology.java:37)
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme
在下面給出你可能會發現spoutConfig.scheme =新StringScheme()的代碼;部分評論。我得到編譯器錯誤,如果我不評論該行,這是很自然的,因爲那裏沒有構造函數。另外當我實例化MultiScheme時,我得到錯誤,因爲我在0.8.1中沒有那個類。
public class TestTopology {
public static class PrinterBolt extends BaseBasicBolt {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple.toString());
}
}
public static void main(String [] args) throws Exception {
List<HostPort> hosts = new ArrayList<HostPort>();
hosts.add(new HostPort("127.0.0.1",9092));
LocalCluster cluster = new LocalCluster();
TopologyBuilder builder = new TopologyBuilder();
SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID");
spoutConfig.zkServers=ImmutableList.of("localhost");
spoutConfig.zkPort=2181;
//spoutConfig.scheme=new StringScheme();
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
builder.setSpout("spout",new KafkaSpout(spoutConfig));
builder.setBolt("printer", new PrinterBolt())
.shuffleGrouping("spout");
Config config = new Config();
cluster.submitTopology("kafka-test", config, builder.createTopology());
Thread.sleep(600000);
}
我猜我不明白這個問題:如果你使用0.8.2,它是否正常工作?如果是這樣,爲什麼甚至試圖在0.8.1中運行:0.8.2將其替換爲一些錯誤修復和其他改進。 –