重新啓動Kafka Connect S3接收器任務後,重新從主題開始處重新開始寫入,並寫入較舊記錄的重複副本。換句話說,Kafka Connect似乎失去了它的地位。重新啓動Kafka Connect S3 Sink任務丟失位置,完全重寫所有內容
所以,我想像Kafka Connect在內部connect-offsets
主題中存儲當前偏移位置信息。這個話題是空的,我認爲這是問題的一部分。
另外兩個內部主題connect-statuses
和connect-configs
不爲空。 connect-statuses
有52個條目。 connect-configs
有6個條目;三個用於我配置的兩個接收器連接器中的每一個:connector-<name>
,task-<name>-0
,commit-<name>
。
我手動創建的內部卡夫卡連接主題運行此之前,如文檔規定:
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact
我可以確認的是,connect-offsets
話題似乎是正確創建:
/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets PartitionCount:50 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic: connect-offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: connect-offsets Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: connect-offsets Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
<snip>
這是運行Confluent Platform v3.2.1的三臺服務器羣集運行Kafka 10.2.1。
是connect-offsets
應該是空的嗎?爲什麼Kafka Connect在重新開始任務時會在主題開始時重新啓動?
UPDATE:迴應Randall Hauch的回答。
- 關於源連接器偏移量與宿連接器偏移量的說明解釋爲空
connect-offsets
。感謝您的解釋! - 我絕對不會改變連接器名稱。
- 如果連接器已關閉約五天並在之後重新啓動,是否有任何理由使連接器偏移位置過期並重置?我看到
__consumer_offsets
有cleanup.policy=compact
auto.offset.reset
只應該在__consumer_offsets
沒有位置的時候有效果吧?
我使用的主要是系統默認值。我的接收器配置JSON如下。我正在使用一個非常簡單的自定義分區程序在Avro日期時間字段而不是掛鐘時間進行分區。該功能似乎已添加到Confluent v3.2.2中,因此我不需要該功能的自定義插件。我希望跳過Confluent v3.2.2並在可用時直接訪問v3.3.0。
{
"name": "my-s3-sink",
"tasks.max": 1,
"topics": "my-topic",
"flush.size": 10000,
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
"partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",
"s3.bucket.name": "my-bucket",
"s3.region": "us-west-2",
"partition.field.name": "timestamp",
"locale": "us",
"timezone": "UTC",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"schema.compatibility": "NONE",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
}
非常感謝! – clay