您編寫的代碼幾乎可以並行運行observable。如果你寫你的觀察,因爲這:
public class Subscriber : IObserver<int>
{
public void OnNext(int a)
{
Console.WriteLine("{0} on {1} at {2}",
a,
Thread.CurrentThread.ManagedThreadId,
DateTime.Now.ToString());
}
public void OnError(Exception e)
{ }
public void OnCompleted()
{ }
}
然後運行該代碼:
var observable =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => (int)x)
.Take(5)
.ObserveOn(Scheduler.ThreadPool);
var sub1 = new Subscriber();
var sub2 = new Subscriber();
observable.Subscribe(sub1);
observable.Subscribe(sub2);
Thread.Sleep(10000);
將產生如下:
0 on 28 at 2011/10/20 00:13:49
0 on 16 at 2011/10/20 00:13:49
1 on 29 at 2011/10/20 00:13:50
1 on 22 at 2011/10/20 00:13:50
2 on 27 at 2011/10/20 00:13:51
2 on 29 at 2011/10/20 00:13:51
3 on 27 at 2011/10/20 00:13:52
3 on 19 at 2011/10/20 00:13:52
4 on 27 at 2011/10/20 00:13:53
4 on 27 at 2011/10/20 00:13:53
它已經在不同的線程並行運行的訂閱。
我使用的重要的事情是.ObserveOn
擴展方法 - 這是什麼使這項工作。
您應該記住,觀察者通常不會共享相同的observables實例。訂閱觀察者有效地將觀察者的獨特「鏈」從觀察者的源頭連接到觀察者。這與在枚舉上調用GetEnumerator
兩次大致相同,您不會共享相同的枚舉器實例,您將獲得兩個唯一的實例。
現在,我想描述我的意思是一個鏈。我要給Reflector.NET從Observable.Generate
& Observable.Where
中提取代碼來說明這一點。
採取此代碼爲例如:
var xs = Observable.Generate(0, x => x < 10, x => x + 1, x => x);
var ys = xs.Where(x => x % 2 == 0);
ys.Subscribe(y => { /* produces 0, 2, 4, 6, 8 */ });
引擎蓋下既Generate
& Where
每個創建AnonymousObservable<T>
內部的Rx類的新實例。 AnonymousObservable<T>
的構造函數需要一個Func<IObserver<T>, IDisposable>
委託,它在接收到對Subscribe
的調用時使用它。
從Reflector.NET爲Observable.Generate<T>(...)
略微清理代碼是:
public static IObservable<TResult> Generate<TState, TResult>(
TState initialState,
Func<TState, bool> condition,
Func<TState, TState> iterate,
Func<TState, TResult> resultSelector,
IScheduler scheduler)
{
return new AnonymousObservable<TResult>((IObserver<TResult> observer) =>
{
TState state = initialState;
bool first = true;
return scheduler.Schedule((Action self) =>
{
bool flag = false;
TResult local = default(TResult);
try
{
if (first)
{
first = false;
}
else
{
state = iterate(state);
}
flag = condition(state);
if (flag)
{
local = resultSelector(state);
}
}
catch (Exception exception)
{
observer.OnError(exception);
return;
}
if (flag)
{
observer.OnNext(local);
self();
}
else
{
observer.OnCompleted();
}
});
});
}
的Action self
參數是一個遞歸調用迭代的輸出值。您會注意到,在此代碼中沒有任何地方會存儲observer
或將值粘貼到多個觀察者。此代碼爲每個新觀察者運行一次。
來自Reflector的Observable.Where<T>(...)
的略微清理的代碼。NET是:
public static IObservable<TSource> Where<TSource>(
this IObservable<TSource> source,
Func<TSource, bool> predicate)
{
return new AnonymousObservable<TSource>(observer =>
source.Subscribe(x =>
{
bool flag;
try
{
flag = predicate(x);
}
catch (Exception exception)
{
observer.OnError(exception);
return;
}
if (flag)
{
observer.OnNext(x);
}
}, ex => observer.OnError(ex),() => observer.OnCompleted));
}
此代碼不再追蹤多個觀察者。它調用Subscribe
有效地將其自己的代碼作爲觀察者傳遞給底層的source
可觀察。
您應該看到,在我上面的示例代碼中,訂閱Where
會創建訂閱Generate
,因此這是一系列可觀察項。實際上,它正在鏈接一系列AnonymousObservable
對象的訂閱呼叫。
如果你有兩個訂閱你有兩個鏈。如果您有1,000個訂閱,則您有1,000個鏈。
現在,作爲一個附註 - 即使有IObservable<T>
和IObserver<T>
接口 - 你應該很少在你自己的類中實際實現它們。內置的類和運算符處理99.99%的所有情況。這有點像IEnumerable<T>
- 你需要多久才能自己實現這個接口?
讓我知道這是否有幫助,如果你需要任何進一步的解釋。
可觀察到熱或冷? – Richard