0

我的任務是優化線性數據處理例程的性能。以下是已有內容的概述:實時批處理數據處理

數據進入UDP端口,我們有多個偵聽器在不同的端口上偵聽,並將原始數據寫入SQL Server數據庫(讓我們稱之爲RawData)。然後,我們有一個單線程線性應用程序的多個實例,它從RawData表中抓取原始數據並處理單個數據行。處理的意思是將原始數據與給定實體的先前接收的數據進行比較,計算完成以計算不同讀數的數量,然後針對每個單獨的數據行調用幾個Web服務,最後爲每個數據添加新記錄在ProcessedData表中的行。另外對應的實體記錄在其他表中更新。

我看到問題的方式,它可以分解成更小的部分,我可以利用生產者/消費者模式進行數據處理: 生產者的一個線程填充共享(阻塞)隊列,多個消費者從中獲取數據行隊列並對它們進行並行處理。消費者完成後,他們將處理後的數據放到另一個共享隊列中,然後再由另一個消費者線程(單個)訪問,這個線程將執行SqlBulkCopy來插入新記錄。沿着這個過程,將會有其他共享隊列存儲更新的實體信息,而另一個消費者將獲取實體的更新信息並執行更新。

問題是,即使它看起來很簡單,但它在我看來是一個麻煩的方法。我確實有一種更好的方式來做我想找的事情。有關實施上述生產者/消費者模式的任何建議?或者我應該爲我的問題尋找不同的設計模式?

在此先感謝

+0

當你說「共享查詢」,你的意思是「共享隊列」? – 2011-04-11 19:47:36

+0

是的,我的不好。感謝您的注意。更正 – Dimitri 2011-04-11 19:53:21

回答

2

你提出的解決方案聽起來很合理,我不認爲這是很麻煩的。理解簡單,易於實施,有效且高效。它還允許您調整生產者和消費者的數量以實現最佳性能。分解成較小的部分,部件之間的通信有限,這是一件非常好的事情。

所以你有多個線程(生產者)從UDP讀取數據並將這些項目存儲在共享隊列中。將其稱爲RawData隊列。多個消費者從該隊列中讀取,處理項目並將結果放入另一個共享隊列中。將其稱爲ProcessedData隊列。最後,您有一個線程讀取ProcessedData隊列並將項目存儲在數據庫中。

.NET BlockingCollection是完美的。

這可能會有所幫助:Question on C# threading with RFID

+0

感謝Jim的回覆。是的,我被分成了哪種類型的隊列最適合我的需求。我正在考慮使用循環隊列或雙緩衝區,以最大限度地減少鎖定時間。目前我知道我們沒有那麼多數據需要處理,但我總是在考慮可擴展性。我討厭回去重寫應用程序,當流量增加。 – Dimitri 2011-04-11 21:08:18

+0

我有一個BlockingCollection有點問題。儘管它實現了ConcurrentQueue作爲它的集合基礎,但我注意到,當我爲消費者添加多個線程時,它們不保證從隊列中順序獲取項目。我嘗試了Parallel.Foreach和Parallel.For。他們似乎都不尊重項目的順序。然後我嘗試了Task.StartNew,這似乎是爲了紀念這個序列。另外,我需要將消費者的輸出存儲到另一個隊列中,需要訂購。我可以使用相同的BlockingCollection並運行.OrderBy嗎?或id更好使用SortedList和手動處理鎖定? – Dimitri 2011-04-12 12:53:27

+0

@Dimitri:你觸及很多問題。這可能是最好的,如果你把它作爲另一個問題。 – 2011-04-12 14:38:34