2016-10-07 135 views
1

我有一個使用生產者類以產生消息的多線程應用程序,前面我使用以下代碼來創建生產者爲每個request.where KafkaProducer是新與如下每個請求建:如何重新連接一旦關閉卡夫卡生產者?

KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop); 

ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes); 
producer.send(data, new Callback() { 

       @Override 
       public void onCompletion(RecordMetadata metadata, Exception exception) { 
        if (exception != null) { 
         isValidMsg[0] = false; 
         exception.printStackTrace(); 
         saveOrUpdateLog(msgBean, producerType, exception); 
         logger.error("ERROR:Unable to produce message.",exception); 
        } 
       } 
      }); 
producer.close(); 

然後我讀卡夫卡關於生產者的文檔,並且知道我們應該使用單個生產者實例來獲得良好的性能。

然後,我在單例類中創建了一個KafkaProducer實例。

現在當&我們應該關閉生產者。顯然,如果我們關閉第一個發送請求後,製片人就不會發現生產者要重新發送消息,因此拋出:

java.lang.IllegalStateException: Cannot send after the producer is closed. 

,或者我們如何可以重新連接到生產者關閉一次。 問題是如果程序崩潰或有異常呢?

回答

2

KafkaProducer.send方法是異步的,它返回一個Future[RecordMetadata]。如果您在發送後立即致電close,則說明您有競爭狀況,並且由於KafkaProducer的緩衝性質,您的消息可能永遠無法發送。

如果您的生產者在您的應用程序的整個生命週期中都在使用,請不要關閉它,並在應用程序終止後讓它死掉。如文檔中所述,生產者可以安全地在多線程環境中使用,因此您應該重新使用同一個實例。

如果你仍然認爲你需要關閉KafkaProducer在某些情況下,你可以添加一個isClosed標誌,你的卡夫卡對象內部和監控,如果消費者需要重新發送數據。草圖可以是:

object KafkaOwner { 
    private var producer: KafkaProducer = ??? 
    @volatile private var isClosed = false 

    def close(): Unit = { 
    if (!isClosed) { 
     kafkaProducer.close() 
     isClosed = true 
    } 
    } 

    def instance: KafkaProducer = { 
    this.synchronized { 
     if (!isClosed) producer 
     else { 
     producer = new KafkaProducer() 
     isClosed = false 
     } 
    } 
    } 
} 
+0

它是同步/異步。此外,如果關閉一些如何:例外/應用程序崩潰,然後如何重新連接。請注意,我不會重新初始化KafkaProducer isntance,因爲它不爲null,並且即使在調用close()方法後也會保留所有屬性。此外,我有多個應用程序,即4個使用此共享生產者發送消息到多個主題的消費者。 – usman

+0

@usman你爲什麼說它既是同步又是異步? [你在哪裏看到同步版本](https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html)? –

+0

https://kafka.apache.org/08/documentation#implementation 「包裝2個低級別生產者的生產者API - 」。那麼卡夫卡提供了什麼方法呢?你的代碼顯示我們必須重新實例化對象。 – usman

1

如的javadoc KafkaProducer描述:

public void close() 

Close this producer. This method blocks until all previously sent requests complete. 
This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS). 

源:https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close()

所以你不必擔心你的郵件不會被髮送即使您在發送後立即致電關閉。

如果您打算多次使用KafkaProducer,請在完成使用後關閉它。如果您仍然希望保證您的消息在您的方法完成之前實際發送,而不是在緩衝區中等待,請使用KafkaProducer#flush(),這將阻塞,直到發送當前緩衝區。如果您願意,也可以在Future#get()上屏蔽。

還有一個警告要注意的,如果你不打算永遠閉上你的KafkaProducer(例如,在短暫的應用程序,您只需發送一些數據和應用程序將立即終止發送後) 。 KafkaProducer IO線程是守護進程線程,這意味着JVM不會等到該線程完成終止虛擬機。因此,要確保您的消息實際上使用KafkaProducer#flush(),no-arg KafkaProducer#close()Future#get()上的阻止。

相關問題