2016-04-08 32 views
2

我試圖處理最壞的情況時偶然發現了一個問題;即在訂閱完成之前處置訂閱的IConnectableObservable處理IConnectableObservable不發送OnComplete

我寫過一個重現問題的人爲例子。

var hotSource = Observable 
    .Return(1) 
    .Delay(TimeSpan.FromMilliseconds(500)) 
    .Publish(); 

var disposable = hotSource.Connect(); 

這裏是我的熱可觀察。我添加了一個延遲,以便我可以在觸發價值之前訂閱它。

下面我返回一個等待觀察,應該讓我等待,直到hotSource完成。我相信我應該通過這一點,如果我馬上處理,然後等待我的訂閱訂閱現在

var awaitable = hotSource 
    .Do((Console.WriteLine)) 
    .Finally(() => Console.WriteLine("Complete")) 
    .LastOrDefaultAsync(); 

disposable.Dispose(); 
await awaitable; 

它只是掛起的應用程序完全和Finally永遠不會被調用。

我在等我的awaitable返回OnCompleted或者最壞的OnErrorObjectDisposedException。另外,如果您在訂閱後連接,它仍會掛起。有什麼想法嗎?

[更新]

以supertoi的答案(我不是訂閱直至處置後)我與處置前明確訂閱重新的問題。

var mutex = new SemaphoreSlim(0); 

var hotSource = Observable 
    .Return(1) 
    .Delay(TimeSpan.FromMilliseconds(500)) 
    .Publish(); 

var subscription = Observable.Create(
    (IObserver<int> observer) => 
    { 
     Console.WriteLine("Subscribed"); 
     return hotSource.Subscribe(observer); 
    }) 
    .Finally(() => mutex.Release()) 
    .Subscribe(Console.WriteLine); 

hotSource 
    .Connect() 
    .Dispose(); 

await mutex.WaitAsync(); 
Console.WriteLine("Complete"); 

這再次暫停,但其挑釁訂閱到IConnectableObservable已經佈置了。

回答

2

LastOrDefaultAsync()返回IObservable,因此它不會訂閱您的序列。 await訂閱您的序列。

the comment檢查上IConnectableObservable.Dispose()

一次性用於斷開其源,可觀察包裝導致訂閱觀察者以停止從底層觀察到的序列接收的值。

這意味着所有的冒落通知包括OnNextOnErrorOnCompleted停止。

因此,如果您在處理IConnectableObservableawait,您將不會收到任何通知。您可以再次Connecthotsource以接收通知。

+0

我已更新我的問題,以確保我敢反對訂閱。它仍然沒有發送完整的通知。我想它只能發送它的源代碼完成。 –