2011-10-19 45 views
3

我是Rx的新手。我想知道是否有可能向不同的訂閱者發送消息,以便他們在不同的線程上運行?一個IObserable控件如何能控制它?簡單的Subject實現,據我所知,它在單個線程上一個接一個地調用訂閱者。是否可以在Rx的不同線程上調用訂閱者的OnNexts?


public class Subsciber : IObserver<int> 
{ 
    public void OnNext(int a) 
    { 
     // Do something 
    } 
    public void OnError(Exception e) 
    { 
     // Do something 
    } 
    public void OnCompeleted() 
    { 
    } 

} 

public static class Program 
{ 
    public void static Main() 
    { 
     var observable = new <....SomeClass....>(); 
     var sub1 = new Subscriber(); 
     var sub2 = new Subscriber(); 
     observable.Subscribe(sub1); 
     observable.Subscribe(sub2); 
     // some waiting function 
    } 
} 

如果我使用的主題爲 'SomeClass的',然後SUB2的OnNext()將不會被調用,直到SUB1的OnNext()完成。如果sub1需要很多時間,我不希望它延遲sub2的接收。有人能告訴我Rx如何爲SomeClass實現這種實現。

+0

可觀察到熱或冷? – Richard

回答

7

您編寫的代碼幾乎可以並行運行observable。如果你寫你的觀察,因爲這:

public class Subscriber : IObserver<int> 
{ 
    public void OnNext(int a) 
    { 
     Console.WriteLine("{0} on {1} at {2}", 
      a, 
      Thread.CurrentThread.ManagedThreadId, 
      DateTime.Now.ToString()); 
    } 
    public void OnError(Exception e) 
    { } 
    public void OnCompleted() 
    { } 
} 

然後運行該代碼:

var observable = 
    Observable 
     .Interval(TimeSpan.FromSeconds(1.0)) 
     .Select(x => (int)x) 
     .Take(5) 
     .ObserveOn(Scheduler.ThreadPool); 
var sub1 = new Subscriber(); 
var sub2 = new Subscriber(); 
observable.Subscribe(sub1); 
observable.Subscribe(sub2); 
Thread.Sleep(10000); 

將產生如下:

0 on 28 at 2011/10/20 00:13:49 
0 on 16 at 2011/10/20 00:13:49 
1 on 29 at 2011/10/20 00:13:50 
1 on 22 at 2011/10/20 00:13:50 
2 on 27 at 2011/10/20 00:13:51 
2 on 29 at 2011/10/20 00:13:51 
3 on 27 at 2011/10/20 00:13:52 
3 on 19 at 2011/10/20 00:13:52 
4 on 27 at 2011/10/20 00:13:53 
4 on 27 at 2011/10/20 00:13:53 

它已經在不同的線程並行運行的訂閱。

我使用的重要的事情是.ObserveOn擴展方法 - 這是什麼使這項工作。

您應該記住,觀察者通常不會共享相同的observables實例。訂閱觀察者有效地將觀察者的獨特「鏈」從觀察者的源頭連接到觀察者。這與在枚舉上調用GetEnumerator兩次大致相同,您不會共享相同的枚舉器實例,您將獲得兩個唯一的實例。

現在,我想描述我的意思是一個鏈。我要給Reflector.NET從Observable.Generate & Observable.Where中提取代碼來說明這一點。

採取此代碼爲例如:

var xs = Observable.Generate(0, x => x < 10, x => x + 1, x => x); 
var ys = xs.Where(x => x % 2 == 0); 
ys.Subscribe(y => { /* produces 0, 2, 4, 6, 8 */ }); 

引擎蓋下既Generate & Where每個創建AnonymousObservable<T>內部的Rx類的新實例。 AnonymousObservable<T>的構造函數需要一個Func<IObserver<T>, IDisposable>委託,它在接收到對Subscribe的調用時使用它。

從Reflector.NET爲Observable.Generate<T>(...)略微清理代碼是:

public static IObservable<TResult> Generate<TState, TResult>(
    TState initialState, 
    Func<TState, bool> condition, 
    Func<TState, TState> iterate, 
    Func<TState, TResult> resultSelector, 
    IScheduler scheduler) 
{ 
    return new AnonymousObservable<TResult>((IObserver<TResult> observer) => 
    { 
     TState state = initialState; 
     bool first = true; 
     return scheduler.Schedule((Action self) => 
     { 
      bool flag = false; 
      TResult local = default(TResult); 
      try 
      { 
       if (first) 
       { 
        first = false; 
       } 
       else 
       { 
        state = iterate(state); 
       } 
       flag = condition(state); 
       if (flag) 
       { 
        local = resultSelector(state); 
       } 
      } 
      catch (Exception exception) 
      { 
       observer.OnError(exception); 
       return; 
      } 
      if (flag) 
      { 
       observer.OnNext(local); 
       self(); 
      } 
      else 
      { 
       observer.OnCompleted(); 
      } 
     }); 
    }); 
} 

