2015-12-03 70 views
0

我有一個基本模擬狀態圖的可觀察流。例如:通過可觀察流循環

whenThisHappens() 
    .switchMap(i -> { 
     if (i.isThisThing()) { 
      return whenThatHappens(); 
     } else { 
      return nothingHappened(); 
     } 
    } 
    .subscribe(thing -> { 
     if (thing.isThatThing()) { 
      log("Got that thing"); 
     } else { 
      log("Got nothing"); 
     } 
    }); 

的問題是,我想遍歷登錄,直到一些事件發生(這是在Android長時間運行的服務)。現在我能夠通過保持觀察的一個變量,訂閱,然後取消訂閱並重新訂閱它在它做到這一點的onComplete

obs = whenThisHappens() 
    .switchMap(i -> { 
     if (i.isThisThing()) { 
      return whenThatHappens(); 
     } else { 
      return nothingHappened(); 
     } 
    } 
    .doOnNext(thing -> { 
     if (thing.isThatThing()) { 
      log("Got that thing"); 
     } else { 
      log("Got nothing"); 
     } 
    }) 
    .doOnComplete(i -> { 
     obs.unsubscribe(); 
     obs.subscribe(); 
    } 
    obs.subscribe(); 

但我有種感覺我做的事情真的錯了這裏。有沒有更好的方法來實現這一點?我看着retry,但拋出錯誤只是爲了讓它重試似乎與我現在所做的一樣糟糕。

+0

是否isThatThing()getter方法?如果是這樣,則安裝人員應提出通知該財產已更改的事件。在.net中,我們有INotifyPropertyChanged模式/接口。 – Aron

+0

不太熟悉那種模式。我會調查。謝謝。 –

+0

對不起,你的問題不清楚。你提到一個狀態圖,但你的代碼沒有顯示。你能否包含一個你想達到的大理石圖?無論如何,狀態圖通常使用'scan'很容易實現。 –

回答

0

閱讀你的代碼,它看起來像你想filter

whenThisHappens() 
     # ignore uninteresting things 
     .filter(i -> i.isThisThing()) 
     # do stuff on interesting things 
     .subscribe(item -> log("Got: " + item.toString())); 

有兩個可選arguemnts這個基本subscribe這是一個on-error功能,如果你需要,你可以利用一個on-complete功能 - 但預訂是在這裏自動管理。

+0

這部分不是問題。實際的狀態圖並不那麼簡單。問題在於我完成後等待第一個事件。在你的代碼中,如果在那裏有一個'switchMap',一旦我們得到了流的結果,因爲whenThis會出現可觀察的unsubscribes和流轉換。當我們在某處有一個'switchMap'時,我們需要從'whenThisHappens()'開始時再次從'item'開始。 –

+0

爲了進一步說明,whenThisHappens只發生一次訂閱,因爲狀態。在現實生活中,這可能與用戶正在運行的活動識別事件相似。一旦用戶正在運行,我不再關心正在運行的事件,並開始尋找其他事件。對於例如我切換到一個不同的觀察點,從傳感器數據中獲取卡路里,並在用戶燃燒x卡路里後通知用戶。但是一旦我完成了,我就會關心下一次我再次遇到一個正在運行的事件,但是直到那時纔開始。那有意義嗎? –

1

我認爲你所要做的事情最好用PublishSubjectBehaviorSubject完成。

該流將發佈有關該主題的項目,這將觸發您的訂閱。

這裏是事件流類,我寫了前段時間:

public class SubjectEventStream implements IEventStream { 
    private final BehaviorSubject<IEvent> stream = BehaviorSubject.create(); 

    @Override 
    public void publish(Observable<IEvent> event) { 
     event.doOnNext(stream::onNext).subscribe(); 
    } 

    @Override 
    public Observable<IEvent> observe() { 
     return stream; 
    } 

    @Override 
    public <T> Observable<T> observe(Class<T> eventClass) { 
     return stream.ofType(eventClass); 
    } 
} 

看到這裏一些更多的信息:

http://reactivex.io/documentation/subject.html

http://akarnokd.blogspot.com/2015/06/subjects-part-1.html