2017-07-19 63 views
3

新的Kafka版本(0.11)只支持一次語義。Kafka中sendOffsetsToTransaction的含義0.11

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

我有一個製片人設置在這樣的java卡夫卡的交易代碼。

producer.initTransactions(); 
    try { 
     producer.beginTransaction(); 
     for (ProducerRecord<String, String> record : payload) { 
      producer.send(record); 
     } 

     Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() { 
      { 
       put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null)); 
      } 
     }; 
     producer.sendOffsetsToTransaction(groupCommit, "groupId"); 
     producer.commitTransaction(); 
    } catch (ProducerFencedException e) { 
     producer.close(); 
    } catch (KafkaException e) { 
     producer.abortTransaction(); 
    } 

我不太清楚如何使用sendOffsetsToTransaction和它的預期用例。 AFAIK,消費者羣體是消費者端的多線程閱讀功能。

的javadoc說

「發送消耗偏移到消費羣協調列表,也標誌着這些偏移作爲當前事務的一部分。這些偏移僅如果交易成功提交纔算消耗。這方法應該用於需要將消耗和生成的消息一起批量處理,通常以消費變換產生模式進行。「

如何生成保持消耗的補償列表?它有什麼重要的意義?

回答

0

這隻與您正在使用的工作流相關,然後根據這些使用情況生成消息。在這種情況下,只有在下游生產者事務成功時,此功能才允許您提交偏移量。

如果沒有交易,您使用Consumer#commitSync()Consumer#commitAsync()。但是,如果在使用與生產者一起使用的數據之前使用這些使用方法提交偏移量,則在知道生產者是否成功發送其記錄之前,您將承諾提供偏移量。因此,您可以在下游生產商處使用Producer#sendOffsetsToTransaction(),而不是向消費者提交補償。這會將偏移量發送給處理事務的事務管理器。只有在整個事務成功的情況下它纔會提交補償。

注意:當您發送偏移提交,你應該加1偏移最後一次讀取,使未來的讀取偏移你沒有讀過這是真實的簡歷不管你是否要提交。消費者或生產者,見:KafkaProducer sendOffsetsToTransaction need offset+1 to successfully commit current offset)。