2017-05-19 59 views
0

我使用retrofit2,rxjava2和adapter-rxjava來實現我的http api調用。我如何定製我自己的Observable?

//API definition 
Observable<String> queryProducts(@Body Query query); 

//API implementation. 
serviceApi.queryProducts(query) 
       .subscribeOn(new Scheduler().ioThread()) 
       .observeOn(new Scheduler().mainThread()) 
       .subscribe(new Observer()); 

如果我有很多的API需要實現,每一個人API的實現需要添加以下兩行:

.subscribeOn(new Scheduler().ioThread()) 
.observeOn(new Scheduler().mainThread()) 

我不想增加他們在每一個API的實現。我想使用MyObservable作爲我的api定義的結果類型。

我的想法看起來象下面這樣:

//API definition 
MyObservable<String> queryProducts(@Body Query query); 

//MyObservable definition 
    public class MyObservable<T> extends Observable<T> { 
     /** 
     * Creates an Observable with a Function to execute when it is subscribed to. 
     * <p> 
     * <em>Note:</em> Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor, 
     * unless you specifically have a need for inheritance. 
     * 
     * @param f {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called 
     */ 
     protected MyObservable(OnSubscribe<T> f) { 
      super(f); 
      this.subscribeOn(new Scheduler().ioThread()); 
      this.observeOn(new Scheduler().mainThread()); 
     } 
    } 

當我運行它,我得到以下異常:

java.lang.IllegalArgumentException異常:無法爲 MyObservable創建呼叫轉接器。

我追溯了RxJavaCallAdapterFactory.java代碼 https://github.com/square/retrofit/blob/master/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/RxJavaCallAdapterFactory.java。我在第100行發現了RxJavaCallAdapterFactory,它似乎只允許Obse​​rvable類通過這個檢查點。我無法擴展和重寫此方法,因爲此類是最終的類。

if (rawType != Observable.class && !isSingle && !isCompletable) { 
     return null; 
} 

有什麼方法可以在超類中添加這兩行,我不想在每個api實現中添加它們嗎?非常感謝。

回答

2

雖然在RxJava2中可以使用safely extend Observable,但它不太可能適用於這種情況,您希望重用常用代碼而不是複製它(但從頭開始創建Observable,通常用於包裝外部異步回調代碼)。
相反,您可以使用compose()運算符,它將Observable轉換爲您的自定義代碼,並且它是將常用邏輯添加到Observable的經典代碼。
您可以按照Dan Lew's article爲例來準確地滿足您的需求(加入Schedulers)。

對改造適配器進行改造,因爲它使用反射創建服務,所以無法支持​​自定義類,但會生成現有的RxJava類。
順便說一句,你正在尋找/使用改進的RxJava1適配器與RxJava2,你需要切換到RxJava2適配器,與RxJava2適配器,你可以看到實際改造使用自己的自定義Observable類。

如果使用compose()是不夠的,你(因爲你還需要將它添加到每個API),官方式,它通過包裝RxJava2CallAdapterFactory委託適應它來創建自己的CallAdapter.Factory和實施改造CallAdapter,然後包裹用您的自定義代碼/操作員/ schdeulers返回Observable。看到這個tutorial。或者在某些library I'm working on處使用RxJava2(非常相似)。

+0

非常感謝,對我幫助很大。 – user3034559

+0

嗨Yosriz, 我遵循你的例子。我在CallAdapter類中添加了一些代碼。但它似乎沒有奏效。 @Override public object adapt(致電致電){obj = wrapped.adapt(call); (obj instanceof Observable){Observable requestObservable =(Observable)obj; requestObservable.subscribeOn(調度程序。ioThread()) .observeOn(Scheduler.mainThread()); return requestObservable; } return wrapped.adapt(call); } – user3034559

+0

嗨,真的很難理解這樣的代碼,在我看來,你正在返回原始的自適應requestObservable(Observable是不可變的),你必須返回由操作符創建的Observable,而且我不認爲這是值得的只需應用調度程序的工作,默認情況下,Retrofit Observables在io調度程序上起作用,因此您只需添加mainThread調度程序 – yosriz

相關問題