我放棄了尋找「現成」的解決方案。我寫了我自己的。 在Write-end接收數據的新類ThroughputStream中,通過線程安全隊列將它們發佈到使用接收的數據塊讀取的讀端。
namespace My.IO
{
public class ThrouputStream
{
private InputStreamClass inputStream;
private OutputStreamClass outputStream;
private Queue<byte[]> queue = new Queue<byte[]>();
private System.Threading.EventWaitHandle queueEvent = new System.Threading.EventWaitHandle(false, System.Threading.EventResetMode.AutoReset);
public ThrouputStream()
{
inputStream = new InputStreamClass(this);
outputStream = new OutputStreamClass(this);
}
public Stream InputStream
{
get { return inputStream; }
}
public Stream OutputStream
{
get { return outputStream; }
}
private class InputStreamClass : Stream
{
private readonly Queue<byte[]> queue;
private readonly ThrouputStream parent;
private byte[] currentBlock = null;
private int currentBlockPos = 0;
private Boolean closed = false;
private int readTimeoutMs = System.Threading.Timeout.Infinite;
public InputStreamClass(ThrouputStream parent)
{
this.parent = parent;
this.queue = parent.queue;
}
public override bool CanRead
{
get { return true; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return false; }
}
public override void Flush()
{
// Do nothing, always flushes.
}
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get
{
throw new NotSupportedException();
}
set
{
throw new NotSupportedException();
}
}
public override bool CanTimeout
{
get
{
return true;
}
}
public override int ReadTimeout
{
get
{
return readTimeoutMs;
}
set
{
readTimeoutMs = value;
}
}
public override int Read(byte[] buffer, int offset, int count)
{
if (currentBlock == null)
{
int queueCount;
lock (queue)
{
queueCount = queue.Count;
if (queueCount > 0)
currentBlock = queue.Dequeue();
}
if (currentBlock == null && !parent.outputStream.IsClosed)
{
parent.queueEvent.WaitOne(readTimeoutMs);
lock (queue)
{
if (queue.Count == 0)
return 0;
currentBlock = queue.Dequeue();
}
}
currentBlockPos = 0;
}
if (currentBlock == null)
return 0;
int read = Math.Min(count, currentBlock.Length - currentBlockPos);
Array.Copy(currentBlock, currentBlockPos, buffer, offset, read);
currentBlockPos += read;
if (currentBlockPos == currentBlock.Length)
{
// did read whole block
currentBlockPos = 0;
currentBlock = null;
}
return read;
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotImplementedException();
}
public override void Close()
{
this.closed = true;
base.Close();
}
}
private class OutputStreamClass : Stream
{
private bool isClosed = false;
private readonly Queue<byte[]> queue;
private readonly ThrouputStream parent;
public OutputStreamClass(ThrouputStream parent)
{
this.parent = parent;
this.queue = parent.queue;
}
public override bool CanRead
{
get { return false; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return true; }
}
public override void Flush()
{
// always flush
}
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get
{
throw new NotSupportedException();
}
set
{
throw new NotSupportedException();
}
}
public override int Read(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
byte[] copy = new byte[count];
Array.Copy(buffer, offset, copy, 0, count);
lock (queue)
{
queue.Enqueue(copy);
try
{
parent.queueEvent.Set();
}
catch (Exception)
{ }
}
}
public override void Close()
{
this.isClosed = true;
base.Close();
// Signal event, to stop waiting consumer
try
{
parent.queueEvent.Set();
}
catch (Exception)
{ }
}
public bool IsClosed
{
get { return isClosed; }
}
}
}
}
你可以創建一個線程安全隊列,可以在添加和從隊列中刪除火災事件,並放入隊列在一個線程,並採取其他,有很多方法可以做你想做的,所以請解釋一下您的要求更詳細 – 2009-08-20 15:57:39