2012-05-03 74 views
2

我試圖通過使用RentrantLock阻止異步嘗試修改實體的某個實例(由關鍵字標識)來實現一個類,以在我的java應用程序中實施併發。目標是阻止/排隊多個併發嘗試來修改對象的給定實例,直到先前的線程完成。這個類以一種通用的方式實現它,允許任何代碼塊獲得一個鎖,並在完成後釋放它(與RentrantLock語義相同),並增加了只阻塞試圖修改對象實例的線程的實用程序通過一個鍵)而不是阻止所有進入代碼塊的線程。使用ReentrantLock實現阻塞併發

該類提供了一個簡單的構造,允許僅爲實體的一個實例同步代碼塊。例如,如果我想要爲來自用戶的所有線程同步代碼塊,並使用ID 33,但我不希望任何其他用戶的線程被線程服務用戶33阻塞。

該類實現爲如下

public class EntitySynchronizer { 
    private static final int DEFAULT_MAXIMUM_LOCK_DURATION_SECONDS = 300; // 5 minutes 
    private Object mutex = new Object(); 
    private ConcurrentHashMap<Object, ReentrantLock> locks = new ConcurrentHashMap<Object, ReentrantLock>(); 
    private static ThreadLocal<Object> keyThreadLocal = new ThreadLocal<Object>(); 
    private int maximumLockDurationSeconds; 
    public EntitySynchronizer() { 
    this(DEFAULT_MAXIMUM_LOCK_DURATION_SECONDS); 
    } 
    public EntitySynchronizer(int maximumLockDurationSeconds) { 
    this.maximumLockDurationSeconds = maximumLockDurationSeconds; 
    } 
    /** 
    * Initiate a lock for all threads with this key value 
    * @param key the instance identifier for concurrency synchronization 
    */ 
    public void lock(Object key) { 
    if (key == null) { 
     return; 
    } 
    /* 
    * returns the existing lock for specified key, or null if there was no existing lock for the 
    * key 
    */ 
    ReentrantLock lock; 
    synchronized (mutex) { 
     lock = locks.putIfAbsent(key, new ReentrantLock(true)); 
     if (lock == null) { 
     lock = locks.get(key); 
     } 
    } 
    /* 
    * Acquires the lock and returns immediately with the value true if it is not held by another 
    * thread within the given waiting time and the current thread has not been interrupted. If this 
    * lock has been set to use a fair ordering policy then an available lock will NOT be acquired 
    * if any other threads are waiting for the lock. If the current thread already holds this lock 
    * then the hold count is incremented by one and the method returns true. If the lock is held by 
    * another thread then the current thread becomes disabled for thread scheduling purposes and 
    * lies dormant until one of three things happens: - The lock is acquired by the current thread; 
    * or - Some other thread interrupts the current thread; or - The specified waiting time elapses 
    */ 
    try { 
     /* 
     * using tryLock(timeout) instead of lock() to prevent deadlock situation in case acquired 
     * lock is not released normalRelease will be false if the lock was released because the 
     * timeout expired 
     */ 
     boolean normalRelease = lock.tryLock(maximumLockDurationSeconds, TimeUnit.SECONDS); 
     /* 
     * lock was release because timeout expired. We do not want to proceed, we should throw a 
     * concurrency exception for waiting thread 
     */ 
     if (!normalRelease) { 
     throw new ConcurrentModificationException(
      "Entity synchronization concurrency lock timeout expired for item key: " + key); 
     } 
    } catch (InterruptedException e) { 
     throw new IllegalStateException("Entity synchronization interrupted exception for item key: " 
      + key); 
    } 
    keyThreadLocal.set(key); 
    } 
    /** 
    * Unlock this thread's lock. This takes care of preserving the lock for any waiting threads with 
    * the same entity key 
    */ 
    public void unlock() { 
    Object key = keyThreadLocal.get(); 
    keyThreadLocal.remove(); 
    if (key != null) { 
     ReentrantLock lock = locks.get(key); 
     if (lock != null) { 
     try { 
      synchronized (mutex) { 
      if (!lock.hasQueuedThreads()) { 
       locks.remove(key); 
      } 
      } 
     } finally { 
      lock.unlock(); 
     } 
     } else { 
     synchronized (mutex) { 
      locks.remove(key); 
     } 
     } 
    } 
    } 
} 

這個類的使用方法如下:

private EntitySynchronizer entitySynchronizer = new EntitySynchronizer(); 
entitySynchronizer.lock(userId); // 'user' is the entity by which i want to differentiate my synchronization 
try { 
    //execute my code here ... 
} finally { 
    entitySynchronizer.unlock(); 
} 

的問題是,它不能很好地工作。在非常高的併發負載下,仍然存在一些情況,其中具有相同密鑰的多個線程未被同步。我已經完成了相當徹底的測試,並且無法弄清楚爲什麼會發生這種情況。

那裏有任何併發​​專家?

+0

我想你對所有訪問都使用'entitySynchronizer'的同一個實例嗎? – assylias

回答

1

