2010-07-08 24 views
3

我有一個WCF服務,用來從遠程FTP服務器以流的方式傳輸大文件(100MB以上)。問題是:如何在把流返回給WCF客戶端的同時,將其寫入文件?

/// <summary>
/// Service contract with a single operation that returns its payload as a stream.
/// </summary>
[ServiceContract]
public interface IStreamService
{
    /// <summary>
    /// Returns the requested data as a <see cref="Stream"/>.
    /// </summary>
    [OperationContract]
    Stream GetDataFromFtp();
}

/// <summary>
/// Baseline implementation: returns the stream obtained from <c>Retr</c> as-is.
/// </summary>
public class StreamService : IStreamService
{
    /// <summary>
    /// Returns the stream produced by <c>Retr</c> without wrapping it.
    /// </summary>
    public Stream GetDataFromFtp()
    {
        return Retr(...);
    }
}

在WCF把文件流式傳輸給客戶端的同時,我想把它一併寫入本地緩存,這樣以後的請求就不必再回到遠程FTP服務器去取 - 我可以直接從磁盤提供文件。

我不知道如何在實現這一點的同時,避免在返回之前把整個100MB的文件流緩衝到內存中。

我嘗試使用一個簡單的捕獲流包裝器,它在每次讀取時都執行一次寫入:

/// <summary>
/// Read-only stream decorator that copies every byte read from the wrapped
/// stream into a second "capture" stream.
/// </summary>
/// <remarks>
/// The wrapper owns both streams: disposing it disposes the capture stream
/// (so buffered data is flushed and the target is closed) and the wrapped
/// source stream.
/// </remarks>
public class CapturingStreamWrapper : Stream
{
    private readonly Stream stream;
    private readonly Stream captureStream;

    /// <summary>
    /// Creates a wrapper that reads from <paramref name="stream"/> and mirrors
    /// the bytes into <paramref name="captureStream"/>.
    /// </summary>
    /// <param name="stream">Source stream to read from.</param>
    /// <param name="captureStream">Stream that receives a copy of every byte read.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public CapturingStreamWrapper(Stream stream, Stream captureStream)
    {
        if (stream == null) throw new ArgumentNullException("stream");
        if (captureStream == null) throw new ArgumentNullException("captureStream");

        this.stream = stream;
        this.captureStream = captureStream;
    }

    public override bool CanRead
    {
        get { return stream.CanRead; }
    }

    public override bool CanSeek
    {
        // Seeking would let the capture drift out of step with the source.
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return false; }
    }

    public override long Length
    {
        get { return stream.Length; }
    }

    public override long Position
    {
        get { return stream.Position; }
        set { throw new NotSupportedException("Cannot set position in CapturingStreamWrapper."); }
    }

    /// <summary>
    /// Reads from the wrapped stream and writes the bytes actually read to the capture stream.
    /// </summary>
    /// <returns>The number of bytes read; 0 at end of stream.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        int readBytes = stream.Read(buffer, offset, count);

        // Nothing to mirror at end of stream (or on a zero-byte read).
        if (readBytes > 0)
        {
            captureStream.Write(buffer, offset, readBytes);
        }

        return readBytes;
    }

    /// <summary>Pushes any bytes buffered by the capture stream to its target.</summary>
    public override void Flush()
    {
        captureStream.Flush();
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException("Cannot seek in CapturingStreamWrapper.");
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException("Cannot set length in CapturingStreamWrapper.");
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException("Cannot write in CapturingStreamWrapper.");
    }

    /// <summary>
    /// Disposes the capture stream and then the wrapped stream.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        try
        {
            if (disposing)
            {
                try
                {
                    // Disposing the capture stream flushes and closes it.
                    captureStream.Dispose();
                }
                finally
                {
                    // Release the source even if closing the capture failed.
                    stream.Dispose();
                }
            }
        }
        finally
        {
            base.Dispose(disposing);
        }
    }
}

/// <summary>
/// Service implementation that returns the stream from <c>Retr</c> wrapped in a
/// <c>CapturingStreamWrapper</c>, so reads are also written to a local file.
/// </summary>
public class StreamService : IStreamService
{
    /// <summary>
    /// Opens the source stream and the cache file, and returns a wrapper that
    /// copies each read into the cache file.
    /// </summary>
    public Stream GetDataFromFtp()
    {
        Stream ftpStream = Retr(...);

        try
        {
            Stream cacheStream = File.OpenWrite(...);
            return new CapturingStreamWrapper(ftpStream, cacheStream);
        }
        catch
        {
            // Opening the cache file failed: do not leak the already-open source stream.
            ftpStream.Dispose();
            throw;
        }
    }
}

