2014-04-03 31 views
2

我在訪問相同BlockingCollection的C#應用​​程序中使用了兩個線程。這工作正常,但我想檢索第一個值兩次,因此兩個線程檢索相同的值*。多線程BlockingCollection相同的值

幾秒鐘後,我想輪詢兩個線程的currentIndex並刪除每個值<索引。因此,例如,線程的最低currentIndex爲5,應用程序將刪除隊列中索引0-5處的項目。另一個解決方案是在所有線程處理該值時刪除隊列中的值。

我該如何做到這一點?我想我需要另一種類型的緩衝區..?

預先感謝您!

*如果.Take()被thread1調用,則該項目在集合中被刪除,而thread2不能再次獲得相同的項目。


更新:

我想數據存儲在緩衝器中,因此,例如線程1的數據保存到HDD和線程2分析(相同)的數據(並行)。

+0

你問的是非常不尋常的。我懷疑它說的是你的程序設計中的一個更大的問題。你能給我們多一點關於你的程序的信息 - 特別是數據流?我懷疑我們可以爲您提供更好的選擇。 –

+0

更新了第一篇文章。 – Odrai

回答

5

使用生產者 - 消費者將Value1添加到兩個單獨的ConcurrentQueues。讓線程出隊然後從他們自己的隊列中處理它們。

編輯7/4/14: 下面是一個朦朧,哈克和半想出來的解決方案:創建一個緩衝的自定義對象。它可以包含您嘗試在線程1中緩衝的信息以及線程2中分析結果的空間。

將對象添加到線程1和BlockingCollection中的緩衝區。使用線程2分析結果並用結果更新對象。阻塞集合不應該太大,因爲它只是處理引用不應該打你的記憶。這假定你不會在兩個線程上同時修改緩衝區中的信息。

另外,還有一半想出來的解決方案是將信息同時送入緩衝區和阻塞收集。分析來自BlockingCollection的數據,將其輸入到輸出集合中,並再次將它們與緩衝區進行匹配。如果你做對了,這個選項可以處理併發修改,但可能會有更多的工作。

我認爲選擇一個更好。正如我已經指出的那樣,這些只是一半形成的,但它們可能會幫助您找到適合您的特定需求的東西。祝你好運。

+0

謝謝你的回答。所以不可能有兩個線程共享的緩衝區並使用相同的值? – Odrai

+0

我並不是說它無法完成,但我無法想出一條脫離我頭頂的路。我也不確定它是否應該完成。一般而言,併發集合刪除元素以防止您嘗試執行的操作,因爲這通常不合需要。如果你真的需要破解可能將值添加到非併發鏈表並閱讀它們,暫停和刪除將起作用。但我認爲你會後悔走這條路。 – mike1952

+0

嗨Odrai,我的編輯幫助呢?我不確定堆棧溢出是否發送編輯警報,因此您可能沒有看到它。 – mike1952

1

我建議重新考慮你的設計。

當你有一個必須處理的項目列表,然後給每個線程一個他必須處理的項目隊列。

有了這樣的解決方案,給兩個或多個線程處理相同的值都不會有問題。

這樣的事情,沒有測試只是鍵入。

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 
using System.Collections.Concurrent; 

namespace ConsoleApplication2 
{ 

    class Item 
    { 
    private int _value; 
    public int Value 
    { 
     get 
     { 
     return _value; 
     } 
    } 

    // all you need 
    public Item(int i) 
    { 
     _value = i; 
    } 
    } 

    class WorkerParameters 
    { 
    public ConcurrentQueue<Item> Items = new ConcurrentQueue<Item>(); 
    } 

    class Worker 
    { 
    private Thread _thread; 
    private WorkerParameters _params = new WorkerParameters(); 

    public void EnqueueItem(Item item) 
    { 
     _params.Items.Enqueue(item); 
    } 

    public void Start() 
    { 
     _thread = new Thread(new ParameterizedThreadStart(ThreadProc)); 
     _thread.Start(); 
    } 

    public void Stop() 
    { 
     // build somthing to stop your thread 
    } 

    public static void ThreadProc(object threadParams) 
    { 
     WorkerParameters p = (WorkerParameters)threadParams; 
     while (true) 
     { 
     while (p.Items.Count > 0) 
     { 
      Item item = null; 
      p.Items.TryDequeue(out item); 

      if (item != null) 
      { 
      // do something 
      } 

     } 
     System.Threading.Thread.Sleep(50); 
     } 
    } 
    } 

    class Program 
    { 
    static void Main(string[] args) 
    { 

     Worker w1 = new Worker(); 
     Worker w2 = new Worker(); 
     w1.Start(); 
     w2.Start(); 

     List<Item> itemsToProcess = new List<Item>(); 
     for (int i = 1; i < 1000; i++) 
     { 
     itemsToProcess.Add(new Item(i)); 
     } 

     for (int i = 1; i < 1000; i++) 
     { 
     w1.EnqueueItem(itemsToProcess[i]); 
     w2.EnqueueItem(itemsToProcess[i]); 
     } 


    } 
    } 
} 
相關問題