2015-10-02 17 views
0

我想寫一個函數,在主題中的最後一條消息被讀取後調用回調函數。npm kafka-node:讀完最終消息後關閉客戶

function getCurrentMessages(kafka, topic, cb_done){ 
    // Start consuming from the beginning 
    var consumer = new kafka.Consumer(new kafka.Client(), [{topic: topic, offset: 0}], {fromOffset: true}); 
    consumer.on('message', function(msg){ 
    // Do something with msg 
    }); 
    consumer.on('final-message-received', function(){ 
    consumer.close(function(){ 
     cb_done(); 
    }); 
    }); 
} 

這是可能與當前庫嗎?我不想讓消費者接受新消息。

+0

是你的目標只是閱讀所有的題目和退出的消息?如果是這樣,你有沒有考慮使用不同的技術? – Superaghu

+0

另外,爲什麼你需要保持消費者,你完成後如何退出? – Superaghu

+0

我不需要保留消費者。我想知道如何在完成後退出。我仍然想要使用卡夫卡,因爲有些消費者會持久,但其他人需要閱讀快照,這正是我想要做的。 – Dave

回答

2

我認爲你可以這樣做,幫助補償。您可以輕鬆地在消費者一側獲取事件偏移的信息。
KAFKA爲您提供以下提到的偏移:

1早些時候偏移
2.電流失調和
3.最新偏移

你只需要獲得最新的或使用Kafka編寫的最後一條消息的偏移量,並在消耗事件時將其與當前偏移量進行比較,在這種情況下,當您使用來自kafka的最後一條消息時,您將獲得最新的偏移量等於當前偏移量。
你可以寫一個IF條件,你將檢查

IF(current_offset == latest_offset)
{
打破你想要的流量或行動;
}

請讓我知道,如果它不適合你或需要任何進一步的信息。

+0

我可以通過在這裏傳入-1來得到最新的偏移量:https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol 但是,我沒有看到一個調用來獲取當前的偏移量。處理消息時,會看到消息的偏移量,但該主題上的最後一條消息的偏移量低於最新的偏移量。一個黑客應該檢查當前的消息偏移量是否等於比最新的偏移量少一個,但是如果我因任何原因而清除了消息,則可能不是這種情況。 – Dave

0

你可以嘗試這樣的事情:

consumer.on('message', function (message) { 
     console.log("received message", message); 
     if(message.offset == (message.highWaterOffset -1)){ 
      consumer.close(true, function(err, message) { 
      console.log("consumer has been closed.."); 
      }); 
     } 
}); 
+0

你能解釋一下你在這裏做了什麼嗎? –

+0

基本上,消息對象包含當前偏移量和最後偏移量。對於從主題中讀取的每條消息,它會檢查這是否是主題中的最終消息。如果是這樣,關閉消費者並返回。 –

+0

您可以編輯您的代碼。 –