但這似乎行不通。

此外,這沒有提供任何錯誤處理 - 如果向客戶端的傳輸失敗,我需要爲緩存加上一個catch塊來刪除寫到一半的文件(即事務性緩存)。我不確定這該如何實現,因爲我不知道WCF在其生命週期中何時讀取、何時清理這個流。

有沒有什麼辦法可以在把數據流式傳輸回客戶端的同時將其寫入文件?

回答

3

我最終編寫了幾個相互關聯的流類 - 其中一個在被讀取時把數據通過管道傳給另一個。抱歉貼了這麼長的代碼:

/// <summary>
/// A read-only stream that, as it is read, republishes the bytes it read so
/// they become available on <see cref="OutputStream"/>.
/// </summary>
/// <remarks>
/// Each successful read raises <see cref="BytesRead"/> with a private copy of
/// the bytes. <see cref="Closing"/> is raised from <see cref="Close"/> so the
/// output side can tell that no more data will arrive. Disposing this stream
/// also disposes the wrapped source stream.
/// </remarks>
public class CacheStream : Stream
{
    private readonly Stream stream;

    /// <summary>
    /// Wraps <paramref name="stream"/> and creates the paired output stream.
    /// </summary>
    /// <param name="stream">Source stream to read from.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    public CacheStream(Stream stream)
    {
        if (stream == null) throw new ArgumentNullException("stream");
        this.stream = stream;
        OutputStream = new CacheOutputStream(this);
    }

    // Initialised with empty handlers so they can be raised without a null check.

    /// <summary>Raised after every read that returned at least one byte.</summary>
    public event EventHandler<BytesReadEventArgs> BytesRead = delegate { };

    /// <summary>Raised when <see cref="Close"/> is called.</summary>
    public event EventHandler Closing = delegate { };

    /// <summary>Stream from which the bytes read through this instance can be read again.</summary>
    public Stream OutputStream { get; private set; }

    public override void Flush()
    {
        stream.Flush();
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new InvalidOperationException("Cannot seek in CachingStream.");
    }

    public override void SetLength(long value)
    {
        stream.SetLength(value);
    }

    /// <summary>
    /// Reads from the wrapped stream and publishes the bytes read via <see cref="BytesRead"/>.
    /// </summary>
    /// <returns>The number of bytes read from the wrapped stream.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        int numberOfBytesRead = stream.Read(buffer, offset, count);

        if (numberOfBytesRead > 0)
        {
            PipeToOutputStream(buffer, offset, numberOfBytesRead);
        }

        return numberOfBytesRead;
    }

    private void PipeToOutputStream(byte[] buffer, int offset, int numberOfBytesRead)
    {
        // Publish a copy: the caller owns "buffer" and may overwrite it on the next read.
        var tmp = new byte[numberOfBytesRead];
        Array.Copy(buffer, offset, tmp, 0, numberOfBytesRead);
        BytesRead(this, new BytesReadEventArgs(tmp));
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new InvalidOperationException("Cannot write in CachingStream.");
    }

    public override bool CanRead
    {
        get { return stream.CanRead; }
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return false; }
    }

    public override long Length
    {
        get { return stream.Length; }
    }

    public override long Position
    {
        get { return stream.Position; }
        set { throw new InvalidOperationException("Cannot set position in CachingStream."); }
    }

    /// <summary>
    /// Raises <see cref="Closing"/> and then closes this stream.
    /// </summary>
    public override void Close()
    {
        Closing(this, EventArgs.Empty);
        base.Close();
    }

    /// <summary>
    /// Disposes the output stream and the wrapped source stream.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        try
        {
            // Only touch other managed objects on an explicit dispose.
            if (disposing)
            {
                try
                {
                    OutputStream.Dispose();
                }
                finally
                {
                    // Previously the source stream was never released.
                    stream.Dispose();
                }
            }
        }
        finally
        {
            base.Dispose(disposing);
        }
    }
}

而且

