2015-12-15 81 views
1

我正在使用Apache Camel從kafka主題使用消息,然後處理消息,在處理髮生異常的情況下,我將該消息重定向到另一個kafka主題,並在單獨的路線。所以我有一個類似於下面的路線。從kafka到錯誤的另一個kafka主題的Apache駱駝路由

from(「kafka1」)。process(「someProcessor」)。end(); (交換 - > {exchange.getIn()。setBody(「Message with error details」)})。(「kafka2」);

上面的代碼實際上是在相同的kafka(kafka1)中發送錯誤消息。

我通過在onException進程中設置exchange.getIn()。setHeader(KafkaConstants.TOPIC,「kafka2」))來解決這個問題。這是預期的行爲?爲什麼它會忽略kafka2並改用kafka1?駱駝的

1)版本使用 - 2.14.0

2)卡夫卡端點網址 -

消費者爲

from("kafka:" + ("kafka.broker") + "?topic=" 
      + ("offer.kafka.topic") 
      + "&zookeeperHost=" + ("kafka.zookeeper.host") 
      + "&zookeeperPort=" + ("kafka.zookeeper.port") 
      + "&groupId=" + ("offer.kafka.group.id") 
      + "&consumerStreams=" + ("kafka.streams") 
      + "&autoCommitIntervalMs=" + ("product.kafka.consumer.auto.commit.intervals") 
      + "&zookeeperConnectionTimeoutMs=" + ("zookeeper.connection.timeout") 
      + "&rebalanceMaxRetries=" + ("kafka.rebalance.max.retries") 
      + "&rebalanceBackoffMs=" + ("kafka.rebalance.backoffs.ms") 
      + "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout") 
      + "&autoOffsetReset=" + ("kafka.auto.offset.reset") 
      + "&fetchMessageMaxBytes=" + ("kafka.fetch.message.max.bytes") 
      + "&socketReceiveBufferBytes=" + ("receive.buffer.bytes")) 
      .routeId("offerEventRoute").to("direct:offerEventRoute"); 

生產者 -

to("kafka:" + ("error.kafka.broker") + "?topic=" 
         + ("error.kafka.topic") 
         + "&zookeeperHost=" + ("error.kafka.zookeeper.host") 
         + "&zookeeperPort=" + ("error.kafka.zookeeper.port") 
         + "&groupId=" + ("error.kafka.group.id") 
         + "&zookeeperConnectionTimeoutMs=" + ("error.zookeeper.connection.timeout") 
         + "&rebalanceMaxRetries=" + ("rebalance.max.retries") 
         + "&rebalanceBackoffMs=" + ("rebalance.backoffs.ms") 
         + "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout") 
         + "&autoOffsetReset=" + ("auto.offset.reset") 
         + "&messageSendMaxRetries=" + ("error.max.retries") 
         + "&serializerClass=kafka.serializer.StringEncoder" 
     ); 

回答

0

能否請您提供有關代碼的更多細節,如

1)使用的駱駝版本

2)您的Kafka端點URL。

以任何機會,你在你的端點URL使用「bridgeEndpoint」屬性..

+0

謝謝Himanshu,我更新了描述中的所有細節 – Amit

1

您需要設置bridgeEndPoint到真正在你的生產卡夫卡終點。否則,它會在交換標題中查找主題名稱,並將其用作生產者的主題名稱。

默認情況下它是假的。