我已經使用了春季io文檔中列出的示例配置,它工作正常。春季集成 - 卡夫卡消息驅動通道 - 自動確認
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
channel="nullChannel"
message-converter="messageConverter"
error-channel="errorChannel" />
但是,當我測試它與下游應用程序,我從卡夫卡消耗併發布到下游。如果下游關閉,則消息仍然消耗並且不會重播。
或者讓我們說從kafka話題中消費後,如果我在服務激活器中發現一些異常,我想拋出一些異常以及應該回滾事務,以便可以重放kafka消息。
簡而言之,如果消費應用程序有問題,那麼我想要回滾事務,以便消息不會自動確認,並且一次又一次地重播回來,除非它被成功處理。