2016-03-24 37 views
0

讓我舉個例子:給定從1到10的數字流,處理偶數和奇數的方式不同。在另一個線程中處理奇數並將這個轉換應用於它們(2 * i)。在主線程中處理偶數並將其應用於它們(2 * i - 1)。訂戶低於:如何在可觀察流(RxJava)中以不同方式處理每個項目

finalObservable.subscribe(new Action1<Integer>() { 
    @Override 
    public void call(Integer t) { 
     System.out.println(Thread.currentThread() + " " + t); 
    }}); 

輸出應該

Thread-1 2 
main 3 
Thread-1 6 
main 7 
Thread-1 10 
main 11 
Thread-1 14 
main 15 
Thread-1 18 
main 19 

如何做到這一點使用RxJava,觀測量運營商?

回答

2

最優雅的方式來處理這個問題是使用share()運營商Observable的。在huuuuge簡化中,它可以讓你將你的observable分成多個。所以在你的情況下,代表一串數字的觀察值可以被分成兩個觀測值。一個用於奇數,另一個用於偶數。

假設allNumbers(在你的例子finalObservable)表示< 1,10>數字流:

final Observable<Integer> allNumbers = 
     Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) 
       .share(); 
Observable<Integer> oddNumbers = allNumbers.filter(new Func1<Integer, Boolean>() { 
    @Override 
    public Boolean call(Integer integer) { 
     return integer % 2 != 0; 
    } 
}); 
Observable<Integer> evenNumbers = allNumbers.filter(new Func1<Integer, Boolean>() { 
    @Override 
    public Boolean call(Integer integer) { 
     return integer % 2 == 0; 
    } 
}); 

final Action1<Integer> printingAction = new Action1<Integer>() { 
    @Override 
    public void call(Integer t) { 
     System.out.println(Thread.currentThread() + " " + t); 
    } 
}; 

evenNumbers.subscribeOn(Schedulers.computation()).subscribe(printingAction); 
oddNumbers.subscribeOn(AndroidSchedulers.mainThread()).subscribe(printingAction); 

並與Retrolambda簡化:

final Observable<Integer> allNumbers = 
     Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) 
       .share(); 
Observable<Integer> oddNumbers = allNumbers.filter(integer -> integer % 2 != 0); 
Observable<Integer> evenNumbers = allNumbers.filter(integer -> integer % 2 == 0); 

final Action1<Integer> printingAction = 
     t -> System.out.println(Thread.currentThread() + " " + t); 

evenNumbers.subscribeOn(Schedulers.computation()).subscribe(printingAction); 
oddNumbers.subscribeOn(AndroidSchedulers.mainThread()).subscribe(printingAction); 

你沒」 t確切地指定需要在哪個線程中處理哪些內容,因此您可能需要更正subscribeOn參數並可能會添加observeOn運算符。取決於你的需求。

+0

謝謝,是的,這工作正常。我還有一個查詢。而不是訂閱2個不同的觀測值,是否有一種方法可以合併觀測值,使得輸出與輸入的順序相同? –

+0

您可以使用'Observable.merge',但整個想法是,有兩個不同的observables,你可以以不同的線程處理它們。 –

0

一般的想法會是這樣的(使用flatMap):

Observable.from(new Integer[]{1, 2, 3, 4, 5}) 
      .flatMap(number -> { 
       if (number % 2 == 0) { 
        return Observable.just(2 * number - 1); 
       } else { 
        return Observable.fromCallable(() -> 2 * number) 
            .subscribeOn(Schedulers.io()); 
       } 
      }) 
      .subscribe(new Action1<Integer>() { 
       @Override 
       public void call(Integer integer) { 
        System.out.println(Thread.currentThread() + " " + integer); 
       } 
      }); 
+0

我很難知道何時使用flatmap vs map,但爲什麼不能只映射到'return 2 * number',例如? –

+0

@ cricket_007沒錯,只要我們不必爲不同的值使用不同的線程 – AndroidEx

+0

我認爲這與它有關。我猜從訂閱運行線程是不好的做法? –

相關問題