2015-10-18 44 views
1

首先我應該提及,我沒有內部的Stream對象可用。取而代之的是,我有這個對象:實現自定義流

public interface IChannel 
{ 
    void Send(byte[] data); 
    event EventHandler<byte[]> Receive; 
} 

我想實現一個流類,一個是這樣的:

public class ChannelStream : Stream 
{ 
    private readonly IChannel _channel; 

    public ChannelStream(IChannel channel) 
    { 
     this._channel = channel; 
    } 

    // TODO: Implement Stream class 
} 

我需要的功能非常相似,NetworkStream
寫入字節到我的流應該將這些字節添加到緩衝區並調用_channel.Send一次Flush()被稱爲。
流還將監聽_channel.Receive事件並將字節添加到另一個內部緩衝區,直到從流中讀取它們。如果流沒有可用的數據,它應該阻塞直到新數據可用。

但是,我正在努力實施。我在內部使用兩個MemoryStream進行了實驗,但這導致緩衝區繼續吃越來越多的內存。

我可以用什麼樣的收集/流來實現我的流?

+1

'BlockingCollection'可能是一個有用的技巧。 – SimpleVar

回答

2

考慮你需要什麼從收集和去那裏。

下面是當你需要某種形式的集合,你應該考慮幾個問題:

  1. 你需要在集合中的項隨機訪問?

  2. 集合是否將被多個線程訪問?

  3. 讀取後需要保留集合中的數據嗎?

  4. 排序重要嗎?如果是這樣,什麼順序 - 添加訂單,反向添加訂單,通過一些比較項目排序?

對於在這種情況下,答案是否定的,是的,沒有,是的輸出緩衝器:添加順序。這幾乎挑出了ConcurrentQueue類。這允許您添加一個或多個源代碼中的對象,這些對象不需要與正在讀取它們的代碼位於相同的線程中。它不會讓你任意索引收集(好吧,不是直接),你不需要。

我會使用相同類型的輸入緩衝區,以「當前塊」緩衝區來保存最近讀緩衝區,包裹在一些簡單的對象鎖定機制,以處理任何線程的問題。

輸出部分看起來是這樣的:

// Output buffer 
private readonly ConcurrentQueue<byte[]> _outputBuffer = new ConcurrentQueue<byte[]>(); 

public override void Write(byte[] buffer, int offset, int count) 
{ 
    // Copy written data to new buffer and add to output queue 
    byte[] data = new byte[count]; 
    Buffer.BlockCopy(buffer, offset, data, 0, count); 
    _outputBuffer.Enqueue(data); 
} 

public override void Flush() 
{ 
    // pull everything out of the queue and send to wherever it is going 
    byte[] curr; 
    while (_outputBuffer.TryDequeue(out curr)) 
     internalSendData(curr); 
} 

internalSendData方法是數據會再出去到網絡。

讀緩衝是稍微複雜一些:

// collection to hold unread input data 
private readonly ConcurrentQueue<byte[]> _inputBuffer = new ConcurrentQueue<byte[]>(); 
// current data block being read from 
private byte[] _inputCurrent = null; 
// read offset in current block 
private short _inputPos = 0; 
// object for locking access to the above. 
private readonly object _inputLock = new object(); 

public override int Read(byte[] buffer, int offset, int count) 
{ 
    int readCount = 0; 
    lock(_inputLock) 
    { 
     while (count > 0) 
     { 
      if (_inputCurrent == null || _inputCurrent.Length <= _inputPos) 
      { 
       // read next block from input buffer 
       if (!_inputBuffer.TryDequeue(out _inputCurrent)) 
        break; 

       _inputPos = 0; 
      } 

      // copy bytes to destination 
      int nBytes = Math.Min(count, _inputCurrent.Length - _inputPos); 
      Buffer.BlockCopy(_inputCurrent, _inputPos, buffer, offset, nBytes); 

      // adjust all the offsets and counters 
      readCount += nBytes; 
      offset += nBytes; 
      count -= nBytes; 
      _inputPos += (short)nBytes; 
     } 
    } 
    return readCount; 
} 

希望這是有道理的。

使用這種軟緩衝的隊列意味着數據只在內存中保存,只要它們被延遲發送或讀取。一旦你調用Flush輸出緩衝區的內存被釋放以進行垃圾收集,所以你不必擔心內存爆炸,除非你試圖發送比實際傳輸機制更快的處理速度。但是,如果你每秒鐘通過ADSL連接排隊數兆字節的數據,沒有什麼可以爲你節省:P

我想在上面添加一些改進,就像一些檢查以確保一旦緩衝區處於合理的水平,即可自動調用Flush