2017-01-18 42 views
0

考慮以下類型:在RxJava什麼是減少表達

import io.reactivex.Observable; 
import io.reactivex.observables.GroupedObservable; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.TimeUnit; 

public class TypeTest { 
    public static void main(String[] args) throws Exception { 
     CountDownLatch latch = new CountDownLatch(1); 
     Observable<Long> source = Observable.interval(1,TimeUnit.MILLISECONDS).take(20); 
     Observable<GroupedObservable<String,Long>> sgb = 
         source.groupBy(x -> (x%2==0)?"Even":"Odd"); 

     // I'd like to introduce a variable for source.reduce but I get a type error. 
     // Observable<Long> sreduce = source.reduce(new Long(0),(x,y) -> (x+y)); 

     source.reduce(new Long(0),(x,y)->(x+y)).subscribe(x -> { 
      System.out.println(x); 
      latch.countDown(); 
     }); 
     latch.await(); 
    } 
} 

我可以訂閱source.reduce就好像它是可觀察的,但我不能分配作爲其類型。我應該分配什麼類型?

+0

OK之一,這只是似乎是在類型推斷的錯誤。如果我投它它運行良好。 –

回答

0

如果檢查reduce()方法的簽名,你會看到:

public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) { 

你的情況,這將是一個Observable<Long>:由於代碼矗立在你的榜樣

Observable<Long> source = Observable.interval(1, TimeUnit.MILLISECONDS).take(20); 
Observable<Long> sreduce = source.reduce(new Long(0), (x, y) -> (x + y)); 

,你正在做source.reduce(....).subscribe(...),它已經返回Subscription

1

您正在使用從包io.reactivex,這意味着你使用RxJava 2.在RxJava 2,說歸說有一個確切的項目(像reduce的結果)的反應流方式進口,返回Single<T>而不是Observable<T>

大多數方法/操作符都是一樣的,甚至還有一個.toObservable()方法將特定的轉換爲通用。

方面1:您的grouped運算符評估鍵,但不對結果做任何事情;由於該Observable上沒有訂閱,因此不會啓動計時器。

方面2:你不需要CountdownLatch等待完成。你可以做的

source.reduce(new Long(0), (x, y) -> (x + y)) 
      .doOnSuccess(System.out::println) 
      .toFuture() 
      .get(); 

source.reduce(new Long(0), (x, y) -> (x + y)) 
      .doOnSuccess(System.out::println) 
      .blockingGet();