您的解決方案缺乏原子性。考慮以下情況:

  • 線程A輸入lock()並從地圖獲取現有鎖。
  • 線程B爲同一個鍵輸入unlock(),解鎖並從地圖上移除鎖(因爲線程A尚未調用tryLock())。
  • 線程A成功呼叫tryLock()

一種可能的選擇是跟蹤鎖的數量從地圖上「簽出」:

public class EntitySynchronizer { 
    private Map<Object, Token> tokens = new HashMap<Object, Token>(); 
    private ThreadLocal<Token> currentToken = new ThreadLocal<Token>(); 
    private Object mutex = new Object(); 

    ... 

    public void lock(Object key) throws InterruptedException { 
     Token token = checkOut(key); 
     boolean locked = false; 
     try { 
      locked = token.lock.tryLock(maximumLockDurationSeconds, TimeUnit.SECONDS)); 
      if (locked) currentToken.set(token); 
     } finally { 
      if (!locked) checkIn(token); 
     } 
    } 

    public void unlock() { 
     Token token = currentToken.get(); 
     if (token != null) { 
      token.lock.unlock(); 
      checkIn(token); 
      currentToken.remove(); 
     } 
    } 

    private Token checkOut(Object key) { 
     synchronized (mutex) { 
      Token token = tokens.get(key); 
      if (token != null) { 
       token.checkedOutCount++; 
      } else { 
       token = new Token(key); 
       tokens.put(key, token); 
      } 
      return token; 
     } 
    } 

    private void checkIn(Token token) { 
     synchronized (mutex) { 
      token.checkedOutCount--; 
      if (token.checkedOutCount == 0) 
       tokens.remove(token.key); 
     } 
    } 

    private class Token { 
     public final Object key; 
     public final ReentrantLock lock = new ReentrantLock(); 
     public int checkedOutCount = 1; 

     public Token(Object key) { 
      this.key = key; 
     } 
    } 
} 

注意tokens不必是ConcurentHashMap因爲它的方法被稱爲同步塊無論如何。

+1

他可以使用'synchronized(key)',但可能他希望所有'equal'的鍵互斥。 –

+0

@axtavt - 在lock()方法中,如果tryLock()返回false(由於鎖定持續時間超時),那麼使用它的代碼塊仍然會被正確執行(checkIn()在finally塊中調用) ,但是unlock()方法(因此checkIn())將在代碼塊的末尾被再次調用。 另外,我可以通過在我的同步塊中包含我的tryLock()來實現你在做什麼嗎? – Strykker

+0

@Strykker:如果'tryLock'返回'false','token'不會被放入'ThreadLocal',因此後續的'unlock()'什麼都不會做。如果需要,您可以針對這些案例實施不同的處理。您不能將'tryLock()'放入同步塊中,因爲它會阻塞其他線程。 – axtavt

4

你們中的一個應該解決的事情是這樣的:

ReentrantLock lock; 
synchronized (mutex) { 
    lock = locks.putIfAbsent(key, new ReentrantLock(true)); 
    if (lock == null) { 
    lock = locks.get(key); 
    } 
} 

這錯過了併發圖的整點。你爲什麼不寫這樣的:

ReentrantLock lock = new ReentrantLock(true); 
final ReentrantLock oldLock = locks.putIfAbsent(key, lock); 
lock = oldLock != null? oldLock : lock; 
+1

這並不完全正確,因爲即使在地圖中已經有該鎖的鎖,您也將始終獲得該鎖的新實例。海報真正需要的只是 'lock = locks.putIfAbsent(key,new ReentrantLock(true));' 參見(http://docs.oracle.com/javase/6/docs/api/java/util /concurrent/ConcurrentHashMap.html#putIfAbsent%28K,%20V%29) –

+0

馬爾科姆是對的,因爲我需要重用現有的鎖(如果存在的話)。 Marko,你想知道我爲什麼要同步圍繞Map的操作嗎?我這樣做是因爲我懷疑在創建/獲取現有的鎖和解鎖(方法)中的鎖管理代碼之間可能存在競爭條件@Malcom - 因此在您的代碼片段中,new()操作只有在沒有鎖的情況下才會被調用? – Strykker

+0

@MalcolmSmith不,OP也不希望你的建議。關於我的意思,看看我編輯的答案。 OP,我認爲你可以重寫代碼,以便併發映射上的所有操作都是原子的。順便說一句,馬爾科姆的代碼和我無條件地實例化鎖,但如果你馬上處理它,這不是問題。如果你想要原子'putIfAbsent',你必須這樣做。 –

1

我假設你沒有真正使用你的類,如下所示:

private EntitySynchronizer entitySynchronizer = new EntitySynchronizer(); 
entitySynchronizer.lock(userId); // 'user' is the entity by which i want to differentiate my synchronization 
try { 
    //execute my code here ... 
} finally { 
    entitySynchronizer.unlock(); 
} 

但有EntitySynchronizer的單一實例?因爲否則這就是你的問題。

+0

+1好。我甚至沒有看他的用法。 –

+0

正確,我確實有一個EntitySynchronizer的單例實例。我的用法示例有點過分簡化 – Strykker