我正試圖在Kafka Processor
中實現一個事務,以確保不會再次處理相同的消息兩次。給定一條消息(A)我需要創建一個將在事務中的另一個主題上產生的消息列表,並且我想在同一事務中提交原始消息(A)。從文檔中我發現Producer
方法sendOffsetsToTransaction
似乎只有在成功時才能在事務中提交偏移量。這是process()
方法我Processor
內部的代碼:KafkaProducer sendOffsetsToTransaction需要偏移+ 1才能成功提交當前偏移量
producer.beginTransaction()
val topicPartition = new TopicPartition(this.context().topic(), this.context().partition())
val offsetAndMetadata = new OffsetAndMetadata(this.context().offset())
val map = Map(topicPartition -> offsetAndMetadata).asJava
producer.sendOffsetsToTransaction(map, "consumer-group-id")
items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value)))
producer.commitTransaction()
throw new RuntimeException("expected exception")
與此代碼
不幸的是(在每次執行時即明顯失敗)處理過的消息(A)每次我重新處理重新啓動異常後的應用。
我設法讓作品加入+1
由this.context().offset()
偏移返回,並以這種方式重新定義val offsetAndMetadata
:
val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1)
這是正常的行爲,或者說我做錯了什麼?
謝謝:)
謝謝馬蒂亞斯:)看來你是對的!但是,如果這是'sendOffsetsToTransaction'的預期行爲,我認爲文檔有點誤導。 –
我剛剛檢查了JavaDoc,並且還與'KafkaConsumer#commit'文檔進行了比較 - 我同意並且會提出一個PR來修復它以便下次發佈。謝謝! –
@ MatthiasJ.Sax:你的意思是「你想*寫下*的信息」? –