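Rx.net - synchronous vs. asynchronous observers - depends on the source?

I'm very new to Rx and am trying to wrap my head around it. I haven't read much yet; I'm trying to learn first-hand by experimenting.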
using System;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        // one source, produces values with delays
        IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i * i, i => TimeSpan.FromMilliseconds(100));

        // first observer: slow consumer, blocks for 500 ms per value
        IDisposable subscription = source.Subscribe(
            i =>
            {
                Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i);
                Thread.Sleep(500);
            },
            exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception),
            () => Console.WriteLine("Sub 1 Completed observation"));

        // second observer: consumes immediately
        IDisposable subscription2 = source.Subscribe(
            i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i),
            exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception),
            () => Console.WriteLine("Sub 2 Completed observation"));

        Console.WriteLine("press to cancel");
        Console.ReadLine();
        subscription.Dispose();
        subscription2.Dispose();
    }
}
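This produces asynchronous, interleaved execution, as expected. On the other hand, if I change the source to be synchronous, even the observers become blocking and synchronous: both report the same thread ID, and sub 2 is not entered until sub 1 has been fully consumed. Can someone help me understand this? Here is the synchronous version: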
using System;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        // one source, produces values
        IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i * i);

        // two observers that consume - first with a delay and the second immediately.
        // in this case, the behavior of the observers becomes synchronous?
        IDisposable subscription = source.Subscribe(
            i =>
            {
                Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i);
                Thread.Sleep(500);
            },
            exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception),
            () => Console.WriteLine("Sub 1 Completed observation"));

        IDisposable subscription2 = source.Subscribe(
            i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i),
            exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception),
            () => Console.WriteLine("Sub 2 Completed observation"));

        Console.WriteLine("press to cancel");
        Console.ReadLine();
        subscription.Dispose();
        subscription2.Dispose();
    }
}
It looks like the overload that takes a time span picks the 'Scheduler.Default' scheduler, whereas the one without a time span uses 'Scheduler.Immediate' – Raghu
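
If the scheduler really is what makes the difference, one way to check is to pass a scheduler explicitly to the overload that has no time span. The following is only a sketch, assuming the System.Reactive package is referenced. It uses the Observable.Generate overload that takes an IScheduler as its last argument and hands it Scheduler.Default. Which scheduler the overload picks when none is supplied may vary between Rx versions, so that part is left as an assumption rather than stated as fact.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main()
    {
        // Same source as the synchronous version, but the scheduler is passed
        // explicitly, so each subscription should run its generation loop on
        // that scheduler instead of on the thread that calls Subscribe.
        IObservable<int> source = Observable.Generate(
            0, i => i < 2, i => i + 1, i => i * i,
            Scheduler.Default);

        // Slow observer: blocks whichever thread delivers its values.
        using (source.Subscribe(
            i =>
            {
                Console.WriteLine("Sub 1 [tid:{0}] received {1}", Thread.CurrentThread.ManagedThreadId, i);
                Thread.Sleep(500);
            },
            () => Console.WriteLine("Sub 1 Completed observation")))
        // Fast observer: subscribed right after the first Subscribe call returns.
        using (source.Subscribe(
            i => Console.WriteLine("Sub 2 [tid:{0}] received {1}", Thread.CurrentThread.ManagedThreadId, i),
            () => Console.WriteLine("Sub 2 Completed observation")))
        {
            Console.WriteLine("press to cancel");
            Console.ReadLine();
        }
    }
}
```

If Sub 2 now prints its values without waiting for Sub 1 to finish, and the thread IDs differ from the main thread, that would support the explanation in the comment above: the blocking comes from where the source runs its loop, not from the observers themselves.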