2014-10-02 36 views
3

我試圖使用ServiceStack.Redis庫以及此處所述(described here)的鎖定機制來實現分佈式鎖管理器(DLM),但是我發現該API似乎存在競爭條件,有時會把同一個鎖同時授予多個客戶端。(標題:使用ServiceStack Redis進行分佈式鎖定時的互斥違規)

// Repro script: two simulated clients contend for the same Redis lock ("mutex") and
// record in the "touchcount" key whether the protected work has already been done.
BasicRedisClientManager mgr = new BasicRedisClientManager(redisConnStr); 

// Reset the shared counter before the run: drop any previous value, then increment by 0.
// (Presumably the increment-by-zero re-creates the key with value 0 - confirm against Redis INCRBY semantics.)
using(var client = mgr.GetClient()) 
{ 
    client.Remove("touchcount"); 
    client.Increment("touchcount", 0); 
} 

// NOTE(review): this single Random instance is also used inside the worker lambda below,
// which may run on two tasks at once; System.Random is not documented as thread-safe - confirm this is acceptable for a repro.
Random rng = new Random(); 

// Body executed by each simulated client; the task state object is used as the client id in log output.
Action<object> simulatedDistributedClientCode = (clientId) => { 

    using(var redisClient = mgr.GetClient()) 
    { 
     // Ask for the lock with a 2 second timeOut. The lock is released when 'mylock' is disposed.
     using(var mylock = redisClient.AcquireLock("mutex", TimeSpan.FromSeconds(2))) 
     { 
      long touches = redisClient.Get<long>("touchcount"); 
      Debug.WriteLine("client{0}: I acquired the lock! (touched: {1}x)", clientId, touches); 
      if(touches > 0) { 
       // Another client already did the work; returning here leaves both using blocks,
       // which disposes the lock and the client.
       Debug.WriteLine("client{0}: Oh, but I see you've already been here. I'll release it.", clientId); 
       return; 
      } 
      // Simulated work lasting from 100 ms up to (but not including) 2500 ms, i.e. it can
      // outlast the 2 second timeOut passed to AcquireLock above.
      int arbitraryDurationOfExecutingCode = rng.Next(100, 2500); 
      Thread.Sleep(arbitraryDurationOfExecutingCode); // do some work of arbitrary duration 
      // Record that the protected work has been performed once more.
      redisClient.Increment("touchcount", 1); 
     } 
     Debug.WriteLine("client{0}: Okay, I released my lock, your turn now.", clientId); 
    } 
}; 
// Continuation that writes the first inner exception of a faulted worker task to the debug output.
Action<Task> exceptionWriter = (t) => {if(t.IsFaulted) Debug.WriteLine(t.Exception.InnerExceptions.First());}; 

// Random stagger (5 ms up to, but not including, 500 ms) between starting the two clients.
int arbitraryDelayBetweenClients = rng.Next(5, 500); 
var clientWorker1 = new Task(simulatedDistributedClientCode, 1); 
var clientWorker2 = new Task(simulatedDistributedClientCode, 2); 

clientWorker1.Start(); 
Thread.Sleep(arbitraryDelayBetweenClients); 
clientWorker2.Start(); 

// Wait on the continuations rather than the workers themselves, so a faulted worker is
// reported by exceptionWriter instead of being rethrown by WaitAll.
Task.WaitAll(
    clientWorker1.ContinueWith(exceptionWriter), 
    clientWorker2.ContinueWith(exceptionWriter) 
    ); 

// Report how many clients performed the protected work; a value above 1 means both
// clients incremented the counter.
using(var client = mgr.GetClient()) 
{ 
    var finaltouch = client.Get<long>("touchcount"); 
    Console.WriteLine("Touched a total of {0}x.", finaltouch); 
} 

mgr.Dispose(); 

運行上述代碼可以模擬兩個客戶端在很短的時間間隔內先後嘗試執行同一操作,可能出現三種輸出。第一種是最佳情況:互斥正常工作,客戶端按正確順序執行。第二種情況是第二個客戶端在等待獲取鎖時超時,這也是可以接受的結果。然而問題在於,當arbitraryDurationOfExecutingCode接近或超過獲取鎖的超時時間時,很容易重現第二個客戶端在第一個客戶端釋放鎖之前就被授予鎖的情形,產生如下輸出:

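client1: I acquired the lock! (touched: 0x)
client2: I acquired the lock! (touched: 0x)
client1: Okay, I released my lock, your turn now.
client2: Okay, I released my lock, your turn now.
Touched a total of 2x.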

根據我對該API及其文檔的理解,獲取鎖時傳入的timeOut參數應該僅僅表示獲取鎖的超時時間。如果爲了避免上述情況,我必須猜測一個足夠大的timeOut值,使它總是比我的代碼執行時間更長,這似乎很容易出錯。除了傳遞null來無限期等待鎖之外,還有人有其他解決方法嗎?我絕對不想那樣做,否則崩潰的工作進程肯定會留下無人釋放的「幽靈鎖」。

回答

3

mythz的回答(感謝及時回覆!)證實了ServiceStack.Redis內置的AcquireLock方法並不區分「獲取鎖的等待時間」和「鎖的有效期」。就我們的需求而言,現有代碼期望分佈式鎖機制在發生鎖爭用時快速失敗,但同時允許在持有鎖的範圍內運行長時間的處理。爲了滿足這些要求,我在ServiceStack的RedisLock基礎上派生出了下面這個變體,以便區分這兩個時間。

// based on ServiceStack.Redis.RedisLock 
// https://github.com/ServiceStack/ServiceStack.Redis/blob/master/src/ServiceStack.Redis/RedisLock.cs 
/// <summary>
/// A distributed lock held in a single Redis key. Unlike the lock this class is based on,
/// the time spent waiting to acquire the lock and the maximum age of the lock
/// are configured separately.
/// </summary>
/// <remarks>
/// The lock is acquired in the constructor and released in <see cref="Dispose"/>, so it is
/// intended to be used in a <c>using</c> statement nested inside the scope of the client.
/// </remarks>
internal class RedisDlmLock : IDisposable
{
    /// <summary>How long the constructor keeps retrying when no acquisition timeout is supplied.</summary>
    public static readonly TimeSpan DefaultLockAcquisitionTimeout = TimeSpan.FromSeconds(30);

    /// <summary>How long an acquired lock stays valid when no maximum age is supplied.</summary>
    public static readonly TimeSpan DefaultLockMaxAge = TimeSpan.FromHours(2);

    /// <summary>Prefix prepended to every lock key; set it to namespace lock keys if desired.</summary>
    public const string LockPrefix = "";

    // The held reference to the client means the lock scope should always be within the client scope.
    private readonly IRedisClient _client;

    private readonly string _lockKey;

    // The value this instance last wrote to the lock key (its own expiry timestamp in Unix ms).
    // Dispose only removes the key while it still holds exactly this value.
    private string _lockValue;

    /// <summary>
    /// Acquires a distributed lock on the specified key.
    /// </summary>
    /// <param name="redisClient">The client to use to acquire the lock.</param>
    /// <param name="key">The key to acquire the lock on.</param>
    /// <param name="acquisitionTimeOut">The amount of time to wait while trying to acquire the lock. Defaults to <see cref="DefaultLockAcquisitionTimeout"/>.</param>
    /// <param name="lockMaxAge">After this amount of time expires, the lock will be invalidated and other clients will be allowed to establish a new lock on the same key. Defaults to <see cref="DefaultLockMaxAge"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="redisClient"/> or <paramref name="key"/> is null.</exception>
    public RedisDlmLock(IRedisClient redisClient, string key, TimeSpan? acquisitionTimeOut = null, TimeSpan? lockMaxAge = null)
    {
        if (redisClient == null)
        {
            throw new ArgumentNullException("redisClient");
        }
        if (key == null)
        {
            // A null key would otherwise silently lock on the bare prefix.
            throw new ArgumentNullException("key");
        }

        _client = redisClient;
        _lockKey = LockPrefix + key;

        // Resolve the defaults once; only the expiry timestamp has to be recomputed per attempt.
        TimeSpan maxAge = lockMaxAge ?? DefaultLockMaxAge;
        TimeSpan acquisitionWindow = acquisitionTimeOut ?? DefaultLockAcquisitionTimeout;

        // Loop attempting to get the lock for the acquisition window.
        // NOTE(review): what happens when the window elapses is decided by
        // ExecExtensions.RetryUntilTrue (presumably it throws) - confirm against ServiceStack.
        ExecExtensions.RetryUntilTrue(() => TryAcquire(maxAge), acquisitionWindow);
    }

    /// <summary>
    /// Makes a single attempt to take the lock.
    /// Modified from ServiceStack.Redis.RedisLock; the pattern is taken from the Redis
    /// documentation for SETNX (http://redis.io/commands/setnx).
    /// </summary>
    /// <param name="maxAge">How long the lock should remain valid if this attempt succeeds.</param>
    /// <returns><c>true</c> if this instance now holds the lock; otherwise <c>false</c>.</returns>
    private bool TryAcquire(TimeSpan maxAge)
    {
        // Calculate a unix time for when the lock should expire and use it as the lock value.
        DateTime expireTime = DateTime.UtcNow.Add(maxAge);
        _lockValue = (expireTime.ToUnixTimeMs() + 1).ToString(CultureInfo.InvariantCulture);

        // Try to set the lock; if the key does not exist this succeeds and the lock is obtained.
        if (_client.SetEntryIfNotExists(_lockKey, _lockValue))
        {
            return true;
        }

        // A key for the lock is present. Either the lock is legitimately held, or a client that
        // acquired it crashed (or did not release it properly), so read its expiry to find out.
        string existingLockValue = _client.Get<string>(_lockKey);
        long lockExpireTime;
        // Parse with the invariant culture, matching how the value is written above.
        if (!long.TryParse(existingLockValue, NumberStyles.Integer, CultureInfo.InvariantCulture, out lockExpireTime))
        {
            return false;
        }

        // If the expire time is greater than the current time then we can't let the lock go yet.
        if (lockExpireTime > DateTime.UtcNow.ToUnixTimeMs())
        {
            return false;
        }

        // The existing lock has expired without being released, so try to take it over: set the
        // key to our value AND check that the previous value is the one we just read, to account
        // for another client taking the lock over in between.
        // NOTE(review): when this check fails our value has still been written over the winner's
        // value, so the winner's Dispose will not find its own value - confirm this is acceptable.
        return _client.GetAndSetEntry(_lockKey, _lockValue) == existingLockValue;
    }

    /// <summary>Returns the lock key and the value this instance last wrote to it.</summary>
    public override string ToString()
    {
        return String.Format("RedisDlmLock:{0}:{1}", _lockKey, _lockValue);
    }

    /// <summary>
    /// Releases the lock, but only if the key still contains the value written by this instance.
    /// Never throws; failures are written to the trace output.
    /// </summary>
    public void Dispose()
    {
        try
        {
            // Only remove the entry if it still contains OUR value. Watching the key first means
            // the transaction below is tied to the value we are about to read.
            _client.Watch(_lockKey);
            var currentValue = _client.Get<string>(_lockKey);
            if (currentValue != _lockValue)
            {
                _client.UnWatch();
                return;
            }

            using (var tx = _client.CreateTransaction())
            {
                tx.QueueCommand(r => r.Remove(_lockKey));
                tx.Commit();
            }
        }
        catch (Exception ex)
        {
            // Log but don't throw: Dispose usually runs at the end of a using block, where an
            // exception would mask whatever happened inside the locked region. If the release
            // failed, the key stays until its max age elapses.
            System.Diagnostics.Trace.TraceWarning("RedisDlmLock: failed to release lock '{0}': {1}", _lockKey, ex);
        }
    }
}

爲了儘可能簡化使用,我還爲IRedisClient提供了一些與AcquireLock方法對應的擴展方法,大致如下:

/// <summary>
/// Extension methods on <see cref="IRedisClient"/> for taking a <see cref="RedisDlmLock"/>.
/// </summary>
internal static class RedisClientLockExtensions
{
    /// <summary>
    /// Creates a <see cref="RedisDlmLock"/> for <paramref name="key"/> using this client.
    /// </summary>
    /// <param name="client">The client the lock is created with.</param>
    /// <param name="key">The key to lock on.</param>
    /// <param name="timeOut">Passed to the lock as its acquisition timeout.</param>
    /// <param name="maxAge">Passed to the lock as its maximum age.</param>
    /// <returns>The created lock, typed as <see cref="IDisposable"/>.</returns>
    public static IDisposable AcquireDlmLock(this IRedisClient client, string key, TimeSpan timeOut, TimeSpan maxAge)
    {
        IDisposable dlmLock = new RedisDlmLock(
            redisClient: client,
            key: key,
            acquisitionTimeOut: timeOut,
            lockMaxAge: maxAge);

        return dlmLock;
    }
}
2

您的問題恰好揭示了ServiceStack.Redis中分佈式鎖的行爲:如果超過了指定的超時時間,等待超時的客戶端會把現有的鎖視爲無效鎖,並嘗試自動恢復(接管)該鎖。如果沒有這種自動恢復行爲,崩潰的客戶端將永遠不會釋放鎖,所有等待該鎖的後續操作也都無法繼續。

AcquireLock的鎖定行爲封裝在RedisLock類中(encapsulated in the RedisLock class):

/// <summary>
/// Creates a <see cref="RedisLock"/> for <paramref name="key"/> using this client.
/// </summary>
/// <param name="key">The key to lock on.</param>
/// <param name="timeOut">The timeout handed to the <see cref="RedisLock"/> constructor.</param>
/// <returns>The created lock, typed as <see cref="IDisposable"/>.</returns>
public IDisposable AcquireLock(string key, TimeSpan timeOut)
{
    IDisposable redisLock = new RedisLock(this, key, timeOut);
    return redisLock;
}

您可以複製該類並加以修改,以實現您想要的行爲:

// MyRedisLock stands for your own modified copy of RedisLock. Creating it in a using
// statement means it is disposed when the block is left.
using (new MyRedisLock(client, key, timeout)) 
{ 
    // ... code that should run while the lock object is alive ...
} 
+0

謝謝你,這證實了我所觀察到的現象:'timeOut'參數同時被用作獲取鎖的超時時間和鎖的有效期。 – Dusty 2014-10-07 16:44:50