2009-08-20 53 views
1

進程間通信的一種方式是通過進程之間的(命名)管道。通過TextWriter進行通信 - > TextReader

我想實現兩個線程之間相同的「隊列」式通信。生產者應該編寫一個文本基本命令(使用TextWriter或輸出流)。 消費者應該閱讀TextReader。因爲,那麼您考慮一下,OutputStream/-Writer是InputStream/-Reader硬幣的另一面。因此,使用Writer來爲讀者填充數據,理論上應該很容易。

(這裏的標準方法是在線程之間有一個Queue,但是我希望使用TextReader和TextWriter,因爲我已經有了用於前端和後端的代碼,這樣就很容易通過連接控制檯在/ Console.Out到生產者/消費者。)

我認爲這將是一個真正簡單的連接一個作家與讀者,但我不知道如何做到這一點。

我可以寫一個這樣的連接我自己,但感覺它「應該」已經在那裏。

任何想法?

乾杯 雷夫

+0

你可以創建一個線程安全隊列,可以在添加和從隊列中刪除火災事件,並放入隊列在一個線程,並採取其他,有很多方法可以做你想做的,所以請解釋一下您的要求更詳細 – 2009-08-20 15:57:39

回答

1

我放棄了尋找「現成」的解決方案。我寫了我自己的。 在Write-end接收數據的新類ThroughputStream中,通過線程安全隊列將它們發佈到使用接收的數據塊讀取的讀端。

namespace My.IO 
{ 
    public class ThrouputStream 
    { 
     private InputStreamClass inputStream; 
     private OutputStreamClass outputStream; 

     private Queue<byte[]> queue = new Queue<byte[]>(); 
     private System.Threading.EventWaitHandle queueEvent = new System.Threading.EventWaitHandle(false, System.Threading.EventResetMode.AutoReset); 

     public ThrouputStream() 
     { 
      inputStream = new InputStreamClass(this); 
      outputStream = new OutputStreamClass(this); 
     } 

     public Stream InputStream 
     { 
      get { return inputStream; } 
     } 

     public Stream OutputStream 
     { 
      get { return outputStream; } 
     } 

     private class InputStreamClass : Stream 
     { 
      private readonly Queue<byte[]> queue; 
      private readonly ThrouputStream parent; 
      private byte[] currentBlock = null; 
      private int currentBlockPos = 0; 
      private Boolean closed = false; 
      private int readTimeoutMs = System.Threading.Timeout.Infinite; 

      public InputStreamClass(ThrouputStream parent) 
      { 

       this.parent = parent; 
       this.queue = parent.queue; 
      } 

      public override bool CanRead 
      { 
       get { return true; } 
      } 

      public override bool CanSeek 
      { 
       get { return false; } 
      } 

      public override bool CanWrite 
      { 
       get { return false; } 
      } 

      public override void Flush() 
      { 
       // Do nothing, always flushes. 
      } 

      public override long Length 
      { 
       get { throw new NotSupportedException(); } 
      } 

      public override long Position 
      { 
       get 
       { 
        throw new NotSupportedException(); 
       } 
       set 
       { 
        throw new NotSupportedException(); 
       } 
      } 

      public override bool CanTimeout 
      { 
       get 
       { 
        return true; 
       } 
      } 

      public override int ReadTimeout 
      { 
       get 
       { 
        return readTimeoutMs; 
       } 
       set 
       { 
        readTimeoutMs = value; 
       } 
      } 

      public override int Read(byte[] buffer, int offset, int count) 
      { 
       if (currentBlock == null) 
       { 
        int queueCount; 
        lock (queue) 
        { 
         queueCount = queue.Count; 
         if (queueCount > 0) 
          currentBlock = queue.Dequeue(); 
        } 

        if (currentBlock == null && !parent.outputStream.IsClosed) 
        { 
         parent.queueEvent.WaitOne(readTimeoutMs); 

         lock (queue) 
         { 
          if (queue.Count == 0) 
           return 0; 

          currentBlock = queue.Dequeue(); 
         } 
        } 

        currentBlockPos = 0; 
       } 

       if (currentBlock == null) 
        return 0; 

       int read = Math.Min(count, currentBlock.Length - currentBlockPos); 
       Array.Copy(currentBlock, currentBlockPos, buffer, offset, read); 
       currentBlockPos += read; 
       if (currentBlockPos == currentBlock.Length) 
       { 
        // did read whole block 
        currentBlockPos = 0; 
        currentBlock = null; 
       } 

       return read; 
      } 

