2017-05-17 63 views
0

我開始一個駱駝卡夫卡消費者,它從主題獲取所有數據並移動到哈希映射。罐子版本駱駝卡夫卡是2.18.1駱駝卡夫卡動物園管理員例外

的cxfbeans.xml文件如下:

<route id="Test1" streamCache="true"> 
     <from uri="file:C:/data" /> 
     <split streaming="true"> 
      <tokenize token="\n" /> 
      <to uri="bean:proc1" /> 
      <to 
       uri="kafka:localhost:9092?topic=Checking&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;serializerClass=kafka.serializer.StringEncoder" /> 
     </split> 
    </route> 

如果我部署戰我收到以下異常:

原因:org.apache.camel.ResolveEndpointFailedException:無法解析端點:kafka:// localhost:9092?serializerClass = kafka.serializer.StringEncoder & topic =正在檢查& zookeeperHost = localhost & zookeeperPort = 2181由於:有2個參數無法在端點上設置。如果參數拼寫正確,並且它們是端點的屬性,請檢查uri。未知的參數= [{zookeeperHost = localhost,zookeeperPort = 2181}]

我嘗試通過刪除端口和zookeeper的主機,它被部署,但消費者沒有消耗和放置在處理器中。

任何人都可以幫助我解決這個問題。我降低了版本,但我需要在to標籤中指定反序列化程序類。

回答

1

使用camel-kafka時,< = 2.16和> = 2.17版本存在差異。

你使用的是適合2.16。

For> = 2.17,參見http://camel.apache.org/kafka.html 2.17或更新的部分。

有有一個例子:

from("direct:start").process(new Processor() { 
        @Override 
        public void process(Exchange exchange) throws Exception { 
         exchange.getIn().setBody("Test Message from Camel Kafka Component Final",String.class); 
         exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0); 
         exchange.getIn().setHeader(KafkaConstants.KEY, "1"); 
        } 
       }).to("kafka:localhost:9092?topic=test"); 
相關問題