我有一個使用生產者類以產生消息的多線程應用程序,前面我使用以下代碼來創建生產者爲每個request.where KafkaProducer是新與如下每個請求建:如何重新連接一旦關閉卡夫卡生產者?
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);
ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
isValidMsg[0] = false;
exception.printStackTrace();
saveOrUpdateLog(msgBean, producerType, exception);
logger.error("ERROR:Unable to produce message.",exception);
}
}
});
producer.close();
然後我讀卡夫卡關於生產者的文檔,並且知道我們應該使用單個生產者實例來獲得良好的性能。
然後,我在單例類中創建了一個KafkaProducer實例。
現在當&我們應該關閉生產者。顯然,如果我們關閉第一個發送請求後,製片人就不會發現生產者要重新發送消息,因此拋出:
java.lang.IllegalStateException: Cannot send after the producer is closed.
,或者我們如何可以重新連接到生產者關閉一次。 問題是如果程序崩潰或有異常呢?
它是同步/異步。此外,如果關閉一些如何:例外/應用程序崩潰,然後如何重新連接。請注意,我不會重新初始化KafkaProducer isntance,因爲它不爲null,並且即使在調用close()方法後也會保留所有屬性。此外,我有多個應用程序,即4個使用此共享生產者發送消息到多個主題的消費者。 – usman
@usman你爲什麼說它既是同步又是異步? [你在哪裏看到同步版本](https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html)? –
https://kafka.apache.org/08/documentation#implementation 「包裝2個低級別生產者的生產者API - 」。那麼卡夫卡提供了什麼方法呢?你的代碼顯示我們必須重新實例化對象。 – usman