2016-11-05 77 views
1

如何捕捉寫入卡夫卡主題的錯誤?春雲流發送給卡夫卡錯誤控制處理

import org.springframework.cloud.stream.annotation.EnableBinding; 
import org.springframework.cloud.stream.annotation.StreamListener; 
import org.springframework.cloud.stream.messaging.Processor; 
import org.springframework.messaging.Message; 
import org.springframework.messaging.handler.annotation.SendTo; 
import org.springframework.stereotype.Component; 

import javax.xml.bind.JAXBException; 

@Component 
@EnableBinding(Processor.class) 
public class Parser { 

    @StreamListener(Processor.INPUT) 
    @SendTo(Processor.OUTPUT) 
    public String process(Message<?> input) { 

     // do smth 

     // how to catch an error if sending message to 'output' topic fails 
    } 

} 

將生產者切換到同步模式。

spring.cloud.stream.kafka.bindings.output.producer.sync=true 

下一步是什麼?任何例子?擴展一些綁定impl,添加一些AOP魔法?

回答

0

我想你需要的是註冊一個KafkaProducerListener,它處理你的情況下的錯誤情況,並使它在應用程序中作爲bean可用。

有關KafkaProducerListener更多信息是here

另外,還要注意平時的失敗的信息將可在errorChannel具有頻道名稱error。一旦您爲錯誤通道配置了destination名稱,即所有錯誤消息都會發布:spring.cloud.stream.bindings.error.destination

+0

感謝您的輸入。我真的希望我的目標能夠通過Spring Cloud Stream實現。 – user3665549

+0

您是否找到使用Spring雲流的解決方案?關於錯誤處理的文檔非常少,令人驚訝的是,我在我的配置文件中設置了目標錯誤通道,但沒有發佈錯誤信息。 –

+0

嗨,這是在這裏解決:https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/125 –