2015-10-31 62 views
8

我在Android應用使用RetrofitRxJava,和我的代碼:改造默認線程

public void getConfig(NetworkSubscriber subscriber) { 
    Observable<Config> observable = mApi.getConfig(); 
    observable.subscribeOn(Schedulers.newThread()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(subscriber); 
} 

public void getCode(String mobile, int type, NetworkSubscriber subscriber) { 
    Observable<BaseMessageEntity> observable = mApi.getCode(mobile, type); 
    observable.subscribeOn(Schedulers.newThread()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(subscriber); 
} 

而且我不想寫.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())每一個業務方法

我該怎麼辦?

回答

17

如果您不想在每次調用時指定所需的線程,可以創建一個圍繞RxJavaCallAdapterFactory的包裝,以便爲您的線程默認設置。

public class RxThreadCallAdapter extends CallAdapter.Factory { 

    RxJavaCallAdapterFactory rxFactory = RxJavaCallAdapterFactory.create(); 
    private Scheduler subscribeScheduler; 
    private Scheduler observerScheduler; 

    public RxThreadCallAdapter(Scheduler subscribeScheduler, Scheduler observerScheduler) { 
     this.subscribeScheduler = subscribeScheduler; 
     this.observerScheduler = observerScheduler; 
    } 

    @Override 
    public CallAdapter<?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) { 
     CallAdapter<Observable<?>> callAdapter = (CallAdapter<Observable<?>>) rxFactory.get(returnType, annotations, retrofit); 
     return callAdapter != null ? new ThreadCallAdapter(callAdapter) : null; 
    } 

    final class ThreadCallAdapter implements CallAdapter<Observable<?>> { 
     CallAdapter<Observable<?>> delegateAdapter; 

     ThreadCallAdapter(CallAdapter<Observable<?>> delegateAdapter) { 
      this.delegateAdapter = delegateAdapter; 
     } 

     @Override public Type responseType() { 
      return delegateAdapter.responseType(); 
     } 

     @Override 
     public <T> Observable<?> adapt(Call<T> call) { 
      return delegateAdapter.adapt(call).subscribeOn(subscribeScheduler) 
       .observeOn(observerScheduler); 
     } 
    } 
} 

,然後使用它,而不是在你的建設者RxJavaCallAdapterFactory.create() -

Retrofit retrofit = new Retrofit.Builder() 
    .baseUrl("https://api.github.com/") 
    .addConverterFactory(GsonConverterFactory.create()) 
    .addCallAdapterFactory(new RxThreadCallAdapter(Schedulers.io(), AndroidSchedulers.mainThread())) 
    .build(); 
+0

認爲兄弟!你有我! – xuyanjun

+0

這個錯誤如果你在線程內部調用(IE自定義訪問管理)'''return delegateAdapter.responseType();'''會返回空指針。 –

6

您可以使用compose()將其減少到一行。例如,下面是您的getConfig()方法的修改版本。它假定你正在使用retrolambda。

public void getConfig(NetworkSubscriber subscriber) { 
    Observable<Config> observable = mApi.getConfig(); 
    observable 
      .compose(this::setupThreads) 
      .subscribe(subscriber); 
} 

setupThreads()方法是這樣的:

private <T> Observable<T> setupThreads(final Observable<T> observable) { 
    return observable 
      .subscribeOn(Schedulers.newThread()) 
      .observeOn(AndroidSchedulers.mainThread()); 
} 

一些參考:

+0

謝謝,我明白了。但是,那不是由rx自己來解決它的方法嗎? – xuyanjun

+0

@xuyanjun - 'compose()'是RxJava的一個重要組成部分,所以我不太清楚你的意思是「由rx self解決它」。在這種情況下,'compose()'將允許您重複使用'setupThreads()'而不必添加'.subscribeOn(Schedulers.newThread())。observeOn(AndroidSchedulers.mainThread())'到每個observable 。 – kjones

6

要對subscribeOn默認調度程序,你可以創建Retrofit例如,當把它作爲一個參數直奔RxJavaCallAdapterFactory

new Retrofit.Builder() 
      .client(okHttpClient) 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.createWithScheduler(Schedulers.io())) 
      .build(); 

得到了在改造推出2.0.0