我想用NodeJS和Kafka構建一個API,它可以將偏移量和主題作爲輸入並輸出從偏移量開始的前10個消息。我用No-Kafka和Kafka-Node嘗試了這種方法。使用Apache Kafka和NodeJS讀取特定消息
它們提供的使用者API允許使用特定偏移量的消息。一旦我閱讀了10條消息,我想停止使用這些消息。但是這兩個API調用都將繼續獲取消息直到最後一條消息。我怎麼能停止這樣做?
這裏是我的EDITED全碼
var Kafka = require('no-kafka');
var express = require("express");
var app = express();
var producer = new Kafka.Producer();
producer.init().then(function() {
console.log("Producer Ready");
});
var consumer = new Kafka.SimpleConsumer();
consumer.init().then(function() {
console.log("Consumer Ready");
});
app.get('/produce/:topic/:msg', function(req, res) {
producer.send({
topic: req.params.topic,
partition: 0,
message: {
value: req.params.msg
}
});
res.send("Added: " + req.params.msg + " to topic: " + req.params.topic);
});
app.get('/consume/:topic/:off', function(req, res) {
console.log("Request for topic: " + req.params.topic + " Offset: " + req.params.off);
consumer.subscribe(req.params.topic, 0, {
offset: req.params.off,
maxBytes: 1000
}, function(messageSet, topic, partition) {
var msg = "";
var size = messageSet.length;
//console.log(messageSet);
messageSet.some(function(m) {
msg += m.message.value.toString('utf8') + " ";
if (parseInt(m.offset, 10) > parseInt(req.params.off, 10) + 10) {
return true;
}
});
res.send("Thank you " + size + " " + req.params.off + " " + msg);
});
});
app.listen(process.env.PORT);
在這方面的任何迴應表示讚賞。
我現在只有一個分區。忽略超出範圍的偏移量會使我的其他同時請求變慢。我不知道如何取消訂閱消費者。還有一點,如果兩個或兩個以上的消費者爲消費者API提供不同的偏移量,並且EventEmitter一個接一個地提供消息,那麼如何確定哪個消費者要求哪個消息? – LearningToCode
使用多個消費者 - 這意味着不止一個客戶。這是我能夠實現它的唯一方式。我使用的模式是這樣的:我對待流中的歷史記錄與將來的記錄不同。如果我想閱讀歷史數據,我站出來一個新的客戶+消費者,並研究我需要研究的內容。爲了建立一個監聽器來讀取未來的數據,我複用了監聽器 - 如果兩個消費者想要爲將來的請求傾聽同一主題,我讓他們使用同一個用戶。 –
你能推薦一些可以提供我的功能的代碼嗎?我在問題中顯示的代碼正常工作,但每個響應需要大約1秒。 – LearningToCode