2017-04-20 111 views
1

我在消費者API(v.0.10.0.0)中使用卡夫卡。卡夫卡在碼頭工人從http://wurstmeister.github.io/kafka-docker/卡夫卡消費者卡住加入集羣

運行使用圖像還我跑這個簡單的測試:

@Test 
    public void test2() { 

    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("group.id", RandomStringUtils.randomAlphabetic(8)); 
    props.put("auto.offset.reset.config", "earliest"); 
    props.put("enable.auto.commit", "false"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 
    Properties props1 = new Properties(); 
    props1.put("bootstrap.servers", "localhost:9092"); 

    props1.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props1.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

    KafkaProducer<String, String> producer1 = new KafkaProducer<>(props1); 
    KafkaProducer<String, String> producer = producer1; 

    consumer.subscribe(asList(TEST_TOPIC)); 

    producer.send(new ProducerRecord<>(TEST_TOPIC, 0, "key", "value message")); 
    producer.flush(); 


    boolean done = false; 
    while (!done) { 
     ConsumerRecords<String, String> msg = consumer.poll(1000); 
     if (msg.count() > 0) { 
     Iterator<ConsumerRecord<String, String>> msgIt = msg.iterator(); 
     while (msgIt.hasNext()) { 
      ConsumerRecord<String, String> rec = msgIt.next(); 
      System.out.println(rec.value()); 
     } 
     consumer.commitSync(); 
     done = true; 
     } 
    } 

    consumer.close(); 
    producer.close(); 
    } 

主題名稱和消費者ID在每次執行是隨機生成的。

的行爲是非常不穩定的。有時候它會工作,有時有下列重複輸出打電話時.poll(),就開始循環:

2017-04-20 12:01:46 DEBUG NetworkClient:476 - Completed connection to node 1003 
2017-04-20 12:01:46 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 
2017-04-20 12:01:46 DEBUG Metadata:180 - Updated cluster metadata version 3 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]]) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686106738, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]2bea5ab4, request=RequestSend(header={api_key=10,api_version=0,correlation_id=3,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686106738, sendTimeMs=1492686106738), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) 
2017-04-20 12:01:46 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 
2017-04-20 12:01:46 DEBUG Metadata:180 - Updated cluster metadata version 4 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]]) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686106840, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]3d8314f0, request=RequestSend(header={api_key=10,api_version=0,correlation_id=5,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686106839, sendTimeMs=1492686106839), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) 
2017-04-20 12:01:46 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 
2017-04-20 12:01:46 DEBUG Metadata:180 - Updated cluster metadata version 5 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]]) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686106941, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]2df32bf7, request=RequestSend(header={api_key=10,api_version=0,correlation_id=7,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686106940, sendTimeMs=1492686106940), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) 
2017-04-20 12:01:47 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 
2017-04-20 12:01:47 DEBUG Metadata:180 - Updated cluster metadata version 6 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]]) 
2017-04-20 12:01:47 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null) 
2017-04-20 12:01:47 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686107042, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.Co[email protected], request=RequestSend(header={api_key=10,api_version=0,correlation_id=9,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686107041, sendTimeMs=1492686107041), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) 
2017-04-20 12:01:47 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 
2017-04-20 12:01:47 DEBUG Metadata:180 - Updated cluster metadata version 7 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]]) 
2017-04-20 12:01:47 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null) 
2017-04-20 12:01:47 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686107144, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]2a40cd94, request=RequestSend(header={api_key=10,api_version=0,correlation_id=11,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686107144, sendTimeMs=1492686107144), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) 
2017-04-20 12:01:47 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 

有誰知道這是怎麼回事?這對我來說似乎是一個相當簡單的設置/測試...

回答

0

我自己找到了原因。所以我只是在用1個分區的主題上運行消費者。然後,我只是與消費者一起殺死這個進程,所以沒有關閉。

在這種情況下,經紀人將保持消費者的地位,直到會話過期。試圖加入另一位消費者會導致該錯誤直至失效。

爲了解決一個能做到:(?) - 更改組ID - - 等待,直到會話終結 重新啓動代理

如果有人用更多的知識可以更好地解釋,請不要