2011-01-28 49 views
2

我在[Rx DEVHOL202]和http://rxwiki.wikidot.com/101samples#toc48如何才能讓我的IObservable <T>多線程實現?

的基礎上寫了一個實現。這是我的代碼。 http://csharp.pastebin.com/pm2NAPx6

  1. 它的工作原理,但OnNext的電話是不是非阻塞,這就是我想實現模擬網絡閱讀和異步移交字節的每一塊,因爲它被讀取到處理程序[什麼哪這裏沒有完整顯示,但可能會緩存結果並做進一步處理]。

    這樣做的好方法是什麼?

  2. 一旦Exception拋出,所有後續的OnNext()都不會被處理! 如果我不顯式退出循環並指示完成。 這是爲什麼呢?

回答

2

我強烈建議針對試圖實施您自己的IObservable。隱式規則超越線程安全並進入方法調用排序。

一般你會從方法返回IObservable情況,但是如果你需要直接實現它的類,你應該換一個Subject

public class SomeObservable<T> : IObservable<T> 
{ 
     private Subject<T> subject = new Subject<T>(); 

     public IDisposable Subscribe(IObserver<T> observer) 
     { 
      return subject.Subscribe(observer); 
     } 
} 

1.你需要小心你如何從支持此您的觀察者(正如你可能已經共享的數據),但你可以讓你在以下兩種方式之一異步調用的處理:

  • 呼叫ObserveOn(Scheduler.TaskPool)(或ThreadPool如果你是預4.0)打電話之前。這將導致(,任務在這種情況下),通過調度路由信息
  • 傳遞ISchedulerObserver構造
  • 開始從用戶

2.異步任務/線程這是預期的功能。 IObservableIObserver之間的合同是(OnNext)* (OnCompleted | OnError)?,也就是說「對OnNext進行零個或多個調用,可選地跟隨任何OnCompleted或OnError」。 OnCompleted|OnError後,致電OnNext無效。

Rx中的所有運營商(WhereSelect等)均強制執行此規則,即使源不。

0

我不知道如果我正確地理解你的問題,但爲什麼你就不能執行,你有什麼邏輯上的不同Thread,或者如果它足夠小推在ThreadPool

下面是一個例子:

ThreadPool.QueueUserWorkItem(o=> 
{ 
    _paidSubj.OnNext(this); // Raise PAID event 
}); 

我感到困惑的Subject的數據類型,我從來沒有見過在C#該類...是你創造的東西嗎?是OnNext一個事件被提出或只是一種方法?如果OnNext是一個事件,那麼你可以使用BeginInvoke異步調用它:

_paidSubj.OnNext.BeginInvoke(this, null, null); 

更新:

如果實現這種異步行爲會發生一個重要的事情:如果你通知了IObserver通過傳遞Order,當您嘗試讀取觀察者(即訂單緩衝區)中的數據時,您可能實際存在一些不一致情況,而訂單繼續修改其Read線程中的緩衝區。所以至少有兩種方法可以解決這個問題:

  1. 限制訪問將通過使用鎖修改的內存。
  2. 只有通知觀察者有關你想要看到的相關信息:
    a。通過傳遞信息作爲一個值(而不是參考)。 b。通過創建一個傳遞信息的不可變結構。

P.S. 你從哪裏得到Subject?是Subject應該是OrderObserver

+0

主體是一個Rx的東西,它代表的東西既是`IObservable`和`IObserver`,並且經常被用作一個「適配器」 – 2011-01-28 08:02:19