2016-01-08 126 views
0

在我的node.js應用程序中,我從AWS Kinesis流中讀取消息,並且需要將最後一分鐘的所有消息存儲在緩存(Redis)中。我在一個節點上運行工人下面的代碼:在node.js中併發寫入到redis

var loopCallback = function(record) { 
    var nowMinute = moment.utc(record.Data.ts).minute(); 
    //get all cached kinesis records 
    var key = "kinesis"; 
    cache.get(key,function (err, cachedData) { 
     if (err) { 
      utils.logError(err); 
     } else { 

      if(!cachedData) { 
       cachedData = []; 
      } else { 
       cachedData = JSON.parse(cachedData); 
      } 

      //get records with the same minute 
      var filtered = _.filter(cachedData, function (item) { 
       return moment.utc(item.ts).minute() === nowMinute; 
      }); 

      filtered.push(record.Data); 

      cache.set(key, JSON.stringify(filtered), function (saveErr) { 
       if (saveErr) { 
        utils.logError(saveErr); 
       } 

       //do other things with record; 
      }); 
     } 
    }); 
}; 

大多數的記錄(數十個)我收到完全相同的同一時刻。所以當我嘗試保存它時,一些記錄不會被存儲。 我明白這是由於競賽條件而發生的。 節點讀取來自Redis的數組版本old,並在將另一個記錄寫入緩存時覆蓋陣列。 我已經閱讀了關於redis交易的信息,但據我瞭解,它不會幫助我,因爲只有一個交易將完成,而其他交易將被拒絕。 在我的情況下,有辦法將所有記錄保存到緩存中? 謝謝

+0

你爲什麼要存儲元素作爲Redis的字符串值(鍵= JSON.stringify(.. ))。例如,您應該使用RPUSH將它們存儲在重新列出的列表中。這將繞過你的併發問題。 –

+0

謝謝。但我也需要刪除前一分鐘的數據,所以它意味着三個命令:LRANGE/DEL/RPUSH? – Ilya

+0

如果你說「只保留N個元素」而不是「最後一分鐘」,會更容易。環顧這個工具應該給你想法https://github.com/tj/node-redis-histogram –

回答

0

你可以使用一個有序集合,比分是Unix時間戳 ZADD kinesis <unixtimestamp> "some data to be cached"

要獲得元素的加入不到一分鐘前,創建(現在 - 60秒)的時間戳,然後使用ZRANGEBYSCORE得到最古老的元素第一: ZRANGEBYSCORE myzset -inf (timestamp

ZREVRANGEBYSCORE如果你想要最新的元素第一: ZRANGEBYSCORE myzset -inf (timestamp

要刪除的元素年長超過一分鐘,創建(現在 - 60秒)時間戳然後用ZREMRANGEBYSCORE ZREMRANGEBYSCORE myzset -inf (timestamp