2016-10-18 105 views
1

我第一次使用kafka的節點,並使用kafka-node。消費一條消息需要調用一個外部API,甚至可能需要一秒鐘才能響應。我希望克服我的消費者的突然失敗,方式是如果消費者失敗,另一個消費者將替換它將收到相同的消息,表明其工作未完成。如何使用kafka-node控制所使用的kafka消息的提交

我正在使用kafka 0.10並嘗試使用ConsumerGroup。

我想到了在選項中設置autoCommit: false,並且只在其工作完成後才提交消息(就像我以前在某些Java代碼中完成的那樣)。

但是,我似乎無法確定一旦完成該如何才能正確提交該消息。我應該如何提交?

我還擔心的另一個問題是,由於回調,下一條消息在前一條消息完成之前正在讀取。我擔心,如果消息x + 2在消息x + 1之前完成,則偏移量將被設置爲x + 2,因此如果發生故障,x + 1將不會被重新執行。

這裏基本上是我做過迄今爲止:

var options = { 
    host: connectionString, 
    groupId: consumerGroupName, 
    id: clientId, 
    autoCommit: false 
}; 

var kafka = require("kafka-node"); 
var ConsumerGroup = kafka.ConsumerGroup; 

var consumerGroup = new ConsumerGroup(options, topic); 

consumerGroup.on('connect', function() { 
    console.log("Consuming Kafka %s, topic=%s", JSON.stringify(options), topic); 
}); 

consumerGroup.on('message', function(message) { 
    console.log('%s read msg Topic="%s" Partition=%s Offset=%d', this.client.clientId, message.topic, message.partition, message.offset); 
    console.log(message.value); 
    doSomeStuff(function() { 
     // HOW TO COMMIT???? 
     consumerGroup.commit(function(err, data) { 
      console.log("------ Message done and committed ------"); 
     }); 
    }); 
}); 

consumerGroup.on('error', function(err) { 
    console.log("Error in consumer: " + err); 
    close(); 
}); 

process.once('SIGINT', function() { 
    close(); 
}); 

var close = function() { 
    // SHOULD SEND 'TRUE' TO CLOSE ??? 
    consumerGroup.close(true, function(error) { 
     if (error) { 
      console.log("Consuming closed with error", error); 
     } else { 
      console.log("Consuming closed"); 
     } 
    }); 
}; 

回答

0

一件事你可以在這裏做的是對每次處理消息的重試機制。

您可以諮詢在此線程我的回答: https://stackoverflow.com/a/44328233/2439404

我使用kafka-consumer,批量在一起使用消息從卡夫卡使用async/cargo,並把它們在async/queue(內存隊列)。該隊列將一個工作函數作爲一個參數傳遞給async/retryable

對於你的問題,你可以使用retryable來處理你的消息。 https://caolan.github.io/async/docs.html#retryable

這可能會解決您的問題。

+0

指向您的個人資料的鏈接指向 –

+0

完成。感謝您指出 –

相關問題