我有一系列使用RX發佈/訂閱模型的模塊。無效擴展 - 引發異步事件並在特定線程上訂閱
這裏是事件註冊代碼(每個訂閱模塊重複):
_publisher.GetEvent<DataEvent>()
.Where(sde => sde.SourceName == source.Name)
.ObserveOn(Scheduler.TaskPool)
.Subscribe(module.OnDataEvent);
出版商很簡單,這要歸功於José Romaniello's code:
public class EventPublisher : IEventPublisher
{
private readonly ConcurrentDictionary<Type, object> _subjects =
new ConcurrentDictionary<Type, object>(); public IObservable<TEvent> GetEvent<TEvent>()
{
var subject = (ISubject<TEvent>)_subjects.GetOrAdd(typeof(TEvent), t => new Subject<TEvent>());
return subject.AsObservable();
}
public void Publish<TEvent>(TEvent sampleEvent)
{
object subject;
if (_subjects.TryGetValue(typeof(TEvent), out subject))
{
((ISubject<TEvent>)subject).OnNext(sampleEvent);
}
}
}
現在我的問題:正如你可以在上面看到我使用.ObserveOn(Scheduler.TaskPool)方法從每個模塊的池中爲每個事件分配一個新線程。這是因爲我有很多事件和模塊。當然,問題在於事件按時間順序混合在一起,因爲有些事件彼此接近,然後以錯誤的順序調用OnDataEvent回調(每個OnDataEvent都帶有一個時間戳)。
有沒有簡單的方法來使用RX來確保正確的事件順序?或者我可以編寫自己的調度程序,以確保每個模塊按順序獲取事件?
當然,事件按照正確的順序發佈。
在此先感謝。
'Synchronize'方法除了確保底層observable遵守可觀察合約 - 即'OnNext *(OnError | OnCompleted)'外不做任何其他事情。 – Enigmativity