2017-07-25 95 views
3

重新啓動Kafka Connect S3接收器任務後,重新從主題開始處重新開始寫入,並寫入較舊記錄的重複副本。換句話說,Kafka Connect似乎失去了它的地位。重新啓動Kafka Connect S3 Sink任務丟失位置,完全重寫所有內容

所以,我想像Kafka Connect在內部connect-offsets主題中存儲當前偏移位置信息。這個話題是空的,我認爲這是問題的一部分。

另外兩個內部主題connect-statusesconnect-configs不爲空。 connect-statuses有52個條目。 connect-configs有6個條目;三個用於我配置的兩個接收器連接器中的每一個:connector-<name>task-<name>-0commit-<name>

我手動創建的內部卡夫卡連接主題運行此之前,如文檔規定:

/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact 
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact 
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact 

我可以確認的是,connect-offsets話題似乎是正確創建:

/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets 
Topic:connect-offsets PartitionCount:50 ReplicationFactor:3 Configs:cleanup.policy=compact 
    Topic: connect-offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 
    Topic: connect-offsets Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 
    Topic: connect-offsets Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 
    <snip> 

這是運行Confluent Platform v3.2.1的三臺服務器羣集運行Kafka 10.2.1。

connect-offsets應該是空的嗎?爲什麼Kafka Connect在重新開始任務時會在主題開始時重新啓動?

UPDATE:迴應Randall Hauch的回答。

  • 關於源連接器偏移量與宿連接器偏移量的說明解釋爲空connect-offsets。感謝您的解釋!
  • 我絕對不會改變連接器名稱。
  • 如果連接器已關閉約五天並在之後重新啓動,是否有任何理由使連接器偏移位置過期並重置?我看到__consumer_offsetscleanup.policy=compact
  • auto.offset.reset只應該在__consumer_offsets沒有位置的時候有效果吧?

我使用的主要是系統默認值。我的接收器配置JSON如下。我正在使用一個非常簡單的自定義分區程序在Avro日期時間字段而不是掛鐘時間進行分區。該功能似乎已添加到Confluent v3.2.2中,因此我不需要該功能的自定義插件。我希望跳過Confluent v3.2.2並在可用時直接訪問v3.3.0。

{ 
    "name": "my-s3-sink", 

    "tasks.max": 1, 
    "topics": "my-topic", 
    "flush.size": 10000, 

    "connector.class": "io.confluent.connect.s3.S3SinkConnector", 
    "storage.class": "io.confluent.connect.s3.storage.S3Storage", 
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat", 
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator", 
    "partitioner.class": "mycompany.partitioner.TimeFieldPartitioner", 

    "s3.bucket.name": "my-bucket", 
    "s3.region": "us-west-2", 

    "partition.field.name": "timestamp", 

    "locale": "us", 
    "timezone": "UTC", 
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH", 

    "schema.compatibility": "NONE", 

    "key.converter": "io.confluent.connect.avro.AvroConverter", 
    "key.converter.schema.registry.url": "http://localhost:8081", 
    "value.converter": "io.confluent.connect.avro.AvroConverter", 
    "value.converter.schema.registry.url": "http://localhost:8081" 
} 

回答

4

默認偏移保留期卡夫卡消費者爲24小時(1440分鐘)。如果停止連接器,因此作出任何新的提交了超過24小時的偏移將到期,當你重新啓動,你會開始了作爲一個新的消費者。您可以使用offsets.retention.minutes參數

+0

非常感謝! – clay

4

卡夫卡Connect使用的connect-offsets主題(或任何你的名字)來存儲偏移量源連接器,但下沉連接器偏移所使用的正常卡夫卡消費者組存儲機制。

連接器可能重新啓動的一個原因是連接器名稱更改。連接器名稱用於定義使用者組的名稱,因此如果更改連接器的名稱,則重新啓動時,連接器將使用不同的使用者組,使用者將從頭開始。

另一個原因可能是,卡夫卡連接消費者被配置爲從一開始每次啓動,通過consumer.auto.offset.reset=earliest

的S3連接器版本3.3.0(即將推出),有幾個問題的修補程序,以及其中的一些影響在時間或模式的工作方式旋轉。您還沒有提供您的配置,所以很難說是否這些會導致你看到的行爲。

+0

驚人的答案修改的__consumer_offsets話題的保留期限。謝謝!我發佈了更多的細節,並在主要問題中跟進問題。 – clay

相關問題