2013-06-27 76 views
5

我正在使用KafkaSpout。請在下面找到測試程序。使用Kafka Spout的卡夫卡風暴整合

我正在使用Storm 0.8.1。 Multischeme類在Storm 0.8.2中出現。我將使用它。我只是想知道早期版本是如何通過實例化StringScheme()類來工作的?我在哪裏可以下載Kafka Spout的早期版本?但是我懷疑這會是一個正確的選擇,而不是Storm 0.8.2的工作。 ??? (困惑)

當我運行風暴集羣(即當我推我的拓撲)的代碼(如下所示)時,我得到以下錯誤(這發生在Scheme部分被評論否則當然我會得到編譯器錯誤爲類是不存在於0.8.1):

java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme 
     at storm.kafka.TestTopology.main(TestTopology.java:37) 
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme 

在下面給出你可能會發現spoutConfig.scheme =新StringScheme()的代碼;部分評論。我得到編譯器錯誤,如果我不評論該行,這是很自然的,因爲那裏沒有構造函數。另外當我實例化MultiScheme時,我得到錯誤,因爲我在0.8.1中沒有那個類。

public class TestTopology { 
    public static class PrinterBolt extends BaseBasicBolt { 
     public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     } 

     public void execute(Tuple tuple, BasicOutputCollector collector) { 
      System.out.println(tuple.toString()); 
     } 
    } 

    public static void main(String [] args) throws Exception { 
     List<HostPort> hosts = new ArrayList<HostPort>(); 
     hosts.add(new HostPort("127.0.0.1",9092)); 
     LocalCluster cluster = new LocalCluster(); 
     TopologyBuilder builder = new TopologyBuilder(); 
     SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID"); 
     spoutConfig.zkServers=ImmutableList.of("localhost"); 
     spoutConfig.zkPort=2181; 
     //spoutConfig.scheme=new StringScheme(); 
     spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
     builder.setSpout("spout",new KafkaSpout(spoutConfig)); 
     builder.setBolt("printer", new PrinterBolt()) 
       .shuffleGrouping("spout"); 
     Config config = new Config(); 

     cluster.submitTopology("kafka-test", config, builder.createTopology()); 

     Thread.sleep(600000); 
    } 
+0

我猜我不明白這個問題:如果你使用0.8.2,它是否正常工作?如果是這樣,爲什麼甚至試圖在0.8.1中運行:0.8.2將其替換爲一些錯誤修復和其他改進。 –

回答

8

我有同樣的問題。最後解決它,我把完整的運行示例放在github上。

歡迎您來看看這裏> https://github.com/buildlackey/cep

(點擊的示例程序,應該讓你和運行風暴+卡夫卡目錄)。

+8

考慮在答案中加入一兩個句子來描述你所做的事情,以便你的答案是相關的,而不依賴於該Git存儲庫處於活動狀態。 – neontapir

+0

當然:該項目包含單元測試和示例程序,說明如何在Storm,Kafka和Esper上開發複雜事件處理(CEP)應用程序。 –

+0

對我來說聽起來不錯 –

5

我們有類似的問題。

我們的解決方案:

  1. 開放的pom.xml

  2. 變化範圍從如果你想知道更多關於依賴性範圍檢查Maven實況提供給<scope>compile</scope>

Maven docu - dependency scopes