0
我有一個Kafka消費者實例創建內部線程作爲構造函數的一部分,並在線程內運行方法我叫不同的Web服務,並保持呼叫非阻塞我正在使用完整的未來。我的問題是,我無法通過調用thenApply方法和傳遞Kafka使用者實例來發出提交,因爲它給了我一個Kafka使用者不是線程安全的錯誤。雖然在我的commit方法我已經做了代碼卡夫卡消費者補償提交內可完成的未來
synchronized(consumer) {
commitResponse();
}
還是我得到ConcurrentModificationException
。
class KafkaConsumerThread implements Runnable {
KafkaConsumer<String, String> consumer;
public KafkaConsumerThread(Properties properties) {
consumer = new KafkaConsumer<String, String>(properties);
...
}
@Override
public void run() {
try {
// synchronized (consumer) {
consumer.subscribe(topics);
while (true) {
if (closed.get()) {
consumer.close();
}
ConsumerRecords<String, String> records = consumer.poll(120000);
for (ConsumerRecord<String, String> record : records) {
getAsyncClient().prepareGet(webServiceUrl)
.execute()
.toCompletableFuture()
.thenApply(resp -> callAnotherService1(resp))
.thenApply(resp -> callAnotherService2(resp))
.thenApply(resp -> commitResponse(resp, consumer));
}
}
}
} catch (Exception ex) {
...
}
在上面的代碼中,我得到了commitResponse方法「KafkaConsumer不是多線程訪問安全的」內部異常。雖然在我的提交響應中,如果我將synchronized提交到synchronized(consumer)中,我仍然會收到錯誤。