2017-09-12 148 views
0

我們有一個生產者&一個消費者&一個分區。消費者/生產者都是彈簧啓動應用程序。消費者應用在我的本地機器上運行,而製作者與遠程機器上的kafka &動物園管理員一起運行。kafka生產者/消費者重新啓動後,消費者沒有收到消息

在開發過程中,我重新部署了生產者應用程序並進行了一些更改。但在此之後,我的消費者沒有收到任何消息。我嘗試重新啓動消費者,但沒有運氣。什麼是問題和/或如何解決?

消費者配置:

spring: 
    cloud: 
    stream: 
     defaultBinder: kafka 
     bindings: 
     input: 
      destination: sales 
      content-type: application/json 
     kafka: 
     binder: 
      brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1} 
      zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1} 
      defaultZkPort: 2181 
      defaultBrokerPort: 9092 
server: 
    port: 0 

製片配置

cloud: 
stream: 
    defaultBinder: kafka 
    bindings: 
    output: 
     destination: sales 
     content-type: application/json 
    kafka: 
    binder: 
     brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1} 
     zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1} 
     defaultZkPort: 2181 
     defaultBrokerPort: 9092 

EDIT2

5分鐘後,消費者應用具有以下異常死亡:

2017-09-12 18:14:47,254 ERROR main o.s.c.s.b.k.p.KafkaTopicProvisioner:253 - Cannot initialize Binder 
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata 
2017-09-12 18:14:47,255 WARN main o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext:550 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder: 
2017-09-12 18:14:47,256 INFO main o.s.i.m.IntegrationMBeanExporter:449 - Unregistering JMX-exposed beans on shutdown 
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:241 - Unregistering JMX-exposed beans 
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: input 
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: nullChannel 
2017-09-12 18:14:47,258 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: errorChannel 
+0

聽起來很簡單。你介意在GitHub的某個地方分享這個應用程序,這樣我們就可以在本地重現問題了嗎? –

+0

@ArtemBilan我很抱歉,但我無法分享我的代碼。你需要什麼細節來提出解決方案? – LazyTechie

+0

我沒有代碼沒有想法。也許你可以分享給消費者和製作人的配置?是的,我知道你不能共享整個應用程序,但至少可以爲我們想出一些簡單的Spring Boot應用程序... –

回答

0

那麼,它看起來好像已經有一個bugspring-cloud-stream-binder-kafka報告說明resetOffset屬性沒有影響。因此,消費者總是要求消息的偏移量爲latest

正如在git問題中提到的,唯一的解決方法是通過kafka使用者CLI工具修復此問題。

1

看看上面關於DEBUG的建議是否顯示了更多的信息。它看起來像是從KafkaTopicProvisioner獲得了一些Timeout異常。但是,當您重新啓動消費者時,會出現這種情況。它看起來像消費者以某種方式與經紀人溝通有一些麻煩,你需要找出那裏正在發生的事情。