這是我的情景:如何將寫入流1的內容流式傳輸到流2?
producer.WriteStream(stream);
consumer.ReadStream(stream);
我想要的東西,允許由producer
生成的字節將逐步轉移到consumer
。
我可以寫一切到MemoryStream
,然後倒帶它並在consumer
上讀取它,但這會導致巨大的內存消耗。
我該如何做到這一點?
這是我的情景:如何將寫入流1的內容流式傳輸到流2?
producer.WriteStream(stream);
consumer.ReadStream(stream);
我想要的東西,允許由producer
生成的字節將逐步轉移到consumer
。
我可以寫一切到MemoryStream
,然後倒帶它並在consumer
上讀取它,但這會導致巨大的內存消耗。
我該如何做到這一點?
使用管道作爲數據的底層傳輸,可以有一個「寫入流」(服務器)和一個允許這種通信機制的「讀取流」(客戶端)。
使用匿名管道或命名管道(如果需要進程間通信)很簡單。要創建管道流:
AnonymousPipeServerStream pipeServer = new AnonymousPipeServerStream();
AnonymousPipeClientStream pipeClient =
new AnonymousPipeClientStream(pipeServer.GetClientHandleAsString());
現在,您可以用這些來寫&讀:
producer.WriteStream(pipeServer);
// somewhere else...
consumer.ReadStream(pipeClient);
這比我的解決方案容易得多。 –
工程就像一個魅力 –
我只是把這個共同的樂趣,這是未經測試,可能有一些錯誤。您只需將ReaderStream
傳遞給讀者,並將WriterStream
傳遞給作者。的[PipeStream]
public class LoopbackStream
{
public Stream ReaderStream { get; }
public Stream WriterStream { get;}
private readonly BlockingCollection<byte[]> _buffer;
public LoopbackStream()
{
_buffer = new BlockingCollection<byte[]>();
ReaderStream = new ReaderStreamInternal(_buffer);
WriterStream = new WriterStreamInternal(_buffer);
}
private class WriterStreamInternal : Stream
{
private readonly BlockingCollection<byte[]> _buffer;
public WriterStreamInternal(BlockingCollection<byte[]> buffer)
{
_buffer = buffer;
CanRead = false;
CanWrite = false;
CanSeek = false;
}
public override void Close()
{
_buffer.CompleteAdding();
}
public override int Read(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
var newData = new byte[count];
Array.Copy(buffer, offset, newData, 0, count);
_buffer.Add(newData);
}
public override void Flush()
{
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override bool CanRead { get; }
public override bool CanSeek { get; }
public override bool CanWrite { get; }
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
}
private class ReaderStreamInternal : Stream
{
private readonly BlockingCollection<byte[]> _buffer;
private readonly IEnumerator<byte[]> _readerEnumerator;
private byte[] _currentBuffer;
private int _currentBufferIndex = 0;
public ReaderStreamInternal(BlockingCollection<byte[]> buffer)
{
_buffer = buffer;
CanRead = true;
CanWrite = false;
CanSeek = false;
_readerEnumerator = _buffer.GetConsumingEnumerable().GetEnumerator();
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
_readerEnumerator.Dispose();
}
base.Dispose(disposing);
}
public override int Read(byte[] buffer, int offset, int count)
{
if (_currentBuffer == null)
{
bool read = _readerEnumerator.MoveNext();
if (!read)
return 0;
_currentBuffer = _readerEnumerator.Current;
}
var remainingBytes = _currentBuffer.Length - _currentBufferIndex;
var readBytes = Math.Min(remainingBytes, count);
Array.Copy(_currentBuffer, _currentBufferIndex, buffer, offset, readBytes);
_currentBufferIndex += readBytes;
if (_currentBufferIndex == _currentBuffer.Length)
_currentBuffer = null;
return readBytes;
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public override void Flush()
{
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override bool CanRead { get; }
public override bool CanSeek { get; }
public override bool CanWrite { get; }
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
}
}
使用2實例(https://msdn.microsoft.com/en-us/library/system.io.pipes.pipestream(V = vs.110)的.aspx),1至讀(客戶端)和1寫(服務器)。 – Amit
謝謝@Amit,你能否詳細說明如何將這些流「綁定」在一起..這對我來說並不清楚。 –
如果您需要將數據從一個數據流傳輸到另一個數據流,通常通過從數據源讀取數據塊(例如1K或4K)並將數據放入目標,直到源數據流爲空。 – Oliver