2013-08-12 37 views
4

我正在運行0.8 Kafka,並使用提供的Java API構建生產者。
發送消息(或消息)的API函數返回void。Kafka - producer - handle「failed to send」

有沒有辦法獲得發送消息的狀態?如果它發送或失敗?

這對我們來說非常重要,因爲我們正在閱讀來自文件的消息,並且我們希望在發送所有消息後刪除文件。但是如果出現錯誤並且一些消息沒有發送,我刪除該文件會導致丟失非常重要的數據。

+0

只是猜測,但如果發送沒有成功,可能會引發異常?所以如果你發現這個例外,你不會刪除你的文件。 – asmaier

+0

「發送」函數是aSync(在不同線程稍後發生實際發送時立即返回),因此不會拋出異常。 –

回答

2

您可以配置您的生產者等待,直到它從Kafka集羣獲取數據(request.required.acks),以便在刪除源文件之前確保數據已正確提交。

如果確實您需要確保消息發送成功,您可能需要考慮使生產者同步的選擇(producer.type = sync)。這樣,您將能夠捕獲阻塞調用引發的任何異常並相應地執行操作。 send()拋出的異常是kafka.common.FailedToSendMessageException。

卡夫卡的Java API並不理想,希望這對你有所幫助。

+0

Yeap,我把它留在那裏,最近回到它,你是對的。 'type = sync'是確保消息被髮送的唯一方式,但是這決定了地獄般的表現。 我們正在尋找不同的隊列。 –

相關問題