2012-07-01 17 views
0

我正在創建一個控制檯應用程序,它應該接收來自網絡的消息以便處理它們。首先,我創建了一個單例類,以確保所有類都可以訪問同一隊列:該類爲ProcessingQueue爲什麼這些輔助線程不會結束它們的執行?

public class ProcessingQueue 
{ 
    public class ItemToProcess 
    { 
     public string SourceClientId { get; set; } 
     public IMessage ReceivedMessage { get; set; } 
    } 

    private int m_MaxSize = 20; 
    private Queue<ItemToProcess> m_InternalQueue; 

    private static volatile ProcessingQueue m_Instance = null; 
    private static readonly object syncRoot = new object(); 

    private ProcessingQueue() 
    { 
     m_InternalQueue = new Queue<ItemToProcess>(); 
    } 

    public static ProcessingQueue Instance 
    { 
     get 
     { 
      if (m_Instance == null) 
      { 
       lock (syncRoot) 
       { 
        if (m_Instance == null) 
        { 
         m_Instance = new ProcessingQueue(); 
        } 
       } 
      } 
      return m_Instance; 
     } 
    } 

    public int MaxSize 
    { 
     get 
     { 
      lock (syncRoot) 
      { 
       return m_MaxSize; 
      } 
     } 
     set 
     { 
      if (value > 0) 
      { 
       lock (syncRoot) 
       { 
        m_MaxSize = value; 
       } 
      } 
     } 
    } 

    public void Enqueue(String source, IMessage message) 
    { 
     lock (syncRoot) 
     { 
      while (m_InternalQueue.Count >= m_MaxSize) 
      { 
       Monitor.Wait(syncRoot); 
      } 
      m_InternalQueue.Enqueue(new ItemToProcess { SourceClientId = source, ReceivedMessage = message }); 
      if (m_InternalQueue.Count == 1) 
      { 
       Monitor.PulseAll(syncRoot); 
      } 
     } 
    } 

    public ItemToProcess Dequeue() 
    { 
     lock (syncRoot) 
     { 
      while (m_InternalQueue.Count == 0) 
      { 
       Monitor.Wait(syncRoot); 
      } 
      ItemToProcess item = m_InternalQueue.Dequeue(); 
      if (m_InternalQueue.Count == m_MaxSize - 1) 
      { 
       Monitor.PulseAll(syncRoot); 
      } 
      return item; 
     } 
    } 

    public int Count 
    { 
     get 
     { 
      lock (syncRoot) 
      { 
       return m_InternalQueue.Count; 
      } 
     } 
    } 
} 

然後我實現瞭如下項目的主要類。

  1. 首先,共享隊列被實例化。
  2. 然後,我設置了一個計時器來模擬保持活動消息(第一個生產者)的到來。
  3. 然後我創建了消費者線程(processing對象)。
  4. 然後我創建了另一個生產者線程(generating對象)。
  5. 最後,我運行了所有的線程和計時器。

    class程序 靜態ProcessingQueue隊列= ProcessingQueue.Instance; static System.Timers.Timer keep_alive_timer = new System.Timers.Timer(10000);

    private static volatile bool running = true; 
    
    
    static void Main(string[] args) 
    { 
        queue.MaxSize = 30; 
        keep_alive_timer.Elapsed += new ElapsedEventHandler(delegate(object sender, ElapsedEventArgs e) 
        { 
         KeepAliveMessage msg = new KeepAliveMessage(Guid.NewGuid()); 
         Console.WriteLine("Keep Alive: " + msg.MsgId); 
         queue.Enqueue("", msg); 
        }); 
        keep_alive_timer.Enabled = true; 
        keep_alive_timer.AutoReset = true; 
    
        Thread processing = new Thread(delegate() 
        { 
         while (running) 
         { 
          Console.WriteLine("Number of elements in queue: {0}", queue.Count); 
    
          ProcessingQueue.ItemToProcess msg = queue.Dequeue(); 
          Console.WriteLine("Processed: msgid={0}, source={1};", msg.ReceivedMessage.MsgId, msg.SourceClientId); 
    
          Thread.Sleep(1500); 
         } 
        }); 
    
        Thread generating = new Thread(MessagesFromNetwork); 
    
        processing.Start(); 
        keep_alive_timer.Start(); 
        generating.Start(); 
    
        Console.WriteLine("RUNNING...\n"); 
        Console.ReadLine(); 
    
        running = false; 
        keep_alive_timer.Stop(); 
        Console.WriteLine("CLOSING...\n"); 
    
        //processing.Abort(); 
        //generating.Abort(); 
    
        bool b1 = processing.Join(TimeSpan.FromSeconds(5)); 
        bool b2 = generating.Join(TimeSpan.FromSeconds(5)); 
    
        Console.WriteLine("b1 {0}", b1); 
        Console.WriteLine("b2 {0}", b2); 
        Console.WriteLine("END"); 
        Console.ReadLine(); 
    } 
    
    static void MessagesFromNetwork() 
    { 
        string[] sourceClients = { "1", "2", "3", "4", "5" }; 
        while (running) 
        { 
         IMessage msg; // interface IMessage 
         Random random = new Random(); 
         int type = random.Next(2); 
         switch (type) 
         { 
          case 0: 
           msg = new KeepAliveMessage(Guid.NewGuid()); // implements IMessage 
           break; 
          case 1: 
           msg = new TaskMessage(Guid.NewGuid(), ...); // implements IMessage 
           break; 
          default: 
           throw new Exception("Messaggio non valido!"); 
         } 
         Console.WriteLine("New message received: " + msg.MsgId); 
         queue.Enqueue(sourceClients[random.Next(sourceClients.Length)], msg); 
         Console.WriteLine("... message enqueued: " + msg.MsgId); 
         Thread.Sleep(500); 
        } 
    } 
    

    }

執行過程中輸入,在running變量爲假並且兩個線程應該終止。然而這並不總是會發生,事實上Join這兩種方法中的一種沒有返回控制權:由於這個原因,我在Join方法中指定了一個超時,但在Console.WriteLine("END");之後,控制檯應用程序凍結(第二個Join返回false)。

也許第二個線程沒有正確終止......爲什麼?

+0

用'懶惰'實現單身人士更容易,但我一直認爲辛格爾頓是模式中最殘酷的。 http://stackoverflow.com/a/1020384/14357 – spender

+0

好的,但我需要一個可以訪問WCF服務實例的隊列:在我的代碼中,我模擬來自網絡的數據,但實際上WCF服務的許多實例必須把數據在隊列中。 – enzom83

回答

1

看起來像Dequeue或Enqueue可以進入Monitor.Wait(),當運行停止沒有人脈衝。

您等待5秒鐘,但要注意,最大範圍* 1500> 5000

我不能這麼直接找出定時器的頻率。

+0

我可以改變'Join'方法的等待時間嗎?或者我應該使用一個更簡單的方法,而不是'Monitor'? – enzom83

+0

你的Enqueue/Dequeue方法根本沒有停止邏輯...只需使用一個庫類。 –

+0

圖書館課是什麼意思?我應該使用「ConcurrentQueue」類嗎? – enzom83

相關問題