2
如何使用反應式擴展,如何創建一個Observable,它將持續調用流的Read方法並將結果傳播給其觀察者?可讀的流,直到它結束或出現錯誤
或者這是接近事物的完全錯誤的方式?我應該實施自己的IObservable嗎?
如何使用反應式擴展,如何創建一個Observable,它將持續調用流的Read方法並將結果傳播給其觀察者?可讀的流,直到它結束或出現錯誤
或者這是接近事物的完全錯誤的方式?我應該實施自己的IObservable嗎?
我從來沒有遇到過實施我自己的observable有意義的情況。
試試這個:
public static IObservable<byte[]> ObservableRead(Stream stream, int bufferSize)
{
return Observable.Create<byte[]>(o =>
{
var buffer = new byte[bufferSize];
var read = 0;
try
{
while (true)
{
read = stream.Read(buffer, 0, buffer.Length);
if (read == 0)
{
break;
}
var results = buffer.Take(read).ToArray();
//Always return a copy
//never the buffer for concurrency's sake.
o.OnNext(results);
}
}
catch (Exception ex)
{
o.OnError(ex);
}
finally
{
o.OnCompleted();
}
return Disposable.Empty;
});
}
這是好事,但只有當訂閱運行。如果我想擁有多個訂閱者並一直運行,該怎麼辦?我相信我需要使用Observer.Generate來代替。試圖找出現在。 – NoPyGod
@NoPyGod - 您不希望多個訂閱者訪問這一個流。每個人都會競爭獲取字節。您需要一個可以發佈給多個訂閱者的單一閱讀流。或者你可以將文件讀入內存並移動。理想情況下,您的簽名應爲'IObservable ObservableRead(Func streamFactory,int bufferSize)',以使此類方法正常工作。 –
Enigmativity
爲什麼你在'finally'中調用'OnCompleted()'?如果你把它放在'try'的末尾,它會不會起到同樣的作用? – svick