0

我正試圖在Kafka Processor中實現一個事務,以確保不會再次處理相同的消息兩次。給定一條消息(A)我需要創建一個將在事務中的另一個主題上產生的消息列表,並且我想在同一事務中提交原始消息(A)。從文檔中我發現Producer方法sendOffsetsToTransaction似乎只有在成功時才能在事務中提交偏移量。這是process()方法我Processor內部的代碼:KafkaProducer sendOffsetsToTransaction需要偏移+ 1才能成功提交當前偏移量

producer.beginTransaction() 
    val topicPartition = new TopicPartition(this.context().topic(), this.context().partition()) 
    val offsetAndMetadata = new OffsetAndMetadata(this.context().offset()) 
    val map    = Map(topicPartition -> offsetAndMetadata).asJava 
    producer.sendOffsetsToTransaction(map, "consumer-group-id") 
    items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value))) 
    producer.commitTransaction() 
    throw new RuntimeException("expected exception") 
與此代碼

不幸的是(在每次執行時即明顯失敗)處理過的消息(A)每次我重新處理重新啓動異常後的應用。

我設法讓作品加入+1this.context().offset()偏移返回,並以這種方式重新定義val offsetAndMetadata

val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1) 

這是正常的行爲,或者說我做錯了什麼?

謝謝:)

回答

1

您的代碼是正確的。

您提交的偏移量是您想要接下來要讀取的消息的偏移量(而不是您上次讀取的消息的偏移量)。

比較:https://github.com/apache/kafka/blob/41e4e93b5ae8a7d221fce1733e050cb98ac9713c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L346

+1

謝謝馬蒂亞斯:)看來你是對的!但是,如果這是'sendOffsetsToTransaction'的預期行爲,我認爲文檔有點誤導。 –

+0

我剛剛檢查了JavaDoc,並且還與'KafkaConsumer#commit'文檔進行了比較 - 我同意並且會提出一個PR來修復它以便下次發佈。謝謝! –

+0

@ MatthiasJ.Sax:你的意思是「你想*寫下*的信息」? –