1
如何處理春季集成中未能生成到kafka的消息?春季集成kafka出站適配器錯誤句柄
我沒有看到'error-channel'是'int-kafka:outbound-channel-adapter'中的一個選項,想知道我應該在哪裏添加錯誤通道信息,以便我的ErrorHandler可以「生成失敗以kafka「類型的錯誤。 (包括所有類型的故障,配置,網絡等)
此外,inputToKafka是隊列通道,我應該在哪裏添加錯誤通道來處理潛在的隊列滿錯誤?
<int:gateway id="myGateway"
service-interface="someGateway"
default-request-channel="transformChannel"
error-channel="errorChannel"
default-reply-channel="replyChannel"
async-executor="MyThreadPoolTaskExecutor"/>
<int:transformer id="transformer" input-channel="transformChannel" method="transform" output-channel="inputToKafka">
<bean class="Transformer"/>
</int:transformer>
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-template="template"
auto-startup="false"
channel="inputToKafka"
topic="foo"
message-key-expression="'bar'"
partition-id-expression="2">
<int:poller fixed-delay="200" time-unit="MILLISECONDS" receive-timeout="0"
task-executor="kafkaExecutor"/>
</int-kafka:outbound-channel-adapter>
<bean id="kafkaExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
....
</bean>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
...
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
<int:service-activator input-channel='errorChannel' output-channel="replyChannel" method='process'>
<bean class="ErrorHandler"/>
</int:service-activator>
編輯
<property name="producerListener">
<bean id="producerListener" class="org.springframework.kafka.support.ProducerListenerAdapter"/>
</property>
我應該簡單地將「編輯」部分添加到我的KafkaTemplate bean中嗎? (請參閱原始問題中的編輯。) – edi
您需要對適配器進行子類化並在'onError()'(或兩者)中執行一些操作。如果你實現'onSuccessI()',你必須重寫'isInterestedInSuccess()'爲true。默認監聽器(LoggingProducerListener)只記錄錯誤。 –
我看到的onError返回void,如何確保郵件去我的ErrorHandler在我原來的問題?或者任何其他方式來實現另一個errorHandler,它將信息發送回replyChannel – edi