      public override long Seek(long offset, SeekOrigin origin) 
      { 
       throw new NotImplementedException(); 
      } 

      public override void SetLength(long value) 
      { 
       throw new NotImplementedException(); 
      } 

      public override void Write(byte[] buffer, int offset, int count) 
      { 
       throw new NotImplementedException(); 
      } 

      public override void Close() 
      { 
       this.closed = true; 
       base.Close(); 
      } 
     } 

     private class OutputStreamClass : Stream 
     { 
      private bool isClosed = false; 

      private readonly Queue<byte[]> queue; 
      private readonly ThrouputStream parent; 

      public OutputStreamClass(ThrouputStream parent) 
      { 
       this.parent = parent; 
       this.queue = parent.queue; 
      } 

      public override bool CanRead 
      { 
       get { return false; } 
      } 

      public override bool CanSeek 
      { 
       get { return false; } 
      } 

      public override bool CanWrite 
      { 
       get { return true; } 
      } 

      public override void Flush() 
      { 
       // always flush 
      } 

      public override long Length 
      { 
       get { throw new NotSupportedException(); } 
      } 

      public override long Position 
      { 
       get 
       { 
        throw new NotSupportedException(); 
       } 
       set 
       { 
        throw new NotSupportedException(); 
       } 
      } 

      public override int Read(byte[] buffer, int offset, int count) 
      { 
       throw new NotSupportedException(); 
      } 

      public override long Seek(long offset, SeekOrigin origin) 
      { 
       throw new NotSupportedException(); 
      } 

      public override void SetLength(long value) 
      { 
       throw new NotSupportedException(); 
      } 

      public override void Write(byte[] buffer, int offset, int count) 
      { 
       byte[] copy = new byte[count]; 
       Array.Copy(buffer, offset, copy, 0, count); 
       lock (queue) 
       { 
        queue.Enqueue(copy); 
        try 
        { 
         parent.queueEvent.Set(); 
        } 
        catch (Exception) 
        { } 
       } 
      } 

      public override void Close() 
      { 
       this.isClosed = true; 
       base.Close(); 

       // Signal event, to stop waiting consumer 
       try 
       { 
        parent.queueEvent.Set(); 
       } 
       catch (Exception) 
       { } 
      } 

      public bool IsClosed 
      { 
       get { return isClosed; } 
      } 
     } 

    } 
} 
2

我會阻止使用流和TextWriter/TextReader作爲線程間通信的有效手段。您需要爲每個「隊列」設置一個流,並且爲了確保有效數據完全寫入或讀取,您需要爲每個寫入或讀取操作鎖定該流。更好的解決方案可能是這樣的:

設置一個字符串類型的隊列,以及一對ManualResetEvents。總的想法是使用線程信號來允許兩個線程進行通信而不需要鎖定。

public static class ThreadTest 
{ 
    public void Main() 
    { 
     long exit = 0; 

     Queue<string> messages = new Queue<string>(); 
     ManualResetEvent signal1 = new ManualResetEvent(); 
     ManualResetEvent signal2 = new ManualResetEvent(); 

     signal2.Set(); 

     Thread writer = new Thread(() => 
     { 
      while (exit == 0) 
      { 
       string value = Console.ReadLine(); 
       if (value == "exit") 
       { 
        Interlocked.Exchange(ref exit, 1); 
       } 
       else 
       { 
        messages.Enqueue(value); 
        Console.WriteLine("Written: " + value); 
        signal1.Set(); 
       } 

       signal2.WaitOne(); 
      } 
     }); 

     Thread reader = new Thread(() => 
     { 
      while (exit == 0) 
      { 
       signal1.WaitOne(); 
       signal2.Reset(); 

       value = messages.Dequeue(); 
       Console.WriteLine("Read: " + value); 

       signal2.Set(); 
       signal1.Reset(); 
      } 
     }); 

     reader.Start(); 
     writer.Start(); 
    } 
} 
+0

我同意全部! 使用流在兩個線程之間進行通信通常是一個糟糕的主意。 對於我的實驗,我有使用它來「單元測試」現有代碼的內涵,它是控制檯程序的基礎。 – leiflundgren 2009-08-25 08:45:48