問題我需要解決的是(從我的理解)典型的生產者/消費者問題。我們有數據傳入24/7/365。傳入數據(稱爲原始數據)存儲在表中,並且對最終用戶不可用。然後我們選擇所有未處理的原始數據並逐一開始處理。處理完每個單元的數據後,它將存儲在另一個表中,並且現在已準備好供客戶端應用程序使用。 從加載原始數據到持續處理的數據的過程平均需要2 - 5秒。但它高度依賴於我們用來處理數據的第三方Web服務。如果Web服務速度很慢,我們不再像處理數據那樣快速處理數據,並積累積壓,從而導致我們的客戶丟失實時數據。 我們想讓這個過程成爲多線程的。從我的研究,我可以看到,該過程可以分爲三個分立部分:無限期運行生產者/消費者應用程序
LOADING - 一個Loader任務(生產者)運行無限期和負載從DB未處理數據
BlockingCollection<T>
(或其他一些變異併發收藏)。我選擇BlockingCollection
是因爲它考慮到生產者/消費者模式並提供GetConsumingEnumerable()
方法。PROCESSING - 消費上述
BlockingCollection<T>
的數據的多個消費者。在目前的實現中,我有一個Parallel.ForEach
循環,通過GetConsumingEnumerable()
,每次迭代都會啓動一個任務,並執行兩個任務延續:任務的第一步是調用第三方Web服務,等待結果並輸出第二個任務的結果消耗。第二個任務根據第一個任務的輸出進行計算,並輸出第三個任務的結果,第三個任務基本上只將結果存儲到第二個BlockingCollection<T>
(這是一個輸出集合)。所以我的消費者也是有效的生產者。理想情況下,任務1加載的每個數據單元將排隊等待處理。PERSISTING - 單個消費者運行上述第二個
BlockingCollection
並將處理後的數據保存到數據庫中。我現在面臨
問題是項目號碼2從上面的列表。它似乎不夠快(僅僅通過使用Parallel.ForEach
)。我嘗試了Parallel.ForEach
而不是直接開始一個任務,繼續,開始一個包裝線程,將依次啓動處理任務。但是這會導致OutOfMemory異常,因爲線程數量失控並且很快就會達到1200。我也嘗試使用ThreadPool無法安排工作。
您能否告訴我,如果我的方法足夠滿足我們需要完成的工作,或者有更好的方法嗎?
我建議使用'ConcurrentQueue',而不是'BlockingCollection '。似乎更適合你的情況。 –
BlockingCollection在使用默認構造函數實例化時使用了ConcurrentQueue,引入了更多功能... – Dimitri
啊。不知道! –