2016-04-28 72 views
0

我想用NodeJS和Kafka構建一個API,它可以將偏移量和主題作爲輸入並輸出從偏移量開始的前10個消息。我用No-KafkaKafka-Node嘗試了這種方法。使用Apache Kafka和NodeJS讀取特定消息

它們提供的使用者API允許使用特定偏移量的消息。一旦我閱讀了10條消息,我想停止使用這些消息。但是這兩個API調用都將繼續獲取消息直到最後一條消息。我怎麼能停止這樣做?

這裏是我的EDITED全碼

var Kafka = require('no-kafka'); 
var express = require("express"); 
var app = express(); 

var producer = new Kafka.Producer(); 
producer.init().then(function() { 
    console.log("Producer Ready"); 
}); 

var consumer = new Kafka.SimpleConsumer(); 
consumer.init().then(function() { 
    console.log("Consumer Ready"); 
}); 

app.get('/produce/:topic/:msg', function(req, res) { 
    producer.send({ 
    topic: req.params.topic, 
    partition: 0, 
    message: { 
     value: req.params.msg 
    } 
    }); 
    res.send("Added: " + req.params.msg + " to topic: " + req.params.topic); 
}); 

app.get('/consume/:topic/:off', function(req, res) { 
    console.log("Request for topic: " + req.params.topic + " Offset: " + req.params.off); 
    consumer.subscribe(req.params.topic, 0, { 
    offset: req.params.off, 
    maxBytes: 1000 
    }, function(messageSet, topic, partition) { 
    var msg = ""; 
    var size = messageSet.length; 
    //console.log(messageSet); 
    messageSet.some(function(m) { 
     msg += m.message.value.toString('utf8') + " "; 
     if (parseInt(m.offset, 10) > parseInt(req.params.off, 10) + 10) { 
     return true; 
     } 
    }); 
    res.send("Thank you " + size + " " + req.params.off + " " + msg); 
    }); 
}); 

app.listen(process.env.PORT); 

在這方面的任何迴應表示讚賞。

回答

0

出於幾個不同的原因,你不能真正停止從卡夫卡那麼突然地消費。首先,卡夫卡消費者,無論是JavaScript還是其他東西,都不會一次讀取消息 - 他們會獲取批量的消息。我知道kafka-node它看起來像是一次一個進來,因爲每個消息都會有一個EventEmitter事件。但在引擎蓋下,客戶端批量提取它們。

您可以做的最好的方式就是隨時跟蹤您的偏移量,並且當您超出範圍時只需忽略它們,然後取消訂閱該主題或關閉消費者以停止收聽。

這對分區肯定會變得棘手 - 您必須跟蹤相對於所有分區的偏移量。我不會做同樣的事情 - 我的典型用例是從某個時間點讀取每個分區的當前偏移量。所以我沒有優化我的分區讀取,只要他們達到最後的偏移量即可消失。我做addTopics並一次添加所有的部分。另一方面,您可能需要逐個添加分區 - 即對特定分區執行addTopic,讀取該分區直到找到您的偏移量,然後忽略分區上的消息和removeTopic

我相信我已經玩過這個流程,你甚至可能不得不爲每個分區站起來一個新的消費者,更不用說一個全新的客戶。

+0

我現在只有一個分區。忽略超出範圍的偏移量會使我的其他同時請求變慢。我不知道如何取消訂閱消費者。還有一點,如果兩個或兩個以上的消費者爲消費者API提供不同的偏移量,並且EventEmitter一個接一個地提供消息,那麼如何確定哪個消費者要求哪個消息? – LearningToCode

+0

使用多個消費者 - 這意味着不止一個客戶。這是我能夠實現它的唯一方式。我使用的模式是這樣的:我對待流中的歷史記錄與將來的記錄不同。如果我想閱讀歷史數據,我站出來一個新的客戶+消費者,並研究我需要研究的內容。爲了建立一個監聽器來讀取未來的數據,我複用了監聽器 - 如果兩個消費者想要爲將來的請求傾聽同一主題,我讓他們使用同一個用戶。 –

+0

你能推薦一些可以提供我的功能的代碼嗎?我在問題中顯示的代碼正常工作,但每個響應需要大約1秒。 – LearningToCode

相關問題