新的Kafka版本(0.11)只支持一次語義。Kafka中sendOffsetsToTransaction的含義0.11
我有一個製片人設置在這樣的java卡夫卡的交易代碼。
producer.initTransactions();
try {
producer.beginTransaction();
for (ProducerRecord<String, String> record : payload) {
producer.send(record);
}
Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
{
put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
}
};
producer.sendOffsetsToTransaction(groupCommit, "groupId");
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}
我不太清楚如何使用sendOffsetsToTransaction和它的預期用例。 AFAIK,消費者羣體是消費者端的多線程閱讀功能。
的javadoc說
「發送消耗偏移到消費羣協調列表,也標誌着這些偏移作爲當前事務的一部分。這些偏移僅如果交易成功提交纔算消耗。這方法應該用於需要將消耗和生成的消息一起批量處理,通常以消費變換產生模式進行。「
如何生成保持消耗的補償列表?它有什麼重要的意義?