2012-03-13 150 views
2

我在編寫c#中的異步多服務器網絡應用程序時遇到了問題。我有很多工作由線程池處理,這些工作包括寫入網絡套接字。這最終允許多個線程可以同時寫入套接字並解散我輸出消息的情況。我的想法是實現一個隊列系統,每當數據添加到隊列中時,套接字都會寫入隊列。異步隊列管理器

我的問題是,我無法將自己的頭圍繞在這種性質的建築中。我想象有一個隊列對象在數據被添加到隊列時觸發一個事件。然後事件寫入隊列中保存的數據,但這不起作用,因爲如果兩個線程同時到達並同時添加到隊列中,即使隊列被設置爲線程安全,事件仍然會被觸發我會遇到同樣的問題。那麼如果另一個事件正在進行,那麼也許還有一個事件可以阻止一個事件,但是如果第一個事件完成後我該如何繼續該事件,而不是簡單地阻塞某個互斥體或某個事件的線程。如果我不是試圖嚴格遵守我的「無阻塞」體系結構,但這個特殊的應用程序要求我允許線程池線程繼續做它們的事情,這並不會很難。

任何想法?

回答

4

儘管與Porges類似,但它在實現方面有所不同。

首先,我通常不會排隊要發送的字節,而是在發送線程中對其進行分區,但我想這是一個有趣的問題。 但是更大的區別在於使用ConcurrentQueues(除了BlockingCollection之外)。 所以我有類似的代碼,最終以

 BlockingCollection<Packet> sendQueue = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>()); 
     while (true) 
     { 
      var packet = sendQueue.Take(); //this blocks if there are no items in the queue. 
      SendPacket(packet); //Send your packet here. 
     } 

的關鍵,帶走這裏是你有一個線程,其循環這段代碼,和所有其他的線程可以在一個線程安全的方式添加到隊列(BlockingCollection和ConcurrentQueue都是線程安全的)

看看Processing a queue of items asynchronously in C#我在那裏回答了類似的問題。

+2

'new BlockingCollection (new ConcurrentQueue ())''完全等價於'new BlockingCollection ()':) – porges 2012-03-13 10:08:23

+0

很好理解,但我通常對我的初始值設定項非常詳細。當有人不知道什麼是默認值時,它變得更清晰:)(和「var」有助於不必重複所有內容) – Brunner 2012-03-19 09:29:00

+0

超級有用,Alex。保存了我的培根。 – 2012-04-04 19:33:12

0

我不知道C#,但我要做的是讓事件觸發套接字管理器開始從隊列中拉出並一次寫出一條消息。如果已經觸發,觸發器將不會執行任何操作,一旦隊列中沒有任何內容,就會停止。

這樣可以解決兩個線程同時寫入隊列的問題,因爲第二個事件將是空操作。

+0

經過一番考慮,我考慮過了。我質疑它的原因是因爲一個不太可能的時機問題。想象一下,在被觸發的事件開始時,它將一些布爾值設置爲true。所以你有一些變量告訴其餘的線程,一些隊列數據處理器正在運行。如果在處理線程有時間將值交換回false之前將數據添加到隊列中,會發生什麼,但是處理器已經讀取隊列中沒有剩餘數據。那麼我有隊列中的數據將不會被處理,直到更多的數據被添加。 – Dabloons 2012-03-13 05:25:15

+0

我同意這可能是可能的 - 也許一個額外的檢查隊列中的東西后重置布爾值將涵蓋該問題。 – JoshRagem 2012-03-13 05:31:46

0

您可以擁有一個線程安全的隊列,您的所有工作線程都會將其結果寫入其中。然後讓另一個線程輪詢隊列,並在看到它們等待時發送結果。

4

聽起來就像你需要一個線程同步寫入套接字,一堆線程寫入隊列以供該線程處理。

您可以使用阻塞集合(BlockingCollection<T>)做艱苦的工作:

// somewhere there is a queue: 

BlockingCollection<byte[]> queue = new BlockingCollection<byte[]>(); 

// in socket-writing thread, read from the queue and send the messages: 

foreach (byte[] message in queue.GetConsumingEnumerable()) 
{ 
    // just an example... obviously you'd need error handling and stuff here 
    socket.Send(message); 
} 

// in the other threads, just enqueue messages to be sent: 

queue.Add(someMessage); 

的BlockingCollection將處理所有的同步。您還可以執行最大隊列長度和其他有趣的事情。