我正在使用spring-cloud-stream:1.3.0.RELEASE,spring-cloud-stream-binder-kafka:1.3.0.RELEASE進行spring引導應用程序的工作。我使用spring integration dsl來分割文件和beanio中的行,將行轉換爲json,要求將成功的json消息寫入kafka主題,並將錯誤消息寫入不同的kafka主題。以下是application.yml中的配置。使用spring雲流向錯誤通道發送錯誤消息
spring:
cloud:
stream:
kafka:
binder:
autoAddPartitions: true
bindings.webmarketbasket:
destination: webmarketbasket
group: usproductrecommendationsgroup
producer:
partitionCount: 5
errorChannelEnabled: true
bindings.webmarketbasket.errors:
destination: webmarketbasketerrors
group: usproductrecommendationsgroup
producer:
partitionCount: 5
bindings.error:
destination: errorchannel
group: usproductrecommendationsgroup
producer:
partitionCount: 5
我在春天的雲流 - 粘合劑 - 卡夫卡注意到pull請求https://github.com/spring-cloud/spring-cloud-stream/pull/1039:1.3.0.RELEASE,當errorChannelEnabled設置爲true它創建PublishSubscribeChannel,還存在哪些檢查是否豆是一個測試用例爲製作者錯誤頻道創建。
當我在我的應用程序http://localhost:8195/beans中檢查彈簧執行器url時,會創建「errorChannel」全局錯誤通道bean,但不會創建「webmarketbasket.errors」bean。當出現「org.springframework.messaging.MessageHandlingException」時,錯誤消息被髮送到「errorchannel」kafka主題並停止處理文件中剩餘的行。卡夫卡話題「webmarketbasketerrors」從未被創造。你能幫忙,請讓我知道我是否遺漏了任何東西。
謝謝加里。我試着用「webmarketbasket.usproductrecommendationsgroup.errors」。但是仍然沒有創建專用錯誤通道的bean名稱。它看起來像組僅添加到了消費者而不是AbstractMessageChannelBinder.java中的生產者。 – mariappan
我通過'@Output(「webmarketbasket.errors」)爲專用錯誤通道添加了一個子通道 \t SubscribableChannel webMarketBasketErrorChannel();',bean名稱「webmarketbasket.errors」被創建並創建了kafka主題。但錯誤消息不會發送到「webmarketbasket.errors」主題,錯誤消息只發送到全局錯誤通道。 – mariappan
這不會幫助;框架中沒有任何內容知道該頻道;請重新閱讀我的答案。 –