2017-07-02 81 views

回答

1

當Kafka Connect worker正在運行sink任務時,它將使用分配給該任務的主題分區中的消息。當它這樣做時,它通過put(Collection<SinkRecord>)方法重複地將一批消息傳遞給接收器任務。只要連接器及其任務正在運行,這將繼續。

Kafka Connect還會定期記錄接收器任務的進度,即每個主題分區上最近處理的消息的偏移量。這稱爲提交了偏移量,並且它這樣做,以便如果連接器意外停止並且不乾淨,Kafka Connect會知道任務應該在哪個主題分區中恢復處理消息。但在Kafka Connect向Kafka寫入偏移量之前,Kafka Connect工作人員通過flush(...)方法爲接收器連接器提供了在此階段工作的機會。

特定的接收器連接器可能不需要做任何事情(如果put(...)完成了所有的工作),或者它可能利用這個機會將已經通過put(...)處理的所有消息提交給數據存儲器。例如,Confluent's JDBC sink connector使用事務(其大小可以通過連接器的用戶設置進行控制)寫入通過put(...)方法傳遞的每批消息,因此flush(...)方法不需要執行任何操作。另一方面,Confluent's ElasticSearch sink connector僅收集一系列put(...)方法的所有消息,並僅在flush(...)期間將它們寫入Elasticsearch。

偏移量爲源和宿連接器提交的頻率由連接器的offset.flush.interval.ms配置屬性控制。缺省情況是每60秒提交一次偏移量,這足以提高性能並降低開銷,但頻繁程度足以限制連接器任務意外死亡時可能的重新處理量。請注意,當連接器正常關機或遇到異常時,Kafka Connect將始終有機會提交偏移量。只有當Kafka Connect工作人員意外死亡,它可能沒有機會提交補償來確定哪些消息已被處理。因此,只有在這樣的故障後重新啓動後,連接器纔有可能重新處理它在故障發生之前所做的一些消息。這是因爲消息至少會有一次消息應該是冪等的。在爲此設置確定適當的值時,請將所有此加上連接器的行爲考慮在內。

查看Confluent documentation for Kafka Connect以及更多示例和詳細信息的開源接收器連接器。

相關問題