2017-09-22 60 views
0

我有一個Kafka消費者實例創建內部線程作爲構造函數的一部分,並在線程內運行方法我叫不同的Web服務,並保持呼叫非阻塞我正在使用完整的未來。我的問題是,我無法通過調用thenApply方法和傳遞Kafka使用者實例來發出提交,因爲它給了我一個Kafka使用者不是線程安全的錯誤。雖然在我的commit方法我已經做了代碼卡夫卡消費者補償提交內可完成的未來

synchronized(consumer) { 
    commitResponse(); 
} 

還是我得到ConcurrentModificationException

class KafkaConsumerThread implements Runnable { 

    KafkaConsumer<String, String> consumer; 

    public KafkaConsumerThread(Properties properties) { 
    consumer = new KafkaConsumer<String, String>(properties);  
    ... 
    } 

    @Override 
    public void run() { 
    try { 
     // synchronized (consumer) { 
     consumer.subscribe(topics); 
     while (true) { 
     if (closed.get()) { 
      consumer.close(); 
     } 
     ConsumerRecords<String, String> records = consumer.poll(120000); 
     for (ConsumerRecord<String, String> record : records) { 
      getAsyncClient().prepareGet(webServiceUrl) 
       .execute() 
       .toCompletableFuture() 
       .thenApply(resp -> callAnotherService1(resp)) 
       .thenApply(resp -> callAnotherService2(resp)) 
       .thenApply(resp -> commitResponse(resp, consumer)); 
      } 
     } 
     } 
    } catch (Exception ex) { 
     ... 
    } 

在上面的代碼中,我得到了commitResponse方法「KafkaConsumer不是多線程訪問安全的」內部異常。雖然在我的提交響應中,如果我將synchronized提交到synchronized(consumer)中,我仍然會收到錯誤。

回答

0

很可能是因爲poll方法不同步,並且在異步GET執行提交時執行(仍在運行時持有內部kafka鎖定)。

看到私有方法的引用:在org.apache.kafka.clients.consumer.KafkaConsumer

org.apache.kafka.clients.consumer.KafkaConsumer.acquire()org.apache.kafka.clients.consumer.KafkaConsumer.release()