2017-09-26 50 views
0

我期望通過StackExchange.Redis實現一個簡單的分佈式工作隊列系統。Redis - 簡單隊列讀寫器的正確方法 - StackExchange.Redis

我明白沒有BLPOP等的原因,但因爲它站在我工作的接口是基於重複的TryRead呼叫超時。

因爲我在處理程序中取消訂閱,並且設置了一個標誌來取消超時,所以我對下面的內容不甚瞭解。有沒有什麼機會可能會錯過?有沒有不同的方法來實現這一目標?

public string TryRead(string queueName, TimeSpan timeout) 
    { 
     string result = null; 

     var chanName = $"qnot_{queueName}"; 
     var done = new ManualResetEvent(false); 

     void Handler(RedisChannel chan, RedisValue val) 
     { 
      _sub.Unsubscribe(chanName, Handler); 
      result = _database.ListRightPop($"qdata_{queueName}"); 
      done.Set(); 
     } 

     _sub.Subscribe(chanName, Handler); 
     done.WaitOne(timeout); 

     return result; 
    } 

    public void Write(string queueName, string text) 
    { 
     _database.ListLeftPush($"qdata_{queueName}", text); 
     _sub.Publish($"qnot_{queueName}", ""); 
    } 

以上版本總是超時,在有隊列現有項目(並沒有什麼新增加)的情況下返回null。下面的版本現在首先檢查現有的數據,這是有效的。但它有一個錯誤,一個競爭條件:如果第一次讀取檢查返回否定,那麼推送併發送通知,然後我們訂閱並等待超時。

public string TryRead(string queueName, TimeSpan timeout) 
    { 
     var dataName = $"qdata_{queueName}"; 

     var result = (string)_database.ListRightPop(dataName); 
     if (result != null) 
     { 
      return result; 
     } 

     var chanName = $"qnot_{queueName}"; 
     var done = new ManualResetEvent(false); 

     void Handler(RedisChannel chan, RedisValue val) 
     { 
      _sub.Unsubscribe(chanName, Handler); 
      result = _database.ListRightPop(dataName); 
      done.Set(); 
     } 

     _sub.Subscribe(chanName, Handler); 
     done.WaitOne(timeout); 

     return result; 
    } 

我可以在一個循環做RPOP S,但似乎完全吸。其他人做了類似的事情?

+0

我很困惑,爲什麼你在處理時取消訂閱 - 當然讀者應該基本上是:讀一切?不只是一個? –

+0

我正在將其改造爲目前使用Azure和AWS隊列的系統,並且保持一致性,我希望堅持'TryRead'的相同簽名。它是獨立的,只是阻塞,直到超時@MarcGravell –

回答

0

我結束了這一點,這工作,但我仍然歡迎其他答案用一種可行的方法:

public string TryRead(string queueName, TimeSpan timeout) 
    { 
     var timer = Stopwatch.StartNew(); 
     var dataName = $"{_keyPrefix}qdata_{queueName}"; 
     var chanName = $"{_keyPrefix}qnot_{queueName}"; 
     var done = new AutoResetEvent(false); 
     string result; 

     // subscribe - sets the 'done' flag when a new item is pushed 
     void Handler(RedisChannel chan, RedisValue val) 
     { 
      done.Set(); 
     } 

     _sub.Subscribe(chanName, Handler); 

     do 
     { 
      // try to read right away (before waiting), in case there was data already there 
      result = _database.ListRightPop(dataName); 
      if (result != null) 
      { 
       continue; 
      } 

      // there wasn't an item right away, so wait for the timeout to expire 
      // or the subscription to be fired. if it fired, try the read again 
      var remainingTime = timeout - timer.Elapsed; 
      if (remainingTime.TotalMilliseconds <= 1.0) 
      { 
       break; 
      } 
      if (done.WaitOne(remainingTime)) 
      { 
       result = _database.ListRightPop(dataName); 
      } 
     } while (result == null && timer.Elapsed < timeout); 

     _sub.Unsubscribe(chanName, Handler); 

     return result; 
    } 

編輯:更新W/AutoResetEvent並從處理程序刪除Unsubscribe。請注意,對於那些發現這種情況的人來說,這似乎對我來說是一種替代單個阻塞式讀取的替代方法,但這不會是推薦的方法。我只使用這個,因爲我正在考慮與其他隊列實現保持一致,並且正在致力於此特定TryRead簽名。