Action self參數是一個遞歸調用迭代的輸出值。您會注意到,在此代碼中沒有任何地方會存儲observer或將值粘貼到多個觀察者。此代碼爲每個新觀察者運行一次。

來自Reflector的Observable.Where<T>(...)的略微清理的代碼。NET是:

public static IObservable<TSource> Where<TSource>(
    this IObservable<TSource> source, 
    Func<TSource, bool> predicate) 
{ 
    return new AnonymousObservable<TSource>(observer => 
     source.Subscribe(x => 
     { 
      bool flag; 
      try 
      { 
       flag = predicate(x); 
      } 
      catch (Exception exception) 
      { 
       observer.OnError(exception); 
       return; 
      } 
      if (flag) 
      { 
       observer.OnNext(x); 
      } 
     }, ex => observer.OnError(ex),() => observer.OnCompleted)); 
} 

此代碼不再追蹤多個觀察者。它調用Subscribe有效地將其自己的代碼作爲觀察者傳遞給底層的source可觀察。

您應該看到,在我上面的示例代碼中,訂閱Where會創建訂閱Generate,因此這是一系列可觀察項。實際上,它正在鏈接一系列AnonymousObservable對象的訂閱呼叫。

如果你有兩個訂閱你有兩個鏈。如果您有1,000個訂閱,則您有1,000個鏈。

現在,作爲一個附註 - 即使有IObservable<T>IObserver<T>接口 - 你應該很少在你自己的類中實際實現它們。內置的類和運算符處理99.99%的所有情況。這有點像IEnumerable<T> - 你需要多久才能自己實現這個接口?

讓我知道這是否有幫助,如果你需要任何進一步的解釋。

+0

當你說大多數訂閱形成一個獨特的鏈時,我不明白。 – ada

+0

public void Run() {var src = GetInput()。ToObservable(Scheduler.NewThread); var d = src.Subscribe( c => Console.WriteLine(c +「blah1」+ DateTime.Now + System.Threading.Thread.CurrentThread.ManagedThreadId)); var d2 = src.Subscribe( c => Console.WriteLine(c +「cccc」+ DateTime.Now + System.Threading.Thread.CurrentThread.ManagedThreadId)); System.Threading.Thread.Sleep(60000); } 靜態的IEnumerable GetInput() { 而(真) { 收率返回int.Parse(到Console.ReadLine()); } } – ada

+0

抱歉上面的格式不正確。但我很驚訝地發現,d2和d在控制檯上爲控制檯上輸入的每個整數或者控制檯寫入。這是你'大多數訂閱形成一個獨特的鏈'的意思嗎? – ada

1

如果你有的IObservable,你需要強制訂閱在不同的線程運行,那麼你可以使用ObserveOn功能。

如果您運行下面的代碼,它將強制數字生成器運行在不同的線程上下文中。您也可以使用EventLoopScheduler並指定System.Thread要使用,設置優先級,集名,等等

void Main() 
{ 
    var numbers = Observable.Interval(TimeSpan.FromMilliseconds(100)); 

    var disposable = new CompositeDisposable() 
    { 
     numbers.ObserveOn(Scheduler.TaskPool).Subscribe(x=> Console.WriteLine("TaskPool: "+ Thread.CurrentThread.ManagedThreadId)), 
     numbers.ObserveOn(Scheduler.ThreadPool).Subscribe(x=> Console.WriteLine("ThreadPool: "+ Thread.CurrentThread.ManagedThreadId)), 
     numbers.ObserveOn(Scheduler.Immediate).Subscribe(x=> Console.WriteLine("Immediate: "+ Thread.CurrentThread.ManagedThreadId)) 
    }; 

    Thread.Sleep(1000); 
    disposable.Dispose(); 
} 

輸出

Immediate: 10 
ThreadPool: 4 
TaskPool: 20 
TaskPool: 4 
ThreadPool: 24 
Immediate: 27 
Immediate: 10 
TaskPool: 24 
ThreadPool: 27 
Immediate: 24 
TaskPool: 26 
ThreadPool: 20 
Immediate: 26 
ThreadPool: 24 
TaskPool: 27 
Immediate: 28 
ThreadPool: 27 
TaskPool: 26 
Immediate: 10 

請注意,我如何使用CompositeDisposable最終處理所有訂閱。例如,如果你不在LinqPad中執行此操作。 Observable.Interval將繼續在內存中運行,直到您終止進程。

相關問題