/// <summary>
/// Output portion of CacheStream. Streams bytes from a queue of buffers that
/// the source <see cref="CacheStream"/> fills as it is read.
/// </summary>
/// <remarks>
/// All mutable state is guarded by a lock on the buffer queue. A reader that
/// finds the queue empty waits on that lock until data is queued or the source
/// signals that it is closing.
/// </remarks>
public class CacheOutputStream : Stream
{
    // All fields below are only accessed while holding the lock on "buffers".
    private readonly Queue<byte[]> buffers = new Queue<byte[]>();

    // 64-bit counters so transfers larger than 2 GB do not overflow.
    private long position;
    private long length;
    private bool sourceIsClosed;

    // Number of bytes of the buffer at the head of the queue already handed out.
    private int headOffset;

    /// <summary>
    /// Subscribes to <paramref name="stream"/> so its reads feed this stream.
    /// </summary>
    /// <param name="stream">Source stream whose bytes are republished here.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    public CacheOutputStream(CacheStream stream)
    {
        if (stream == null) throw new ArgumentNullException("stream");

        stream.BytesRead += (o, e) => AddToQueue(e.Buffer);
        stream.Closing += (o, e) => MarkSourceClosed();
    }

    private void AddToQueue(byte[] buffer)
    {
        if (buffer == null || buffer.Length == 0)
        {
            return;
        }

        lock (buffers)
        {
            buffers.Enqueue(buffer);
            length += buffer.Length;

            // Wake any reader waiting for data.
            Monitor.PulseAll(buffers);
        }
    }

    private void MarkSourceClosed()
    {
        lock (buffers)
        {
            sourceIsClosed = true;

            // Wake any reader so it can observe end of stream.
            Monitor.PulseAll(buffers);
        }
    }

    /// <summary>
    /// Copies up to <paramref name="count"/> bytes from the head of the queue.
    /// Blocks while the queue is empty and the source has not closed.
    /// </summary>
    /// <returns>
    /// The number of bytes copied; 0 once the source has closed and the queue is drained.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="offset"/> or <paramref name="count"/> is negative.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// <paramref name="buffer"/> is too small for <paramref name="offset"/> plus <paramref name="count"/>.
    /// </exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        // Validate before touching the queue so a bad call can never drop queued data.
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (offset < 0) throw new ArgumentOutOfRangeException("offset");
        if (count < 0) throw new ArgumentOutOfRangeException("count");
        if (buffer.Length - offset < count)
        {
            throw new ArgumentException("Offset and count exceed the length of the buffer.");
        }

        lock (buffers)
        {
            while (buffers.Count == 0)
            {
                // Closed and nothing more to hand out: end of stream.
                if (sourceIsClosed)
                {
                    return 0;
                }

                // Releases the lock while waiting; AddToQueue/MarkSourceClosed pulse it.
                Monitor.Wait(buffers);
            }

            byte[] head = buffers.Peek();
            int bytesToCopy = Math.Min(count, head.Length - headOffset);

            Array.Copy(head, headOffset, buffer, offset, bytesToCopy);
            headOffset += bytesToCopy;
            position += bytesToCopy;

            // Keep a partially consumed buffer at the head; drop it once fully read.
            if (headOffset == head.Length)
            {
                buffers.Dequeue();
                headOffset = 0;
            }

            return bytesToCopy;
        }
    }

    public override void Flush() { }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new InvalidOperationException("Cannot seek in CachingStream.");
    }

    public override void SetLength(long value)
    {
        throw new InvalidOperationException("Cannot set length in CachingStream.");
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new InvalidOperationException("Cannot write in a CachingStream.");
    }

    public override bool CanRead
    {
        get { return true; }
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return false; }
    }

    /// <summary>Total number of bytes queued so far.</summary>
    public override long Length
    {
        get
        {
            lock (buffers)
            {
                return length;
            }
        }
    }

    /// <summary>Total number of bytes handed out by <see cref="Read"/> so far.</summary>
    public override long Position
    {
        get
        {
            lock (buffers)
            {
                return position;
            }
        }
        set { throw new InvalidOperationException("Cannot set position in CachingStream."); }
    }
}
+0

我正打算試試這段代碼……你有'BytesReadEventArgs'的定義嗎?這個方案對你有用嗎? – LamonteCristo 2011-02-05 02:30:58

+0

嗨,理查德我知道現在這可能已經很老了,但是想知道你的WCF實現中是否使用了流式傳輸模式 – user919426 2015-03-14 12:49:23