2

經過一番研究,我正在尋求任何有關如何有效地從Concurrent集合中刪除兩個項目的反饋。我的情況涉及通過UDP傳入的消息,這些消息當前正在放入BlockingCollection中。一旦集合中有兩個用戶,我需要安全地接收兩個用戶並處理它們。我見過幾種不同的技術,包括下面列出的一些想法。我目前的實施情況如下,但我認爲有一個更清晰的方法可以做到這一點,同時確保用戶按兩組進行處理。這是這種情況下唯一的限制。.Net BlockingCollection.Take(2):一次安全刪除兩個項目

實施現狀:

private int userQueueCount = 0; 
    public BlockingCollection<User> UserQueue = new BlockingCollection<User>(); 

    public void JoinQueue(User u) 
    { 
      UserQueue.Add(u); 
      Interlocked.Increment(ref userQueueCount); 

      if (userQueueCount > 1) 
      { 
       IEnumerable<User> users = UserQueue.Take(2); 
       if(users.Count==2) { 
       Interlocked.Decrement(ref userQueueCount); 
       Interlocked.Decrement(ref userQueueCount); 
       ... do some work with users but if only one 
       is removed I'll run into problems 
       } 

      } 
    } 

我想要做的就是這樣的事情,但我目前不能測試這在生產形勢,以確保其完整性。

Parallel.ForEach(UserQueue.Take(2), (u) => { ... }); 

或者更好的是:

public void JoinQueue(User u) 
    { 
      UserQueue.Add(u); 
      // if needed? increment 
      Interlocked.Increment(ref userQueueCount); 
      UserQueue.CompleteAdding(); 
    } 

然後實現這個地方:

 Task.Factory.StartNew(() => 
     { 
      while (userQueueCount > 1) OR (UserQueue.Count > 1) If it's safe? 
      { 
       IEnumerable<User> users = UserQueue.Take(2); 
       ... do stuff 
      } 

     }); 

這樣做的問題是,我不知道我可以保證,條件之間(計> 1)和Take(2),我確保UserQueue至少有兩個項目要處理?傳入的UDP消息是並行處理的,所以我需要一種方法來安全地從兩個成對的Blocking/Concurrent Collection中取出項目。

有沒有更好/更安全的方法來做到這一點?

修評論: 這個問題的intented目標實際上只是在.NET 4.0中實現的加工項目穩定/線程安全的方法掀起了併發收集的。它不一定非常漂亮,它只需要在並行環境中的無序對中處理項目時保持穩定。

+1

有什麼阻止你將它們擺在首位添加爲元組? – Brunner

+0

你的代碼*不太可能*是線程安全的 - 對於許多單獨的同步調用,所以很難證明它的行爲正確。 –

+2

'Take(2)'正在調用LINQ方法,該方法不會將它們從BlockingCollection中移除。你必須按照他們添加的順序將它們配對嗎?例如,如果集合有a,b,c,d可以由一個線程處理a&c和另一個處理b&d,或者您必須處理&b然後c&d? –

回答

2

以下是我在波濤洶涌的代碼做:

ConcurrentQueuequeue = new ConcurrentQueue(); //can use a BlockingCollection too (as it's just a blocking ConcurrentQueue by default anyway) 

public void OnUserStartedGame(User joiningUser) 
{ 
    User waitingUser; 
    if (this.gameQueue.TryDequeue(out waitingUser)) //if there's someone waiting, we'll get him 
     this.MatchUsers(waitingUser, joiningUser); 
    else 
     this.QueueUser(joiningUser); //it doesn't matter if there's already someone in the queue by now because, well, we are using a queue and it will sort itself out. 
} 

private void QueueUser(User user) 
{ 
    this.gameQueue.Enqueue(user); 
} 

private void MatchUsers(User first, User second) 
{ 
    //not sure what you do here 
} 

的基本想法是,如果某人的希望開始比賽,有一個人在你的隊列,你與它們匹配,並開始遊戲 - 如果沒有人,將它們添加到隊列中。 一次最多隻能有一個用戶在隊列中,但如果不是這樣,那也不算太壞,因爲當其他用戶開始遊戲時,等待的用戶會逐漸刪除,直到隊列被添加再次空着。

+0

我認爲這在邏輯上是最好的方法來處理它。謝謝! –

+0

這段代碼有一個競爭條件,這意味着它並不總是FIFO。如果隊列爲空,並且兩個或兩個以上的用戶同時開始遊戲,則它們可能全部進入隊列,並且只有更多用戶加入時纔會匹配用戶。 – svick

+0

@svick我不會說這是一個競爭條件,這不是一個致命的缺陷 - 我也在答覆中提到過兩次。我不認爲這是一個問題的原因是,只要有足夠的人開始遊戲,隊列就會自動清空。此外,這種衝突的時間範圍非常小,發生這種情況的可能性非常渺茫,因爲唯一的情況很重要(不是很多用戶開始遊戲)。我不確定你的意思是不是FIFO,它是一個隊列,如果它們是「同時」添加的,那麼如果它們是第一或第二,它就不會太重要。 – Brunner

1

如果由於某種原因無法將用戶對添加到集合中,那麼我會使用ConcurrentQueue並嘗試一次嘗試排列2個項目,如果我只能獲得一個 - 則將其放回。根據需要等待。

+0

我最初在ConcurrentQueue中有這個,但是在做了一些閱讀之後就切換了。我認爲使用BlockingCollection的想法是試圖以某種方式實現消費者/製作者。我認爲沒有一種明顯簡單的方法來實現這一點,並且實際上認爲你的建議將在此期間起作用。謝謝你的幫助! –

1

我覺得這裏最簡單的解決方法是使用鎖:你會爲所有消費者(生產者將不使用任何鎖),這將確保你始終以用戶以正確的順序一個鎖:

User firstUser; 
User secondUser; 

lock (consumerLock) 
{ 
    firstUser = userQueue.Take(); 
    secondUser = userQueue.Take(); 
} 

Process(firstUser, secondUser); 

另一種選擇是擁有兩個隊列:一個用於單個用戶,一個用於成對用戶,並有一個將它們從第一個隊列傳輸到第二個隊列的進程。

如果你不介意浪費另一個線程,你可以用兩個BlockingCollection就做這樣的:

while (true) 
{ 
    var firstUser = incomingUsers.Take(); 
    var secondUser = incomingUsers.Take(); 

    userPairs.Add(Tuple.Create(firstUser, secondUser)); 
} 

您不必擔心鎖定在這裏,因爲單用戶隊列會只有一個消費者,並且對的消費者現在可以安全地使用簡單的Take()

如果你關心浪費一個線程,並且可以使用TPL數據流,你可以使用BatchBlock<T>,它結合了進來物品進入的ñ項目,其中ñ在創建塊的時間配置批次,這樣你就可以將其設置爲2

0

願這可以helpd

public static IList<T> TakeMulti<T>(this BlockingCollection<T> me, int count = 100) where T : class 
{ 
    T last = null; 
    if (me.Count == 0) 
    { 
     last = me.Take(); // blocking when queue is empty 
    } 

    var result = new List<T>(count); 

    if (last != null) 
    { 
     result.Add(last); 
    } 

    //if you want to take more item on this time. 
    //if (me.Count < count/2) 
    //{ 
    // Thread.Sleep(1000); 
    //} 

    while (me.Count > 0 && result.Count <= count) 
    { 
     result.Add(me.Take()); 
    } 

    return result; 
}