我試圖使用Kafka接收器連接將批處理中的數據發送到NOSQL數據庫。我正在關注https://kafka.apache.org/documentation/#connect文檔,並且對發送記錄的邏輯必須在何處實施的問題感到困惑。請幫助我理解記錄是如何在內部處理的,以及必須使用Put()或Flush()來批處理記錄。Put()vs Flush()在Kafka連接器接收器任務
回答
當Kafka Connect worker正在運行sink任務時,它將使用分配給該任務的主題分區中的消息。當它這樣做時,它通過put(Collection<SinkRecord>)
方法重複地將一批消息傳遞給接收器任務。只要連接器及其任務正在運行,這將繼續。
Kafka Connect還會定期記錄接收器任務的進度,即每個主題分區上最近處理的消息的偏移量。這稱爲提交了偏移量,並且它這樣做,以便如果連接器意外停止並且不乾淨,Kafka Connect會知道任務應該在哪個主題分區中恢復處理消息。但在Kafka Connect向Kafka寫入偏移量之前,Kafka Connect工作人員通過flush(...)
方法爲接收器連接器提供了在此階段工作的機會。
特定的接收器連接器可能不需要做任何事情(如果put(...)
完成了所有的工作),或者它可能利用這個機會將已經通過put(...)
處理的所有消息提交給數據存儲器。例如,Confluent's JDBC sink connector使用事務(其大小可以通過連接器的用戶設置進行控制)寫入通過put(...)
方法傳遞的每批消息,因此flush(...)
方法不需要執行任何操作。另一方面,Confluent's ElasticSearch sink connector僅收集一系列put(...)
方法的所有消息,並僅在flush(...)
期間將它們寫入Elasticsearch。
偏移量爲源和宿連接器提交的頻率由連接器的offset.flush.interval.ms
配置屬性控制。缺省情況是每60秒提交一次偏移量,這足以提高性能並降低開銷,但頻繁程度足以限制連接器任務意外死亡時可能的重新處理量。請注意,當連接器正常關機或遇到異常時,Kafka Connect將始終有機會提交偏移量。只有當Kafka Connect工作人員意外死亡,它可能沒有機會提交補償來確定哪些消息已被處理。因此,只有在這樣的故障後重新啓動後,連接器纔有可能重新處理它在故障發生之前所做的一些消息。這是因爲消息至少會有一次消息應該是冪等的。在爲此設置確定適當的值時,請將所有此加上連接器的行爲考慮在內。
查看Confluent documentation for Kafka Connect以及更多示例和詳細信息的開源接收器連接器。
- 1. Kafka將JDBC接收器連接到Oracle任務失敗
- 2. Kafka連接Cassandra連接器
- 3. Flink Kafka連接器
- 4. Kafka連接S3接收器在加載Avro時拋出IllegalArgumentException
- 5. 無法連接到kafka服務器
- 6. Kafka接收器連接器:即使在重新啓動後也沒有分配任務
- 7. 在不同的服務器上運行Kafka和Kafka連接?
- 8. 我如何關閉Apache Kafka連接器任務?
- 9. Kafka連接訪問兩個接收器(ftp和sql)
- 10. Kafka連接任務重新啓動`NoSuchMethodError:HttpServletRequest.isAsyncStarted`?
- 11. 在springcloud中缺少Kafka源/接收器
- 12. citrix接收器連接條
- 13. Kafka Connect HDFS接收器問題
- 14. Kafka Connect接收器分區:子分區?
- 15. Tomcat 5.5 https連接器vs ldap連接
- 16. TCP連接VS ping服務器
- 17. Kafka節點連接到kafka的本地主機實例,但沒有連接到遠程kafka服務器
- 18. 在rake任務中接收到「ActiveRecord :: StatementInvalid:PG :: ConnectionBad:PQconsumeInput()無法從服務器接收數據:連接超時」
- 19. Kafka Connect - 文件源連接器錯誤
- 20. Oracle Hadoop連接器vs Sqoop
- 21. 如何在SSL連接的服務器端接收數據?
- 22. Neo4j服務器拒絕任何連接
- 23. 服務器連接
- 24. Backbone js連接服務器
- 25. Android Internet連接廣播接收器
- 26. 藍牙連接到非iOS接收器
- 27. 檢索連接的接收器
- 28. Android廣播接收器:在java代碼vs清單中註冊接收器
- 29. Kafka連接到MSSQL服務器十進制轉換
- 30. Kafka服務器屬性 - 無法連接到代理