我第一次使用kafka的節點,並使用kafka-node。消費一條消息需要調用一個外部API,甚至可能需要一秒鐘才能響應。我希望克服我的消費者的突然失敗,方式是如果消費者失敗,另一個消費者將替換它將收到相同的消息,表明其工作未完成。如何使用kafka-node控制所使用的kafka消息的提交
我正在使用kafka 0.10並嘗試使用ConsumerGroup。
我想到了在選項中設置autoCommit: false
,並且只在其工作完成後才提交消息(就像我以前在某些Java代碼中完成的那樣)。
但是,我似乎無法確定一旦完成該如何才能正確提交該消息。我應該如何提交?
我還擔心的另一個問題是,由於回調,下一條消息在前一條消息完成之前正在讀取。我擔心,如果消息x + 2在消息x + 1之前完成,則偏移量將被設置爲x + 2,因此如果發生故障,x + 1將不會被重新執行。
這裏基本上是我做過迄今爲止:
var options = {
host: connectionString,
groupId: consumerGroupName,
id: clientId,
autoCommit: false
};
var kafka = require("kafka-node");
var ConsumerGroup = kafka.ConsumerGroup;
var consumerGroup = new ConsumerGroup(options, topic);
consumerGroup.on('connect', function() {
console.log("Consuming Kafka %s, topic=%s", JSON.stringify(options), topic);
});
consumerGroup.on('message', function(message) {
console.log('%s read msg Topic="%s" Partition=%s Offset=%d', this.client.clientId, message.topic, message.partition, message.offset);
console.log(message.value);
doSomeStuff(function() {
// HOW TO COMMIT????
consumerGroup.commit(function(err, data) {
console.log("------ Message done and committed ------");
});
});
});
consumerGroup.on('error', function(err) {
console.log("Error in consumer: " + err);
close();
});
process.once('SIGINT', function() {
close();
});
var close = function() {
// SHOULD SEND 'TRUE' TO CLOSE ???
consumerGroup.close(true, function(error) {
if (error) {
console.log("Consuming closed with error", error);
} else {
console.log("Consuming closed");
}
});
};
指向您的個人資料的鏈接指向 –
完成。感謝您指出 –