2016-11-17 82 views
2

我對Rx非常陌生,並試圖圍繞它來包裹我的頭。沒有閱讀很多,但試圖通過實驗室的第一手。Rx.net - 同步與異步觀察者 - 取決於源?

class Program 
{ 
    static void Main(string[] args) 
    { 
     // one source, produces values with delays 
     IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i, i => TimeSpan.FromMilliseconds(100)); 
     IObserver<int> handler = null; 

     IDisposable subscription = source.Subscribe(
      i => 
      { 
       Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i); 
       Thread.Sleep(500); 
      }, 
      exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception), 
      () => Console.WriteLine("Sub 1 Completed observation")); 

     IDisposable subscription2 = source.Subscribe(
      i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i), 
      exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception), 
      () => Console.WriteLine("Sub 2 Completed observation")); 

     Console.WriteLine("press to cancel"); 
     Console.ReadLine(); 
     subscription.Dispose(); 
     subscription2.Dispose(); 

    } 
} 

這產生如預期異步交叉執行。另一方面,如果我將源更改爲同步,即使觀察者變爲阻塞和同步(同一個線程ID,如果沒有完全使用sub1,也不會進入sub2)。 有人可以幫我理解這一點嗎?這裏的同步版本

class Program 
{ 
    static void Main(string[] args) 
    { 
     // one source, produces values 
     IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i); 
     IObserver<int> handler = null; 

     // two observers that consume - first with a delay and the second immediately. 
     // in this case, the behavior of the observers becomes synchronous? 
     IDisposable subscription = source.Subscribe(
      i => 
      { 
       Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i); 
       Thread.Sleep(500); 
      }, 
      exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception), 
      () => Console.WriteLine("Sub 1 Completed observation")); 

     IDisposable subscription2 = source.Subscribe(
      i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i), 
      exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception), 
      () => Console.WriteLine("Sub 2 Completed observation")); 

     Console.WriteLine("press to cancel"); 
     Console.ReadLine(); 
     subscription.Dispose(); 
     subscription2.Dispose(); 

    } 
} 

回答

2

我相信原因是運營商所選擇的默認IScheduler。看看接受的答案here

對於Generate它取決於過載。根據答案,這些是使用的默認調度程序。您可以驗證他們的來源,如果你喜歡

  • 的時間運算默認ISchedulerDefaultScheduler.Instance
  • 默認IScheduler後者算CurrentThreadScheduler.Instance

可以通過提供一個「非確認此阻止「您的同步版本中的調度程序

IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i * i, DefaultScheduler.Instance);

+0

看起來像是如果你使用時間跨度的重載選擇'Scheduler.Default'調度器,而沒有時間跨度的調度器則使用'Scheduler.Immediate' – Raghu