2013-01-19 55 views
2

如何使用反應式擴展,如何創建一個Observable,它將持續調用流的Read方法並將結果傳播給其觀察者?可讀的流,直到它結束或出現錯誤

或者這是接近事物的完全錯誤的方式?我應該實施自己的IObservable嗎?

回答

2

我從來沒有遇到過實施我自己的observable有意義的情況。

試試這個:

public static IObservable<byte[]> ObservableRead(Stream stream, int bufferSize) 
{ 
    return Observable.Create<byte[]>(o => 
    { 
     var buffer = new byte[bufferSize]; 
     var read = 0; 
     try 
     { 
      while (true) 
      { 
       read = stream.Read(buffer, 0, buffer.Length); 
       if (read == 0) 
       { 
        break; 
       } 
       var results = buffer.Take(read).ToArray(); 
       //Always return a copy 
       //never the buffer for concurrency's sake. 
       o.OnNext(results); 
      } 
     } 
     catch (Exception ex) 
     { 
      o.OnError(ex); 
     } 
     finally 
     { 
      o.OnCompleted(); 
     } 
     return Disposable.Empty; 
    }); 
} 
+0

這是好事,但只有當訂閱運行。如果我想擁有多個訂閱者並一直運行,該怎麼辦?我相信我需要使用Observer.Generate來代替。試圖找出現在。 – NoPyGod

+1

@NoPyGod - 您不希望多個訂閱者訪問這一個流。每個人都會競爭獲取字節。您需要一個可以發佈給多個訂閱者的單一閱讀流。或者你可以將文件讀入內存並移動。理想情況下,您的簽名應爲'IObservable ObservableRead(Func streamFactory,int bufferSize)',以使此類方法正常工作。 – Enigmativity

+0

爲什麼你在'finally'中調用'OnCompleted()'?如果你把它放在'try'的末尾,它會不會起到同樣的作用? – svick