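You could write a custom Stream implementation that functions as a pipe. If you then move the GetItemsFromTable() method calls into a background task, the client can start reading results from the stream immediately.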
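In my solution below, I use a circular buffer as the backing store for the pipe stream. Memory usage will only be lower if the client consumes the data fast enough, but even in the worst case it shouldn't use more memory than your current solution does. If memory usage matters more to you than execution speed, your stream could block write calls until space is available. My solution below does not block writes; it expands the capacity of the circular buffer so that the background thread can keep filling in data without delay.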
The GetResults method might look like this:

    // Requires: System.IO, System.Threading.Tasks, System.Runtime.Serialization,
    // and System.Runtime.Serialization.Formatters.Binary
    public Stream GetResults()
    {
        // Begin filling the pipe with data on a background thread
        var pipeStream = new CircularBufferPipeStream();
        Task.Run(() => WriteResults(pipeStream));

        // Return pipe stream for immediate usage by client
        // Note: client is responsible for disposing of the stream after reading all data!
        return pipeStream;
    }

    // Runs on background thread, filling circular buffer with data
    void WriteResults(CircularBufferPipeStream stream)
    {
        try
        {
            // BinaryFormatter is kept from the question; it is obsolete in current
            // .NET versions and unsafe for untrusted input
            IFormatter formatter = new BinaryFormatter();
            formatter.Serialize(stream, GetItemsFromTable1());
            formatter.Serialize(stream, GetItemsFromTable2());
            formatter.Serialize(stream, GetItemsFromTable3());
            formatter.Serialize(stream, GetItemsFromTable4());
        }
        finally
        {
            // Indicate that there's no more data to write, even if serialization
            // failed; otherwise a blocked reader would wait forever
            stream.CloseWritePort();
        }
    }
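On the client side, the note about disposing the stream can be made concrete with a small read loop. This is only a sketch: `PipeClient` and `Drain` are hypothetical names that do not appear in the code above, and the loop relies on nothing more than the standard `Stream.Read` contract, where a return value of 0 means no more data will arrive.

```csharp
using System;
using System.IO;

// Hypothetical helper for illustration; not part of the original answer.
static class PipeClient
{
    // Reads chunks until Read returns 0 (end of data), hands each chunk to
    // onChunk, then disposes the stream. Returns the total bytes read.
    public static long Drain(Stream source, Action<byte[], int> onChunk)
    {
        long total = 0;
        using (source)
        {
            var chunk = new byte[4096];
            int n;
            while ((n = source.Read(chunk, 0, chunk.Length)) > 0)
            {
                onChunk(chunk, n);   // only the first n bytes of chunk are valid
                total += n;
            }
        }
        return total;
    }
}
```

A caller would pass the stream returned by GetResults() as `source`. With the pipe stream, each `Read` call blocks until the background thread has written something or has closed the write port.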
And the circular buffer stream:

    // Requires: System, System.IO, System.Threading

    /// <summary>
    /// Stream that acts as a pipe by supporting reading and writing simultaneously from different threads.
    /// Read calls will block until data is available or the CloseWritePort() method has been called.
    /// Read calls consume bytes in the circular buffer immediately so that more space is available for writes into the circular buffer.
    /// Writes do not block; the capacity of the circular buffer will be expanded as needed to write the entire block of data at once.
    /// </summary>
    class CircularBufferPipeStream : Stream
    {
        const int DefaultCapacity = 1024;

        byte[] _buffer;
        bool _writePortClosed = false;
        readonly object _readWriteSyncRoot = new object();
        int _length;                        // number of unread bytes in _buffer
        ManualResetEvent _dataAddedEvent;
        int _start = 0;                     // index of the oldest unread byte

        public CircularBufferPipeStream(int initialCapacity = DefaultCapacity)
        {
            // A zero-length buffer would make the modulo in Write() divide by zero
            if (initialCapacity <= 0)
                throw new ArgumentOutOfRangeException("initialCapacity");

            _buffer = new byte[initialCapacity];
            _length = 0;
            _dataAddedEvent = new ManualResetEvent(false);
        }

        public void CloseWritePort()
        {
            lock (_readWriteSyncRoot)
            {
                _writePortClosed = true;
                // The event is null if the reader has already disposed the stream
                if (_dataAddedEvent != null)
                    _dataAddedEvent.Set();
            }
        }

        public override bool CanRead { get { return true; } }
        public override bool CanWrite { get { return true; } }
        public override bool CanSeek { get { return false; } }
        public override void Flush() { }

        // Non-seekable streams conventionally throw NotSupportedException here
        public override long Length { get { throw new NotSupportedException(); } }
        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }
        public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
        public override void SetLength(long value) { throw new NotSupportedException(); }

        public override int Read(byte[] buffer, int offset, int count)
        {
            // A zero-byte read must return immediately instead of spinning in the loop
            if (count == 0)
                return 0;

            int bytesRead = 0;
            while (bytesRead == 0)
            {
                bool waitForData = false;
                lock (_readWriteSyncRoot)
                {
                    if (_length != 0)
                        bytesRead = ReadDirect(buffer, offset, count);
                    else if (_writePortClosed)
                        break;              // no data left and none will come: end of stream
                    else
                    {
                        _dataAddedEvent.Reset();
                        waitForData = true;
                    }
                }
                // Wait outside the lock so the writer can get in
                if (waitForData)
                    _dataAddedEvent.WaitOne();
            }
            return bytesRead;
        }

        // Copies up to count bytes out of the circular buffer and consumes them.
        // Must be called while holding _readWriteSyncRoot.
        private int ReadDirect(byte[] buffer, int offset, int count)
        {
            // First segment: from _start towards the end of the backing array
            int readTailCount = Math.Min(Math.Min(_buffer.Length - _start, count), _length);
            Array.Copy(_buffer, _start, buffer, offset, readTailCount);
            _start += readTailCount;
            _length -= readTailCount;
            if (_start == _buffer.Length)
                _start = 0;

            // Second segment: data that wrapped around to the front of the array
            int readHeadCount = Math.Min(Math.Min(_buffer.Length - _start, count - readTailCount), _length);
            if (readHeadCount > 0)
            {
                Array.Copy(_buffer, _start, buffer, offset + readTailCount, readHeadCount);
                _start += readHeadCount;
                _length -= readHeadCount;
            }
            return readTailCount + readHeadCount;
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            lock (_readWriteSyncRoot)
            {
                // The reader may dispose the stream while the writer is still running
                if (_dataAddedEvent == null)
                    throw new ObjectDisposedException(GetType().Name);

                // expand capacity as needed
                if (count + _length > _buffer.Length)
                {
                    var expandedBuffer = new byte[Math.Max(_buffer.Length * 2, count + _length)];
                    // ReadDirect consumes the data (zeroing _length), so restore
                    // _length from its return value
                    _length = ReadDirect(expandedBuffer, 0, _length);
                    _start = 0;
                    _buffer = expandedBuffer;
                }

                // First segment: from the write position towards the end of the array
                int startWrite = (_start + _length) % _buffer.Length;
                int writeTailCount = Math.Min(_buffer.Length - startWrite, count);
                Array.Copy(buffer, offset, _buffer, startWrite, writeTailCount);
                startWrite += writeTailCount;
                _length += writeTailCount;
                if (startWrite == _buffer.Length)
                    startWrite = 0;

                // Second segment: wrap around to the front of the array
                int writeHeadCount = count - writeTailCount;
                if (writeHeadCount > 0)
                {
                    Array.Copy(buffer, offset + writeTailCount, _buffer, startWrite, writeHeadCount);
                    _length += writeHeadCount;
                }

                // Signal inside the lock so Dispose cannot null the event in between
                _dataAddedEvent.Set();
            }
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                lock (_readWriteSyncRoot)
                {
                    if (_dataAddedEvent != null)
                    {
                        _dataAddedEvent.Dispose();
                        _dataAddedEvent = null;
                    }
                }
            }
            base.Dispose(disposing);
        }
    }
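For comparison, the blocking alternative mentioned earlier (write calls wait until space is available) might look roughly like the sketch below. It is not the implementation above: `BoundedPipeStream` is a hypothetical name, the buffer never grows, and it swaps the ManualResetEvent for `Monitor.Wait`/`Monitor.PulseAll`. One assumption to be aware of is that the client keeps reading; if it stops, the writer stays blocked.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical bounded variant for illustration; not part of the original answer.
sealed class BoundedPipeStream : Stream
{
    readonly byte[] _buffer;                  // fixed capacity, never grows
    readonly object _sync = new object();
    int _start;                               // index of the oldest unread byte
    int _length;                              // number of unread bytes
    bool _writeClosed;

    public BoundedPipeStream(int capacity = 1024)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _buffer = new byte[capacity];
    }

    public void CloseWritePort()
    {
        lock (_sync) { _writeClosed = true; Monitor.PulseAll(_sync); }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0) return 0;
        lock (_sync)
        {
            // Block until data arrives or the writer signals completion
            while (_length == 0)
            {
                if (_writeClosed) return 0;
                Monitor.Wait(_sync);
            }
            int n = Math.Min(count, _length);
            int tail = Math.Min(n, _buffer.Length - _start);
            Array.Copy(_buffer, _start, buffer, offset, tail);
            Array.Copy(_buffer, 0, buffer, offset + tail, n - tail);
            _start = (_start + n) % _buffer.Length;
            _length -= n;
            Monitor.PulseAll(_sync);          // wake a writer waiting for space
            return n;
        }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        lock (_sync)
        {
            while (count > 0)
            {
                // Block while the buffer is full; the reader frees space
                while (_length == _buffer.Length) Monitor.Wait(_sync);
                int writePos = (_start + _length) % _buffer.Length;
                int n = Math.Min(count, _buffer.Length - _length);
                int tail = Math.Min(n, _buffer.Length - writePos);
                Array.Copy(buffer, offset, _buffer, writePos, tail);
                Array.Copy(buffer, offset + tail, _buffer, 0, n - tail);
                _length += n; offset += n; count -= n;
                Monitor.PulseAll(_sync);      // wake a reader waiting for data
            }
        }
    }

    public override bool CanRead => true;
    public override bool CanWrite => true;
    public override bool CanSeek => false;
    public override void Flush() { }
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
}
```

The trade-off is the one described above: memory use is capped at the chosen capacity, but the background thread can only run as fast as the client reads.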
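'so that it starts to stream': so *what* starts going into the stream? Are you asking how the consumer can start processing Table1 while the code is still working on tables 2, 3 and 4? – Plutonix
How do you actually process and forward the results right now? Maybe a [yield return](https://msdn.microsoft.com/de-de/library/9k7k7cf0.aspx) would serve your purpose better than a stream? – Marwie
Do you just want to pass all result sets from the database through to the client behind the stream (without much transformation), or do you need to serialize complete .NET objects? What kind of database? Any? What kind of client? –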