2017-04-22 122 views

I am trying to compile the RxJava snippet below and get errors:

int[] test = {1,2,3,4}; 
Observable<Integer> findAverage = Observable.fromArray(test); 
averageInteger(findAverage).subscribe(System.out::println); 

First, in this very simple RxJava code, Observable.fromArray(test) fails with a compile error (incompatible bounds).

Second, it seems averageInteger can no longer be found. I am using RxJava 2.0.8.


You need at least 'Integer[] test = {1,2,3,4};' because primitive arrays are not supported by RxJava. – akarnokd
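The point of the comment above can be seen without RxJava at all, because `Observable.fromArray` takes generic varargs (`T...`), just like the JDK's `Arrays.asList`. The following is a minimal JDK-only sketch, not code from the thread: the class name `BoxingSketch` and the helper `box` are hypothetical. It shows that an `int[]` is treated as one element of type `int[]`, while a boxed `Integer[]` is spread into its elements.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

public class BoxingSketch {
    // Hypothetical helper: converts a primitive int[] into a boxed Integer[]
    static Integer[] box(int[] values) {
        return IntStream.of(values).boxed().toArray(Integer[]::new);
    }

    public static void main(String[] args) {
        int[] test = {1, 2, 3, 4};

        // Generic varargs (T...) cannot bind T to a primitive, so the whole
        // int[] becomes a single element: T is inferred as int[]
        List<int[]> single = Arrays.asList(test);
        System.out.println(single.size()); // 1

        // With Integer[] the array is spread into four Integer elements
        Integer[] boxed = box(test);
        List<Integer> four = Arrays.asList(boxed);
        System.out.println(four.size()); // 4
    }
}
```

Under the same inference rule, passing the boxed array to `Observable.fromArray` should type-check as `Observable<Integer>`, whereas passing the `int[]` yields an `Observable<int[]>` that emits the array as one item.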

Answer


The error occurs because findAverage is declared with the wrong type.

For a primitive array it should be:

Observable<int[]> observable = Observable.fromArray(array); 

A complete example using the following Gradle dependency: 'com.github.akarnokd:rxjava2-extensions:0.16.4'

List<Integer> integers = Arrays.asList(1, 2, 3, 4); 

Observable<Integer> integerObservable = Observable.fromIterable(integers); 

Observable<Double> doubleObservable = MathObservable.averageDouble(integerObservable); 

doubleObservable.subscribe(System.out::println); 

Implemented as an operator:

@Test 
public void averageDoubleOperator() throws Exception { 
    List<Integer> integers = Arrays.asList(1, 2, 3, 4); 

    Observable<Integer> integerObservable = Observable.fromIterable(integers); 

    integerObservable.lift(new AvgOperator()) 
      .test() 
      .assertResult(2.5); 
} 

@Test 
public void averageDoubleOperator_empty() throws Exception { 
    List<Integer> integers = Collections.emptyList(); 

    Observable<Integer> integerObservable = Observable.fromIterable(integers); 

    integerObservable.lift(new AvgOperator()) 
      .test() 
      .assertTerminated() 
      .assertError(IllegalStateException.class); 
} 

class Avg { 
    long count; 
    int sum; 

    Avg(long count, int sum) { 
     this.count = count; 
     this.sum = sum; 
    } 
} 

class AvgOperator implements ObservableOperator<Double, Integer> { 
    @Override 
    public Observer<? super Integer> apply(Observer<? super Double> observer) throws Exception { 
     return new Operation(observer); 
    } 

    class Operation implements Observer<Integer> { 
     private final Observer<? super Double> observer; 
     // Running total and number of items seen so far 
     private long sum; 
     private int count; 

     Operation(Observer<? super Double> observer) { 
      this.observer = observer; 
     } 

     @Override 
     public void onSubscribe(Disposable d) { 
      observer.onSubscribe(d); 
     } 

     @Override 
     public void onNext(Integer integer) { 
      ++count; 
      sum = sum + integer; 
     } 

     @Override 
     public void onError(Throwable t) { 
      this.observer.onError(t); 
     } 

     @Override 
     public void onComplete() { 
      if (count == 0) { 
       this.onError(new IllegalStateException("Average is not defined for 0 values.")); 
       return; 
      } 
      this.observer.onNext(sum/(double) count); 
      this.observer.onComplete(); 
     } 
    } 
} 

It looks like MathObservable comes from the rxjava2 math extensions and is not supported by RxJava 2 itself.


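As mentioned, MathObservable is taken from rxjava2-extensions because it is not supported natively in RxJava 2. You can copy the implementation of MathObservable or write your own using the reduce operator.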
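The reduce-based approach suggested in the comment above amounts to folding the values into a (sum, count) pair and dividing at the end. Below is a minimal sketch of that fold, written against the JDK's `Stream.reduce` so it runs without extra dependencies; it is not the MathObservable implementation. The names `ReduceAverageSketch`, `Acc`, and `average` are hypothetical. The same seed and accumulator function could presumably be passed to RxJava 2's `Observable.reduce(seed, reducer)`, which returns a `Single` of the accumulated value.

```java
import java.util.stream.Stream;

public class ReduceAverageSketch {
    // Hypothetical immutable accumulator holding the running sum and count
    static final class Acc {
        final long sum;
        final long count;

        Acc(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        // Accumulator step: fold one value into the running state
        Acc add(int value) {
            return new Acc(sum + value, count + 1);
        }

        // Combiner step: only needed by Stream.reduce for parallel streams
        Acc merge(Acc other) {
            return new Acc(sum + other.sum, count + other.count);
        }

        double average() {
            if (count == 0) {
                throw new IllegalStateException("Average is not defined for 0 values.");
            }
            return sum / (double) count;
        }
    }

    // Reduces the values to an Acc, then derives the average from it
    static double average(Stream<Integer> values) {
        return values.reduce(new Acc(0, 0), Acc::add, Acc::merge).average();
    }

    public static void main(String[] args) {
        System.out.println(average(Stream.of(1, 2, 3, 4))); // 2.5
    }
}
```

Keeping the accumulator immutable means each reduce step returns a fresh state, so the seed object is never shared or mutated between subscriptions or streams.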
