我想在流上執行一些操作,然後將流拆分爲兩個流,然後分別處理它們。RxJava:Split Rx可以流入多個流
例子來說明問題:
Flowable<SuccessfulObject> stream = Flowable.fromArray(
new SuccessfulObject(true, 0),
new SuccessfulObject(false, 1),
new SuccessfulObject(true, 2));
stream = stream.doOnEach(System.out::println);
Flowable<SuccessfulObject> successful = stream.filter(SuccessfulObject::isSuccess);
Flowable<SuccessfulObject> failed = stream.filter(SuccessfulObject::isFail);
successful.doOnEach(successfulObject -> {/*handle success*/}).subscribe();
failed.doOnEach(successfulObject -> {/*handle fail*/}).subscribe();
類:
class SuccessfulObject {
private boolean success;
private int id;
public SuccessfulObject(boolean success, int id) {
this.success = success;
this.id = id;
}
public boolean isSuccess() {
return success;
}
public boolean isFail() {
return !success;
}
public void setSuccess(boolean success) {
this.success = success;
}
@Override
public String toString() {
return "SuccessfulObject{" +
"id=" + id +
'}';
}
}
但這代碼打印的所有元素兩次,而我想分裂一次之前完成所有操作。
輸出:
OnNextNotification [SuccessfulObject {ID = 0}]
OnNextNotification [SuccessfulObject {ID = 1}]
OnNextNotification [SuccessfulObject {ID = 2}]
OnCompleteNotification
OnNextNotification [SuccessfulObject { ID = 0}]
OnNextNotification [SuccessfulObject {ID = 1}]
OnNextNotification [SuccessfulObject {ID = 2}]
OnCompleteNotification
如何處理流以接收此行爲?
你要合併的處理重新走到一起,以一個流的結果 –
沒有,只是拆分流和執行所有操作分開。 –
好吧,然後使用@akarnokd的解決方案。作爲一個旁節點:不要在rx-pipeline中使用可變對象。另外,isFail並不是必需的,因爲isSuccess意味着錯誤,它失敗了。 –