2

使用kafka組件爲Apache Camel(版本2.19.1)發生錯誤,我只是試圖在主題中打印傳入消息,我的管道是這樣構成的:Camel-Kafka組件無法正常工作:「因爲經紀人必須配置」

... 
context.addRoutes(new RouteBuilder() { 
      public void configure() { 
       from("kafka://localhost:9092?topic=test&groupId=testing") 
       .to("stream:out"); 
    context.start(); 
    } 
} 

試圖與和端點,而無需 「//」。

我得到的是:

Exception in thread "main" org.apache.camel.FailedToCreateRouteException: Failed to create route route1: Route(route1)[[From[kafka://localhost:9092?topic=test&groupI... because of Brokers must be configured 
at org.apache.camel.impl.RouteService.warmUp(RouteService.java:147) 
at org.apache.camel.impl.DefaultCamelContext.doWarmUpRoutes(DefaultCamelContext.java:3762) 
at org.apache.camel.impl.DefaultCamelContext.safelyStartRouteServices(DefaultCamelContext.java:3669) 
at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRoutes(DefaultCamelContext.java:3455) 
at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:3309) 
at org.apache.camel.impl.DefaultCamelContext.access$000(DefaultCamelContext.java:202) 
at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3093) 
at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3089) 
at org.apache.camel.impl.DefaultCamelContext.doWithDefinedClassLoader(DefaultCamelContext.java:3112) 
at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:3089) 
at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61) 
at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:3026) 
at org.apache.camel.MainApp.main(MainApp.java:60) 
Caused by: java.lang.IllegalArgumentException: Brokers must be configured 
at org.apache.camel.component.kafka.KafkaConsumer.<init>(KafkaConsumer.java:62) 
at org.apache.camel.component.kafka.KafkaEndpoint.createConsumer(KafkaEndpoint.java:76) 
at org.apache.camel.impl.EventDrivenConsumerRoute.addServices(EventDrivenConsumerRoute.java:69) 
at org.apache.camel.impl.DefaultRoute.onStartingServices(DefaultRoute.java:103) 
at org.apache.camel.impl.RouteService.doWarmUp(RouteService.java:172) 
at org.apache.camel.impl.RouteService.warmUp(RouteService.java:145) 
... 12 more 

Process finished with exit code 1 

我想弄明白,但我真的不明白出了什麼問題,我的卡夫卡集羣是一個經紀人,一切都正常運行(動物園管理員和服務器),ty尋求幫助

回答

1

brokers=localhost:9092添加到消費者uri。

+0

現在我沒有收到任何錯誤,但消費者並沒有熬夜,它退出代碼0,但它應該保持直到明確的context.stop()等待傳入消息,爲什麼? – Giuseppe

+1

不,默認情況下(如果你有一個獨立的駱駝應用程序)它不應該。有保持CamelContext運行的解決方案,請點擊此處:http://camel.apache.org/running-camel-standalone-and-have-it-keep-running.html – mgyongyosi

+0

可以使用@mgyongyosi提到的Camel Main class – Gautam

0

看着this example該URL的第一部分是topic,然後作爲參數您可以通過brokers。所以官方文件似乎對我有點誤導。

from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}" 
        + "&maxPollRecords={{consumer.maxPollRecords}}" 
        + "&consumersCount={{consumer.consumersCount}}" 
        + "&seekTo={{consumer.seekTo}}" 
        + "&groupId={{consumer.group}}") 
        .routeId("FromKafka") 
       .log("${body}"); 

但作爲一般的建議是:駱駝open source,讓你隨時可以看看代碼和例子,在github上。您還可以找到您在那裏發佈的堆棧跟蹤的那些行,然後追蹤丟失的內容。

+0

與這種結構我不''有任何錯誤,但消費者並沒有熬夜等待消息,但立即退出代碼0 – Giuseppe

+0

你不需要一個民意調查的消費者? http://camel.apache.org/polling-consumer.html –

+0

我使用了RabbitMQ組件,並且我只需要終端信息,繼續保持並收聽傳入流,而我過去使用過的其他組件也一樣,它在組件本身中有一個默認的輪詢時間間隔,所以我認爲它不是必需的。 – Giuseppe

相關問題