Return database results as a stream

I have a function that returns the results of database queries. These results have become very large, and I now want to return them as a stream so the client can start processing them sooner and with lower memory usage. But I don't really know how to do this. The function below works, but I want to know how to change it so that streaming starts as soon as the first table has been read.

public Stream GetResults()
{
    IFormatter formatter = new BinaryFormatter();
    Stream stream = new MemoryStream();

    formatter.Serialize(stream, GetItemsFromTable1());
    formatter.Serialize(stream, GetItemsFromTable2());
    formatter.Serialize(stream, GetItemsFromTable3());
    formatter.Serialize(stream, GetItemsFromTable4());

    stream.Position = 0;
    return stream;
}

"so that it starts streaming" - so *what* starts going into the stream? Are you asking how the consumer can start processing Table1 while the code is still working on tables 2, 3 and 4? – Plutonix

How do you actually process and forward the results right now? Perhaps a [yield return](https://msdn.microsoft.com/de-de/library/9k7k7cf0.aspx) rather than a stream would serve your purpose? – Marwie
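
(A minimal sketch of the yield-return idea mentioned in that comment, assuming the GetItemsFromTableN() methods could be rewritten to return IEnumerable<Item>, where Item is a placeholder element type:)

    // Sketch only: Item is a placeholder type, and the GetItemsFromTableN()
    // methods are assumed to be rewritable as lazy IEnumerable<Item> producers.
    public IEnumerable<Item> GetResultsLazily()
    {
        foreach (var item in GetItemsFromTable1())
            yield return item; // the caller can process table 1 before table 2 is queried

        foreach (var item in GetItemsFromTable2())
            yield return item;

        // ... tables 3 and 4 in the same way
    }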

Do you just want to pass all the result sets from the database through a stream to the client (without much conversion), or do you need to serialize complete .NET objects? What kind of database? Any? What kind of client? –

Answers

You could write a custom Stream implementation that acts as a pipe. If you then move the GetItemsFromTable() calls onto a background task, the client can start reading results from the stream immediately.

In my solution I use a circular buffer as the backing store for the pipe stream. Memory usage only stays low if the client consumes the data quickly enough, but even in the worst case it shouldn't use more memory than your current solution does. If memory usage matters more to you than execution speed, your stream could block write calls until space becomes available. The solution below does not block writes; instead it grows the circular buffer's capacity so the background thread can keep writing data without delay.

The GetResults method could look like this:

public Stream GetResults() 
{ 
    // Begin filling the pipe with data on a background thread 
    var pipeStream = new CircularBufferPipeStream(); 
    Task.Run(() => WriteResults(pipeStream)); 

    // Return pipe stream for immediate usage by client 
    // Note: client is responsible for disposing of the stream after reading all data! 
    return pipeStream; 
} 

// Runs on background thread, filling circular buffer with data 
void WriteResults(CircularBufferPipeStream stream) 
{ 
    IFormatter formatter = new BinaryFormatter(); 
    formatter.Serialize(stream, GetItemsFromTable1()); 
    formatter.Serialize(stream, GetItemsFromTable2()); 
    formatter.Serialize(stream, GetItemsFromTable3()); 
    formatter.Serialize(stream, GetItemsFromTable4()); 

    // Indicate that there's no more data to write 
    stream.CloseWritePort(); 
} 

And the circular buffer stream:

/// <summary> 
/// Stream that acts as a pipe by supporting reading and writing simultaneously from different threads. 
/// Read calls will block until data is available or the CloseWritePort() method has been called. 
/// Read calls consume bytes in the circular buffer immediately so that more space is available for writes into the circular buffer. 
/// Writes do not block; the capacity of the circular buffer will be expanded as needed to write the entire block of data at once. 
/// </summary> 
class CircularBufferPipeStream : Stream 
{ 
    const int DefaultCapacity = 1024; 
    byte[] _buffer; 
    bool _writePortClosed = false; 
    object _readWriteSyncRoot = new object(); 
    int _length; 
    ManualResetEvent _dataAddedEvent; 
    int _start = 0; 

    public CircularBufferPipeStream(int initialCapacity = DefaultCapacity) 
    { 
     _buffer = new byte[initialCapacity]; 
     _length = 0; 
     _dataAddedEvent = new ManualResetEvent(false); 
    } 

    public void CloseWritePort() 
    { 
     lock (_readWriteSyncRoot) 
     { 
      _writePortClosed = true; 
      _dataAddedEvent.Set(); 
     } 
    } 

    public override bool CanRead { get { return true; } } 
    public override bool CanWrite { get { return true; } } 
    public override bool CanSeek { get { return false; } } 
    public override void Flush() { } 
    public override long Length { get { throw new NotImplementedException(); } } 
    public override long Position 
    { 
     get { throw new NotImplementedException(); } 
     set { throw new NotImplementedException(); } 
    } 
    public override long Seek(long offset, SeekOrigin origin) { throw new NotImplementedException(); } 
    public override void SetLength(long value) { throw new NotImplementedException(); } 
    public override int Read(byte[] buffer, int offset, int count) 
    { 
     int bytesRead = 0; 
     while (bytesRead == 0) 
     { 
      bool waitForData = false; 
      lock (_readWriteSyncRoot) 
      { 
       if (_length != 0) 
        bytesRead = ReadDirect(buffer, offset, count); 
       else if (_writePortClosed) 
        break; 
       else 
       { 
        _dataAddedEvent.Reset(); 
        waitForData = true; 
       } 
      } 
      if (waitForData) 
       _dataAddedEvent.WaitOne(); 
     } 
     return bytesRead; 
    } 

    private int ReadDirect(byte[] buffer, int offset, int count) 
    { 
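      // First copy the "tail" segment: bytes from _start up to the end of the
      // physical buffer, bounded by the requested count and the stored length.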
     int readTailCount = Math.Min(Math.Min(_buffer.Length - _start, count), _length); 
     Array.Copy(_buffer, _start, buffer, offset, readTailCount); 
     _start += readTailCount; 
     _length -= readTailCount; 
     if (_start == _buffer.Length) 
      _start = 0; 

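      // If more data was requested and is available, wrap around and copy the
      // "head" segment from the start of the physical buffer.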
     int readHeadCount = Math.Min(Math.Min(_buffer.Length - _start, count - readTailCount), _length); 
     if (readHeadCount > 0) 
     { 
      Array.Copy(_buffer, _start, buffer, offset + readTailCount, readHeadCount); 
      _start += readHeadCount; 
      _length -= readHeadCount; 
     } 

     return readTailCount + readHeadCount; 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     lock (_readWriteSyncRoot) 
     { 
      // expand capacity as needed 
      if (count + _length > _buffer.Length) 
      { 
       var expandedBuffer = new byte[Math.Max(_buffer.Length * 2, count + _length)]; 
       _length = ReadDirect(expandedBuffer, 0, _length); 
       _start = 0; 
       _buffer = expandedBuffer; 
      } 

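       // Write the "tail" segment up to the end of the physical buffer;
       // any remainder wraps around to the front below.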
      int startWrite = (_start + _length) % _buffer.Length; 
      int writeTailCount = Math.Min(_buffer.Length - startWrite, count); 
      Array.Copy(buffer, offset, _buffer, startWrite, writeTailCount); 
      startWrite += writeTailCount; 
      _length += writeTailCount; 
      if (startWrite == _buffer.Length) 
       startWrite = 0; 

      int writeHeadCount = count - writeTailCount; 
      if (writeHeadCount > 0) 
      { 
       Array.Copy(buffer, offset + writeTailCount, _buffer, startWrite, writeHeadCount); 
       _length += writeHeadCount; 
      } 
     } 
     _dataAddedEvent.Set(); 
    } 

    protected override void Dispose(bool disposing) 
    { 
     if (disposing) 
     { 
      if (_dataAddedEvent != null) 
      { 
       _dataAddedEvent.Dispose(); 
       _dataAddedEvent = null; 
      } 
     } 
     base.Dispose(disposing); 
    } 
} 
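
A rough sketch of how a client might consume the stream returned by GetResults(), assuming it knows that four object graphs were serialized (matching the four Serialize calls in WriteResults above); the same System.Runtime.Serialization namespaces as in the answer's code are assumed:

    // Sketch only: the client deserializes the four object graphs while the
    // background thread may still be writing; Read blocks until data arrives.
    using (var stream = GetResults())
    {
        IFormatter formatter = new BinaryFormatter();
        for (int i = 0; i < 4; i++)
        {
            object items = formatter.Deserialize(stream);
            // process 'items' here, e.g. cast it to the collection type
            // returned by the corresponding GetItemsFromTableN() call
        }
    }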

Thanks for this, Roger. My GetResults function is almost exactly like yours, but it couldn't work with a standard MemoryStream. Your custom circular stream should do the trick. –

Try

public Stream GetResults() 
{ 
    IFormatter formatter = new BinaryFormatter(); 
    Stream stream = new MemoryStream(); 

    formatter.Serialize(stream, GetItemsFromTable1()); 
    formatter.Serialize(stream, GetItemsFromTable2()); 
    formatter.Serialize(stream, GetItemsFromTable3()); 
    formatter.Serialize(stream, GetItemsFromTable4()); 

    stream.Seek(0L, SeekOrigin.Begin); 

    return stream; 
} 

Why the changes?

  • Removed the using block, because your stream would be disposed as soon as it left the using block. Disposing the stream means you can no longer use it.
  • Seek to the beginning of the stream. If you start reading from the stream without seeking back to its beginning, you will start deserializing/reading at its end; and unfortunately there is no content beyond the end of the stream.

However, I can't see how a MemoryStream would reduce memory usage. I suggest chaining it to a DeflateStream or a FileStream to reduce RAM usage.
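
A rough sketch of that suggestion, assuming a temporary file is acceptable as the backing store (System.IO and System.IO.Compression are assumed; the compressed data goes to disk instead of RAM, and the caller is responsible for decompressing and disposing the returned stream):

    // Sketch only: serialize through a DeflateStream into a temporary FileStream
    // instead of buffering everything in a MemoryStream.
    public Stream GetResults()
    {
        string path = Path.GetTempFileName();
        using (var file = new FileStream(path, FileMode.Create, FileAccess.Write))
        using (var deflate = new DeflateStream(file, CompressionMode.Compress))
        {
            IFormatter formatter = new BinaryFormatter();
            formatter.Serialize(deflate, GetItemsFromTable1());
            formatter.Serialize(deflate, GetItemsFromTable2());
            formatter.Serialize(deflate, GetItemsFromTable3());
            formatter.Serialize(deflate, GetItemsFromTable4());
        }

        // The reader should wrap this in a DeflateStream with CompressionMode.Decompress.
        return new FileStream(path, FileMode.Open, FileAccess.Read);
    }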

Hope this helps.

Thanks - your points about the using block and the stream position are valid, but they don't answer my question about starting to read the stream while it is still being written to. –