我正在創建一個控制檯應用程序,它應該接收來自網絡的消息以便處理它們。首先,我創建了一個單例類,以確保所有類都可以訪問同一隊列:該類爲ProcessingQueue
。爲什麼這些輔助線程不會結束它們的執行?
public class ProcessingQueue
{
public class ItemToProcess
{
public string SourceClientId { get; set; }
public IMessage ReceivedMessage { get; set; }
}
private int m_MaxSize = 20;
private Queue<ItemToProcess> m_InternalQueue;
private static volatile ProcessingQueue m_Instance = null;
private static readonly object syncRoot = new object();
private ProcessingQueue()
{
m_InternalQueue = new Queue<ItemToProcess>();
}
public static ProcessingQueue Instance
{
get
{
if (m_Instance == null)
{
lock (syncRoot)
{
if (m_Instance == null)
{
m_Instance = new ProcessingQueue();
}
}
}
return m_Instance;
}
}
public int MaxSize
{
get
{
lock (syncRoot)
{
return m_MaxSize;
}
}
set
{
if (value > 0)
{
lock (syncRoot)
{
m_MaxSize = value;
}
}
}
}
public void Enqueue(String source, IMessage message)
{
lock (syncRoot)
{
while (m_InternalQueue.Count >= m_MaxSize)
{
Monitor.Wait(syncRoot);
}
m_InternalQueue.Enqueue(new ItemToProcess { SourceClientId = source, ReceivedMessage = message });
if (m_InternalQueue.Count == 1)
{
Monitor.PulseAll(syncRoot);
}
}
}
public ItemToProcess Dequeue()
{
lock (syncRoot)
{
while (m_InternalQueue.Count == 0)
{
Monitor.Wait(syncRoot);
}
ItemToProcess item = m_InternalQueue.Dequeue();
if (m_InternalQueue.Count == m_MaxSize - 1)
{
Monitor.PulseAll(syncRoot);
}
return item;
}
}
public int Count
{
get
{
lock (syncRoot)
{
return m_InternalQueue.Count;
}
}
}
}
然後我實現瞭如下項目的主要類。
- 首先,共享隊列被實例化。
- 然後,我設置了一個計時器來模擬保持活動消息(第一個生產者)的到來。
- 然後我創建了消費者線程(
processing
對象)。 - 然後我創建了另一個生產者線程(
generating
對象)。 最後,我運行了所有的線程和計時器。
class程序 靜態ProcessingQueue隊列= ProcessingQueue.Instance; static System.Timers.Timer keep_alive_timer = new System.Timers.Timer(10000);
private static volatile bool running = true; static void Main(string[] args) { queue.MaxSize = 30; keep_alive_timer.Elapsed += new ElapsedEventHandler(delegate(object sender, ElapsedEventArgs e) { KeepAliveMessage msg = new KeepAliveMessage(Guid.NewGuid()); Console.WriteLine("Keep Alive: " + msg.MsgId); queue.Enqueue("", msg); }); keep_alive_timer.Enabled = true; keep_alive_timer.AutoReset = true; Thread processing = new Thread(delegate() { while (running) { Console.WriteLine("Number of elements in queue: {0}", queue.Count); ProcessingQueue.ItemToProcess msg = queue.Dequeue(); Console.WriteLine("Processed: msgid={0}, source={1};", msg.ReceivedMessage.MsgId, msg.SourceClientId); Thread.Sleep(1500); } }); Thread generating = new Thread(MessagesFromNetwork); processing.Start(); keep_alive_timer.Start(); generating.Start(); Console.WriteLine("RUNNING...\n"); Console.ReadLine(); running = false; keep_alive_timer.Stop(); Console.WriteLine("CLOSING...\n"); //processing.Abort(); //generating.Abort(); bool b1 = processing.Join(TimeSpan.FromSeconds(5)); bool b2 = generating.Join(TimeSpan.FromSeconds(5)); Console.WriteLine("b1 {0}", b1); Console.WriteLine("b2 {0}", b2); Console.WriteLine("END"); Console.ReadLine(); } static void MessagesFromNetwork() { string[] sourceClients = { "1", "2", "3", "4", "5" }; while (running) { IMessage msg; // interface IMessage Random random = new Random(); int type = random.Next(2); switch (type) { case 0: msg = new KeepAliveMessage(Guid.NewGuid()); // implements IMessage break; case 1: msg = new TaskMessage(Guid.NewGuid(), ...); // implements IMessage break; default: throw new Exception("Messaggio non valido!"); } Console.WriteLine("New message received: " + msg.MsgId); queue.Enqueue(sourceClients[random.Next(sourceClients.Length)], msg); Console.WriteLine("... message enqueued: " + msg.MsgId); Thread.Sleep(500); } }
}
按執行過程中輸入,在running
變量爲假並且兩個線程應該終止。然而這並不總是會發生,事實上Join
這兩種方法中的一種沒有返回控制權:由於這個原因,我在Join
方法中指定了一個超時,但在Console.WriteLine("END");
之後,控制檯應用程序凍結(第二個Join
返回false
)。
也許第二個線程沒有正確終止......爲什麼?
用'懶惰'實現單身人士更容易,但我一直認爲辛格爾頓是模式中最殘酷的。 http://stackoverflow.com/a/1020384/14357 –
spender
好的,但我需要一個可以訪問WCF服務實例的隊列:在我的代碼中,我模擬來自網絡的數據,但實際上WCF服務的許多實例必須把數據在隊列中。 – enzom83