2

我有一系列使用RX發佈/訂閱模型的模塊。無效擴展 - 引發異步事件並在特定線程上訂閱

這裏是事件註冊代碼(每個訂閱模塊重複):

_publisher.GetEvent<DataEvent>()     
    .Where(sde => sde.SourceName == source.Name) 
    .ObserveOn(Scheduler.TaskPool) 
    .Subscribe(module.OnDataEvent); 

出版商很簡單,這要歸功於José Romaniello's code

public class EventPublisher : IEventPublisher 
{ 
    private readonly ConcurrentDictionary<Type, object> _subjects = 
     new ConcurrentDictionary<Type, object>(); public IObservable<TEvent> GetEvent<TEvent>() 
    { 
     var subject = (ISubject<TEvent>)_subjects.GetOrAdd(typeof(TEvent), t => new Subject<TEvent>()); 
     return subject.AsObservable(); 
    } 
    public void Publish<TEvent>(TEvent sampleEvent) 
    { 
     object subject; 
     if (_subjects.TryGetValue(typeof(TEvent), out subject)) 
     { 
      ((ISubject<TEvent>)subject).OnNext(sampleEvent); 
     } 
    } 
} 

現在我的問題:正如你可以在上面看到我使用.ObserveOn(Scheduler.TaskPool)方法從每個模塊的池中爲每個事件分配一個新線程。這是因爲我有很多事件和模塊。當然,問題在於事件按時間順序混合在一起,因爲有些事件彼此接近,然後以錯誤的順序調用OnDataEvent回調(每個OnDataEvent都帶有一個時間戳)。

有沒有簡單的方法來使用RX來確保正確的事件順序?或者我可以編寫自己的調度程序,以確保每個模塊按順序獲取事件?

當然,事件按照正確的順序發佈。

在此先感謝。

回答

0

嘗試同步方法,如:

_publisher.GetEvent<DataEvent>()     
    .Where(sde => sde.SourceName == source.Name) 
    .ObserveOn(Scheduler.TaskPool).Synchronize() 
    .Subscribe(module.OnDataEvent); 

雖然我想你的情況有相同的代碼,發現數據的順序到達,並且不重疊。可能這是你的應用程序特有的東西。

+0

'Synchronize'方法除了確保底層observable遵守可觀察合約 - 即'OnNext *(OnError | OnCompleted)'外不做任何其他事情。 – Enigmativity

1

嘗試使用該實施EventPublisher的:

public class EventPublisher : IEventPublisher 
{ 
    private readonly EventLoopScheduler _scheduler = new EventLoopScheduler(); 
    private readonly Subject<object> _subject = new Subject<object>(); 

    public IObservable<TEvent> GetEvent<TEvent>() 
    { 
     return _subject 
      .Where(o => o is TEvent) 
      .Select(o => (TEvent)o) 
      .ObserveOn(_scheduler); 
    } 

    public void Publish<TEvent>(TEvent sampleEvent) 
    { 
     _subject.OnNext(sampleEvent); 
    } 
} 

它使用一個EventLoopScheduler,以確保所有的事件發生順序和在同一後臺線程。

從訂閱中刪除ObserveOn,因爲如果您在另一個線索上觀察,則可能會再次發生錯誤順序的事件。

這是否解決您的問題?

+0

爲了與原始示例保持一致,您可以通過執行觀察位並在最初的訂閱模塊中聲明調度器來爲每個觀察者創建一個線程,只需使用每個模塊的事件循環調度程序而不是任務池。 – ForbesLindesay

+0

謝謝大家,我會嘗試這個並報告回來! – Gizz