在我的node.js應用程序中,我從AWS Kinesis流中讀取消息,並且需要將最後一分鐘的所有消息存儲在緩存(Redis)中。我在一個節點上運行工人下面的代碼:在node.js中併發寫入到redis
var loopCallback = function(record) {
var nowMinute = moment.utc(record.Data.ts).minute();
//get all cached kinesis records
var key = "kinesis";
cache.get(key,function (err, cachedData) {
if (err) {
utils.logError(err);
} else {
if(!cachedData) {
cachedData = [];
} else {
cachedData = JSON.parse(cachedData);
}
//get records with the same minute
var filtered = _.filter(cachedData, function (item) {
return moment.utc(item.ts).minute() === nowMinute;
});
filtered.push(record.Data);
cache.set(key, JSON.stringify(filtered), function (saveErr) {
if (saveErr) {
utils.logError(saveErr);
}
//do other things with record;
});
}
});
};
大多數的記錄(數十個)我收到完全相同的同一時刻。所以當我嘗試保存它時,一些記錄不會被存儲。 我明白這是由於競賽條件而發生的。 節點讀取來自Redis的數組版本old
,並在將另一個記錄寫入緩存時覆蓋陣列。 我已經閱讀了關於redis交易的信息,但據我瞭解,它不會幫助我,因爲只有一個交易將完成,而其他交易將被拒絕。 在我的情況下,有辦法將所有記錄保存到緩存中? 謝謝
你爲什麼要存儲元素作爲Redis的字符串值(鍵= JSON.stringify(.. ))。例如,您應該使用RPUSH將它們存儲在重新列出的列表中。這將繞過你的併發問題。 –
謝謝。但我也需要刪除前一分鐘的數據,所以它意味着三個命令:LRANGE/DEL/RPUSH? – Ilya
如果你說「只保留N個元素」而不是「最後一分鐘」,會更容易。環顧這個工具應該給你想法https://github.com/tj/node-redis-histogram –