2013-03-17 24 views
0

我一直在試圖找出如何解決的要求,我有,但對我的生活我只是不能拿出一個解決方案。去隊列中的項目與工作線程

我有一個項目數據庫,其中存儲他們的一種隊列。 (數據庫已經實現和其他進程將被添加項目到這個隊列。)

該項目需要大量的工作/時間「過程」,所以我需要能夠: 不斷去排隊來自數據庫的項目。 對於每個項目運行一個新線程並處理該項目,然後返回true/false它成功處理。 (這將用於重新將其添加到數據庫隊列中)

但是,只有在當前活動線程數(每個正在處理的項目數)少於最大線程數參數的情況下執行此操作。

一旦最大線程數已經達到我需要停下來解除隊列從數據庫項目,直到線程的當前數目小於線程的最大數量。 需要繼續排隊項目。

感覺就像這應該是我能想出但它只是沒有向我涌來。

澄清:我只需要實現線程。數據庫已經實施。

+1

有幾個部分對此,太多地址在一個職位。將任務分解爲更小的部分,然後編寫代碼,然後在遇到較小的部分時發佈更具體的問題。 – mbeckish 2013-03-17 01:12:03

+0

如果你的數據庫是MSSQL 2005或更新的,我建議尋找Service Broker。 – 2013-03-17 01:19:53

回答

6

一個非常簡單的方法是使用Semaphore。您有一個線程可將項目出列並創建線程來處理它們。例如:

const int MaxThreads = 4; 
Semaphore sem = new Semaphore(MaxThreads, MaxThreads); 
while (Queue.HasItems()) 
{ 
    sem.WaitOne(); 
    var item = Queue.Dequeue(); 
    Threadpool.QueueUserWorkItem(ProcessItem, item); // see below 
} 
// When the queue is empty, you have to wait for all processing 
// threads to complete. 
// If you can acquire the semaphore MaxThreads times, all workers are done 
int count = 0; 
while (count < MaxThreads) 
{ 
    sem.WaitOne(); 
    ++count; 
} 

// the code to process an item 
void ProcessItem(object item) 
{ 
    // cast the item to whatever type you need, 
    // and process it. 
    // when done processing, release the semaphore 
    sem.Release(); 
} 

上述技術工作得很好。編碼簡單,易於理解,非常有效。

一個變化是您可能想要使用Task API而不是Threadpool.QueueUserWorkItemTask可讓您更好地控制異步處理,包括取消。在我的例子中我使用了QueueUserWorkItem,因爲我更熟悉它。我會在生產程序中使用Task

雖然這確實使用N + 1個線程(其中N是要同時處理的項目數量),但該額外線程通常不會執行任何操作。它運行的唯一時間是將工作分配給工作線程的時間。否則,它會在信號量上進行非忙等待。

0

你只是不知道從哪裏開始?

考慮具有最大線程數的線程池。 http://msdn.microsoft.com/en-us/library/y5htx827.aspx

考慮立即旋轉起來線程的最大數量和監測數據庫。 http://msdn.microsoft.com/en-us/library/system.threading.threadpool.queueuserworkitem.aspx很方便。

請記住,你不能保證你的程序將被安全地結束...崩潰發生。考慮處理狀態的記錄。

記住,你的選擇,並從隊列中刪除的操作應該是原子的。

0

好了,所以該解決方案的體系結構將依賴於一兩件事:確實每隊列項的處理時間根據不同項目的數據?

如果沒有,那麼你可以擁有的東西,處理線程之間僅僅是圓知更鳥。這將很容易實現。

如果處理時間也有所不同,那麼你會需要更多的一個「下一個可用」的感覺它的東西,所以無論你的線程恰好是免費的第一被賦予處理數據項的工作。

工作過說出來,然後你將有怎樣一個隊列讀取器和處理線程之間同步的周圍通常運行。 'next-available'和'round-robin'之間的區別在於你如何進行同步。

我不是太熟悉C#,但我聽說獸告訴叫做後臺工作。這很可能是一個可以接受的方法。

對於循環賽,只啓動每個隊列項的後臺工作,在存儲陣列中的工人引用。只限於16位正在進步的後臺工作人員。這個想法是,你已經開始了16,然後等待第一個完成,直到17號開始,等等。我相信後臺工作者實際上是作爲線程池上的作業運行的,所以這會自動地限制實際運行的線程數量,以適應​​底層硬件的需求。要等待後臺工作人員,請參閱this。等待後臺工作人員完成後,您會處理結果並開始另一個工作。

對於下一個可用的方法,它沒有那麼不同。與其等待第一個完成,您將使用WaitAny()等待任何工作人員完成。你從任何一個完成處理返回,然後開始另一個回到WaitAny()。

這兩種方法的一般理念是保持一些線程一直在沸騰。下一個可用方法的特點是,您發出結果的順序不一定與輸入項目的順序相同。如果那麼重要,那麼比CPU核心擁有更多後臺工作人員的循環方法將會相當高效(線程池將會剛剛開始調試,但還沒有運行的工作人員)。然而,延遲會隨處理時間而變化。

BTW 16是根據您認爲在運行該軟件的PC上有多少核心來選擇的任意數字。更多的核心,更大的數量。

當然,在看似不安定且不斷變化的.NET世界中,現在可能有更好的方法來做到這一點。

祝你好運!