2017-06-02 50 views
0

我想連鎖兩個RxJava Single實例來創建Observable,它們發出兩個結果。另外,我需要第一個Single的結果來創建第二個。如何幫助Java解析RxJava組合方法中的類型?

這是我曾嘗試:

public static <A extends C, B extends C, C> Observable<C> chain(final Single<A> a, final Function<A, Single<B>> f) { 
    return Observable.concat(
     a.toObservable(), 
     a.flatMap(f::apply).toObservable()); 
} 

用法可能是這樣的:

final Observable<Event> task = MoreObservables.chain(
    writeFile("Hello, world", "hello.txt"), 
    writeFileEvent -> processFile(writeFileEvent.path)); 

然而,Java抱怨說,它無法解析類型:

Error:(54, 61) java: incompatible types: cannot infer type-variable(s) A,B,C 
(argument mismatch; bad return type in lambda expression 
    io.reactivex.Single<ProcessFileEvent> cannot be converted to io.reactivex.Single<Event>) 

中當然,ProcessFileEvent執行Event

我該如何編寫我的函數,以便Java可以計算出類型?還是有更簡單的方法來實現這一點?

回答

2

很難說出爲什麼在編制錯誤時不知道確切的writeFileprocessFile簽名(來自簡單模擬應該編譯)。

無論如何,更慣用的方法是使用compose()方法使用自定義ObservableTransformer,爲了有一個單一的鏈,而不是包裝方法,使鏈少可讀(read this)。
這裏也存在邏輯問題,因爲您使用的是concat()並使用了兩次a Observable,您將實際執行兩次操作(a將被預訂兩次),這可能導致性能問題在最不利的情況下發生,或一個重大的細微錯誤。 (在你的例子中,你會寫兩次相同的文件)。
我想你應該在這種情況下使用發佈,爲了執行一次,與合併一起,這將導致Observable將發出A的結果,然後執行B與A的結果,並將發出此結果:

變壓器:

class PublishAndMergeTransformer<A extends C, B extends C, C> implements ObservableTransformer<A, C> { 

     final Function<A, Single<B>> f; 

     public PublishAndMergeTransformer(Function<A, Single<B>> f) { 
      this.f = f; 
     } 

     @Override 
     public ObservableSource<C> apply(Observable<A> a) { 
      return a.publish(aObservable -> 
        Observable.merge(
          aObservable, 
          aObservable 
            .flatMap(a1 -> f.apply(a1).toObservable()) 
        ) 
      ); 
     } 
    } 

和使用示例:

writeFile("Hello, world", "hello.txt") 
    .toObservable() 
    .compose(new PublishAndMergeTransformer<>(writeFileEvent -> processFile(writeFileEvent.path))); 
+0

這完美地工作,而鏈接的文章是非常有幫助了。 – sdgfsdh