我期望通過StackExchange.Redis實現一個簡單的分佈式工作隊列系統。Redis - 簡單隊列讀寫器的正確方法 - StackExchange.Redis
我明白沒有BLPOP
等的原因,但因爲它站在我工作的接口是基於重複的TryRead
呼叫超時。
因爲我在處理程序中取消訂閱,並且設置了一個標誌來取消超時,所以我對下面的內容不甚瞭解。有沒有什麼機會可能會錯過?有沒有不同的方法來實現這一目標?
public string TryRead(string queueName, TimeSpan timeout)
{
string result = null;
var chanName = $"qnot_{queueName}";
var done = new ManualResetEvent(false);
void Handler(RedisChannel chan, RedisValue val)
{
_sub.Unsubscribe(chanName, Handler);
result = _database.ListRightPop($"qdata_{queueName}");
done.Set();
}
_sub.Subscribe(chanName, Handler);
done.WaitOne(timeout);
return result;
}
public void Write(string queueName, string text)
{
_database.ListLeftPush($"qdata_{queueName}", text);
_sub.Publish($"qnot_{queueName}", "");
}
以上版本總是超時,在有隊列現有項目(並沒有什麼新增加)的情況下返回null
。下面的版本現在首先檢查現有的數據,這是有效的。但它有一個錯誤,一個競爭條件:如果第一次讀取檢查返回否定,那麼推送併發送通知,然後我們訂閱並等待超時。
public string TryRead(string queueName, TimeSpan timeout)
{
var dataName = $"qdata_{queueName}";
var result = (string)_database.ListRightPop(dataName);
if (result != null)
{
return result;
}
var chanName = $"qnot_{queueName}";
var done = new ManualResetEvent(false);
void Handler(RedisChannel chan, RedisValue val)
{
_sub.Unsubscribe(chanName, Handler);
result = _database.ListRightPop(dataName);
done.Set();
}
_sub.Subscribe(chanName, Handler);
done.WaitOne(timeout);
return result;
}
我可以在一個循環做RPOP
S,但似乎完全吸。其他人做了類似的事情?
我很困惑,爲什麼你在處理時取消訂閱 - 當然讀者應該基本上是:讀一切?不只是一個? –
我正在將其改造爲目前使用Azure和AWS隊列的系統,並且保持一致性,我希望堅持'TryRead'的相同簽名。它是獨立的,只是阻塞,直到超時@MarcGravell –