2016-10-17 54 views
1

如何處理春季集成中未能生成到kafka的消息?春季集成kafka出站適配器錯誤句柄

我沒有看到'error-channel'是'int-kafka:outbound-channel-adapter'中的一個選項,想知道我應該在哪裏添加錯誤通道信息,以便我的ErrorHandler可以「生成失敗以kafka「類型的錯誤。 (包括所有類型的故障,配置,網絡等)

此外,inputToKafka是隊列通道,我應該在哪裏添加錯誤通道來處理潛在的隊列滿錯誤?

<int:gateway id="myGateway" 
      service-interface="someGateway" 
      default-request-channel="transformChannel" 
      error-channel="errorChannel" 
      default-reply-channel="replyChannel" 
      async-executor="MyThreadPoolTaskExecutor"/> 

<int:transformer id="transformer" input-channel="transformChannel" method="transform" output-channel="inputToKafka"> 
    <bean class="Transformer"/> 
</int:transformer> 

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" 
            kafka-template="template" 
            auto-startup="false" 
            channel="inputToKafka" 
            topic="foo" 
            message-key-expression="'bar'" 
            partition-id-expression="2"> 
    <int:poller fixed-delay="200" time-unit="MILLISECONDS" receive-timeout="0" 
        task-executor="kafkaExecutor"/> 
</int-kafka:outbound-channel-adapter> 

<bean id="kafkaExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
    .... 
</bean> 

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate"> 
    <constructor-arg> 
     <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> 
      <constructor-arg> 
       <map> 
        <entry key="bootstrap.servers" value="localhost:9092" /> 
        ... 
       </map> 
      </constructor-arg> 
     </bean> 
    </constructor-arg> 
</bean> 

<int:service-activator input-channel='errorChannel' output-channel="replyChannel" method='process'> 
    <bean class="ErrorHandler"/> 
</int:service-activator> 

編輯

<property name="producerListener"> 
    <bean id="producerListener" class="org.springframework.kafka.support.ProducerListenerAdapter"/> 
</property> 

回答

1

下游流動的任何錯誤都會被髮送到error-channel網關上。但是,由於默認情況下kafka是異步的,因此您不會以這種方式獲取任何錯誤。您可以在出站適配器上設置sync=true,然後在出現問題時拋出異常。但是請記住,它會慢很多。

您可以通過在KafkaTemplate上添加ProducerListener來獲得異步例外。

+0

我應該簡單地將「編輯」部分添加到我的KafkaTemplate bean中嗎? (請參閱原始問題中的編輯。) – edi

+0

您需要對適配器進行子類化並在'onError()'(或兩者)中執行一些操作。如果你實現'onSuccessI()',你必須重寫'isInterestedInSuccess()'爲true。默認監聽器(LoggingProducerListener)只記錄錯誤。 –

+0

我看到的onError返回void,如何確保郵件去我的ErrorHandler在我原來的問題?或者任何其他方式來實現另一個errorHandler,它將信息發送回replyChannel – edi

相關問題