2017-05-04 35 views
0

我想在流上執行一些操作,然後將流拆分爲兩個流,然後分別處理它們。RxJava:Split Rx可以流入多個流

例子來說明問題:

Flowable<SuccessfulObject> stream = Flowable.fromArray(
     new SuccessfulObject(true, 0), 
     new SuccessfulObject(false, 1), 
     new SuccessfulObject(true, 2)); 

stream = stream.doOnEach(System.out::println); 

Flowable<SuccessfulObject> successful = stream.filter(SuccessfulObject::isSuccess); 
Flowable<SuccessfulObject> failed = stream.filter(SuccessfulObject::isFail); 

successful.doOnEach(successfulObject -> {/*handle success*/}).subscribe(); 
failed.doOnEach(successfulObject -> {/*handle fail*/}).subscribe(); 

類:

class SuccessfulObject { 
    private boolean success; 
    private int id; 

    public SuccessfulObject(boolean success, int id) { 
     this.success = success; 
     this.id = id; 
    } 

    public boolean isSuccess() { 
     return success; 
    } 
    public boolean isFail() { 
     return !success; 
    } 

    public void setSuccess(boolean success) { 
     this.success = success; 
    } 

    @Override 
    public String toString() { 
     return "SuccessfulObject{" + 
       "id=" + id + 
       '}'; 
    } 
} 

但這代碼打印的所有元素兩次,而我想分裂一次之前完成所有操作。

輸出:

OnNextNotification [SuccessfulObject {ID = 0}]
OnNextNotification [SuccessfulObject {ID = 1}]
OnNextNotification [SuccessfulObject {ID = 2}]
OnCompleteNotification
OnNextNotification [SuccessfulObject { ID = 0}]
OnNextNotification [SuccessfulObject {ID = 1}]
OnNextNotification [SuccessfulObject {ID = 2}]
OnCompleteNotification

如何處理流以接收此行爲?

+0

你要合併的處理重新走到一起,以一個流的結果 –

+0

沒有,只是拆分流和執行所有操作分開。 –

+1

好吧,然後使用@akarnokd的解決方案。作爲一個旁節點:不要在rx-pipeline中使用可變對象。另外,isFail並不是必需的,因爲isSuccess意味着錯誤,它失敗了。 –

回答

3

使用publish分享訂閱源:(?的fork-join行爲)

Flowable<Integer> source = Flowable.range(1, 5); 

ConnectableFlowable<Integer> cf = source.publish(); 

cf.filter(v -> v % 2 == 0).subscribe(v -> System.out.println("Even: " + v)); 

cf.filter(v -> v % 2 != 0).subscribe(v -> System.out.println("Odd: " + v)); 

cf.connect(); 
+0

正是我在找什麼,謝謝! –