2012-08-30 24 views
0

問題我需要解決的是(從我的理解)典型的生產者/消費者問題。我們有數據傳入24/7/365。傳入數據(稱爲原始數據)存儲在表中,並且對最終用戶不可用。然後我們選擇所有未處理的原始數據並逐一開始處理。處理完每個單元的數據後,它將存儲在另一個表中,並且現在已準備好供客戶端應用程序使用。 從加載原始數據到持續處理的數據的過程平均需要2 - 5秒。但它高度依賴於我們用來處理數據的第三方Web服務。如果Web服務速度很慢,我們不再像處理數據那樣快速處理數據,並積累積壓,從而導致我們的客戶丟失實時數據。 我們想讓這個過程成爲多線程的。從我的研究,我可以看到,該過程可以分爲三個分立部分:無限期運行生產者/消費者應用程序

  1. LOADING - 一個Loader任務(生產者)運行無限期和負載從DB未處理數據BlockingCollection<T>(或其他一些變異併發收藏)。我選擇BlockingCollection是因爲它考慮到生產者/消費者模式並提供GetConsumingEnumerable()方法。

  2. PROCESSING - 消費上述BlockingCollection<T>的數據的多個消費者。在目前的實現中,我有一個Parallel.ForEach循環,通過GetConsumingEnumerable(),每次迭代都會啓動一個任務,並執行兩個任務延續:任務的第一步是調用第三方Web服務,等待結果並輸出第二個任務的結果消耗。第二個任務根據第一個任務的輸出進行計算,並輸出第三個任務的結果,第三個任務基本上只將結果存儲到第二個BlockingCollection<T>(這是一個輸出集合)。所以我的消費者也是有效的生產者。理想情況下,任務1加載的每個數據單元將排隊等待處理。

  3. PERSISTING - 單個消費者運行上述第二個BlockingCollection並將處理後的數據保存到數據庫中。我現在面臨

問題是項目號碼2從上面的列表。它似乎不夠快(僅僅通過使用Parallel.ForEach)。我嘗試了Parallel.ForEach而不是直接開始一個任務,繼續,開始一個包裝線程,將依次啓動處理任務。但是這會導致OutOfMemory異常,因爲線程數量失控並且很快就會達到1200。我也嘗試使用ThreadPool無法安排工作。

您能否告訴我,如果我的方法足夠滿足我們需要完成的工作,或者有更好的方法嗎?

+0

我建議使用'ConcurrentQueue ',而不是'BlockingCollection '。似乎更適合你的情況。 –

+0

BlockingCollection在使用默認構造函數實例化時使用了ConcurrentQueue,引入了更多功能... – Dimitri

+0

啊。不知道! –

回答

3

如果瓶頸是一些第三方服務,這不會處理並行執行,但會排隊您的請求,那麼你不能做的事情。

但首先,你可以試試這個:

  • 使用線程池或任務(那些會使用線程池太) - 不要自己火起來線程
  • 試着讓你的異步請求,而不是使用線程專門
  • 通過性能分析器運行的服務/應用程序,並檢查要「浪費」自己的時間
  • 使穗/檢查第三方服務,看看它是如何處理的並行請求
  • 考慮緩存來自此服務的答案(如果可能)

這就是我現在所能想到的,沒有更多信息。

+0

有沒有辦法將所有加載的數據單元同時排隊處理?即每單位數據有一個線程/任務。在每次迭代中,我將永遠不會加載超過300-400個數據單元。因此,啓動300-400個並行任務將是我正在尋找的。 Parallel.ForEach沒有這樣做。 ThreadPool也沒有做到這一點。並且手動啓動線程,我用盡了它們:) – Dimitri

+1

(除了非常特殊的邊緣情況),你應該讓ThreadPool處理它可以讓你運行多少個任務 - 恕我直言,讓300個任務同時運行是沒有意義的時間(由於不斷的任務切換,這會降低你的工作速度) - 你使用的第三方服務的時間點對我來說是一個指示獲得更好性能的最佳方式。而且可擴展性是通過ASYNC(通過服務和所有你能想到的 - 比如你的數據庫操作等等) – Carsten

+0

謝謝。嘗試異步。我有點不惜一切代價避免異步,因爲每一步數據都經過高度依賴於先前的步驟,並且我不想等待異步來完成處理。但現在我想到了,我還在等待完成。 – Dimitri

2

我最近遇到這是非常類似你這樣的問題, 這裏就是我所做的,希望它可以幫助:

  1. 這似乎是你的第一和第三部分是相當簡單的,並且可以 第二部分必須先在一個新的線程上啓動,然後使用System.Threading.timer,使你的web服務調用 調用web服務的方法通過通過異步調用對處理方法的響應(結果)並讓它​​處理它自己的數據王牌,

這個解決我的問題,我希望它可以幫助你,如果有任何疑問問我,我會在這裏解釋一下......

+0

爲什麼會你需要一個計時器嗎? – Carsten

+0

我認爲他需要定期與web服務連接..不是嗎? O.o –

+0

Parallel.ForEach反對BlockingCollection的ConsumingEnumerable只要有集合中的項目就會永遠運行。不需要定時器 – Dimitri