2017-01-31 139 views
1

爲了保持可觀察狀態,我有一個關於什麼是更好的RxJava模式的問題。在Rx中保持可觀察狀態的最佳方式

爲了簡單起見,我們假設我們有一個StateManager類,它需要跟蹤系統中的某個狀態(讓我們假設它是一個簡單的布爾標誌)並以可觀察的方式公開它。這樣,將有一個方法,如下面:

class StateManager { 
    Observable<Boolean> state(); 
    ... 
} 

這位經理有一個很長的生命週期,可以有多個「客戶」(例如觀點,其他管理人員等),將隨時訂閱或退訂。國家將根據一些內部事件而改變。

最明顯的方式來處理這將是保持在BehaviourSubject狀態,以供消費者直接掛鉤:

class StateManager { 

    Subject mStateSubject = BehaviourSubject.create(true);   

    Observable<Boolean> state() { 
     return mStateSubject.asObservable(); 
    }  
    ... 
} 

是否有這更好的方法呢?

回答

2

Subjects可能是使用反應性庫最不理想的方式,儘管它當然可以工作。

功能反應式編程最適合沒有狀態。 Subjects是一種狀態。我建議更改您的代碼,以便將Observable定義爲功能操作符的組合。這樣可以很容易地測試和管理您的可觀察信息正在發出的消息。


我更喜歡C#開發人員,所以我希望你能原諒不同的語法。這裏有一個例子:

void Main() 
{ 
    var tracker = new AddTracker(); 
    tracker.getSums().Subscribe(i => Console.WriteLine(i)); 
    Observable.Interval(TimeSpan.FromMilliseconds(100)) 
     .Timestamp() 
     .Select(t => t.Timestamp.Second) 
     .Take(20) 
     .Subscribe(i => tracker.setA(i % 7)); 

    Observable.Interval(TimeSpan.FromMilliseconds(75)) 
     .Timestamp() 
     .Select(t => t.Timestamp.Millisecond) 
     .Take(30) 
     .Subscribe(i => tracker.setB(i % 9)); 

} 

public class AddTracker 
{ 
    private readonly ISubject<int> _a; 
    private readonly ISubject<int> _b; 
    private readonly IObservable<int> _sums; 
    private readonly IDisposable _dummySub; 

    public AddTracker() 
    { 
     _a = new BehaviorSubject<int>(0); 
     _b = new BehaviorSubject<int>(0); 
     _sums = _a 
      .CombineLatest(_b, (a, b) => a + b) 
      .Replay(1) 
      .RefCount(); 
     _dummySub = _sums.Subscribe(_ => { }); 
    } 

    public void setA(int value) 
    { 
     _a.OnNext(value); 
    } 

    public void setB(int value) 
    { 
     _b.OnNext(value); 
    } 

    public IObservable<int> getSums() 
    { 
     return _sums; 
    } 
} 

在C#中的土地,你可以換出_a_b科目的事件,這是一個溫和的改善。我知道Java中沒有一流的事件,所以我不確定這會轉化成什麼。

但更基本的是,在C#和Java中,你應該問的問題是......什麼導致setAsetB調用?你可以用它替換它們:

void Main() 
{ 
    var aStream = Observable.Interval(TimeSpan.FromMilliseconds(100)) 
     .Timestamp() 
     .Select(t => t.Timestamp.Second) 
     .Take(20); 

    var bStream = Observable.Interval(TimeSpan.FromMilliseconds(75)) 
     .Timestamp() 
     .Select(t => t.Timestamp.Millisecond) 
     .Take(30); 

    var tracker = new AddTracker(aStream, bStream); 
    tracker.getSums().Subscribe(i => Console.WriteLine(i)); 

} 

public class AddTracker 
{ 
    private readonly IObservable<int> _sums; 
    private readonly IDisposable _dummySub; 

    public AddTracker(IObservable<int> a, IObservable<int> b) 
    { 
     _sums = a 
      .CombineLatest(b, (aItem, bItem) => (aItem % 9) + (bItem % 7)) 
      .Replay(1) 
      .RefCount(); 
     _dummySub = _sums.Subscribe(_ => {}); 
    } 

    public IObservable<int> getSums() 
    { 
     return _sums; 
    } 
} 

總之,如果你必須,從主題開始。然後把你的主題,並儘可能遠離你的邏輯推動他們。

+0

是的,這些都是非常好的點。但是,就我而言,我*需要保持一種狀態並向消費者公開變更(如果有的話)。你有沒有更適合我的案例的「功能操作員組合」的更具體的例子? –

+0

增加了一個有點人爲的例子。 – Shlomo

1

描述的情況是它所謂的'熱''可觀察到的 - 它的生產者(排放源)在訂閱之外創建, (如Ben Lesh在Hot vs Cold Observables所述 - 推薦閱讀)。

正如Shlomo所說,受試者是Rx世界的「可變變量」,你可以通過使用Obsevable.create(它聽取事件並基於它們產生排放)創建一個「冷」 ,然後使用將其轉換爲ConnectableObservable(如共享,發佈)的運算符使其「熱」,以便將它多播到多個在不同時間訂閱的觀察者。

但是,在這種情況下,由於生產者是您的班級的本地人(事件是由這個班級生成的),因此您可以使用Subject來達到此目的,因爲班級本身就是排放源,產生事件的相互/狀態變量。 (基於answer given by Erik Meijer,和這個blog post

相關問題