2014-07-16 71 views
3

有人可以幫我做一個IObserver的同步訂閱,這樣調用方法會阻塞,直到訂閱完成。 如:無效擴展同步訂閱

出版商

public static class Publisher { 
public static IObservable<string> NonBlocking() 
    { 
     return Observable.Create<string>(
      observable => 
      { 
       Task.Run(() => 
       { 
        observable.OnNext("a"); 
        Thread.Sleep(1000); 
        observable.OnNext("b"); 
        Thread.Sleep(1000); 
        observable.OnCompleted(); 
        Thread.Sleep(1000); 
       }); 

       return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed")); 
      }); 
    } 

}

用戶

public static class Subscriber{ 
public static bool Subscribe() 
    { 
     Publisher.NonBlocking().Subscribe((s) => 
     { 
      Debug.WriteLine(s); 
     },() => 
     { 
      Debug.WriteLine("Complete"); 
     }); 
     // This will currently return true before the subscription is complete 
     // I want to block and not Return until the Subscriber is Complete 
     return true; 
    } 

}

回答

5

你需要使用System.Reactive.Threading.Task此:

把你觀察到成一個任務...

var source = Publisher.NonBlocking() 
    .Do(
     (s) => Debug.WriteLines(x), 
     () => Debug.WriteLine("Completed") 
    ) 
    .LastOrDefault() 
    .ToTask(); 

Do(...).Subscribe()就像Subscribe(...)。所以Do只是增加了一些副作用。

LastOrDefault就在那裏,因爲ToTask創建Task只能等待來自源Observable的第一個項目,如果沒有項目取得了它會失敗(拋出)。因此,LastOrDefault有效地導致Task等待源完成,無論它產生什麼。

所以之後我們還有一個任務,就等着它:

task.Wait(); // blocking 

或者使用異步/ AWAIT:

await task; // non-blocking 

編輯:

科里斯尼爾森取得了一個很好的點:

在C#和Visual Studio的最新版本中,您實際上可以使用await一個IObservable<T>。這是一個很酷的功能,但它的工作方式與等待Task的方式稍有不同。

當您等待任務時,它會導致任務運行。如果多次等待任務的單個實例,則該任務將只執行一次。可觀測量略有不同。你可以將一個observable想象成一個具有多個返回值的異步函數......每當你訂閱observable時,observable/function都會執行。因此,這兩個代碼段有不同的含義:

等待可觀察到的:

// Console.WriteLine will be invoked twice. 
var source = Observable.Return(0).Do(Console.WriteLine); 
await source; // Subscribe 
await source; // Subscribe 

通過任務等待可觀察到的:

// Console.WriteLine will be invoked once. 
var source = Observable.Return(0).Do(Console.WriteLine); 
var task = source.ToTask(); 
await task; // Subscribe 
await task; // Just yield the task's result. 

因此,在本質上,等待可觀察到的是這樣的:

// Console.WriteLine will be invoked twice. 
var source = Observable.Return(0).Do(Console.WriteLine); 
await source.ToTask(); // Subscribe 
await source.ToTask(); // Subscribe 

但是,await observable語法不會w Xamerin Studio中的ork(截至撰寫本文時)。如果您使用的是Xamerin Studio,我強烈建議您在最後時刻使用ToTask來模擬Visual Studio的await observable語法的行爲。

+0

奇妙的是,不知道'.Do(..)'看起來像一個LastOrDefaultAsync()方法,所以我可以做一個.Wait()從那個 – Lukie

+2

你可以直接'await'一個'的IObservable <>'。它會返回序列中的最後一個項目。 –

+0

你是對的。我會更新答案。 –