2017-10-10 79 views
0

我想學習RxJava2,並將我的AsyncTasks轉換爲Observable。Asynctask status &&取消等效於RxJava2 Observable?

我有下面這段代碼,我試圖轉換。

if(asyncTask.getStatus() == AsyncTask.Status.RUNNING){ 
    asyncTask.cancel(); 
} 
asyncTask = new CustomTask(); 
asyncTask.execute(input); 

我試圖重新創建以下與一次性使用。

Disposable currentTask; 
PublishSubject input = PublishSubject.create(); 

對於每一個輸入

if(currentTask != null) currentTask.dispose(); 

currentTask = input 
    .map(// Network calls 
     // returns CustomObject) 
    .subscribeOn(Schedulers.io()) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(result -> { 
       // do work with result 
       }, throwable -> Log.e(TAG, throwable.toString())); 

然而,currentTask總是。爲什麼?這是做錯了嗎?

+0

並非所有'subscribe(...)'方法都返回'Disposable'。你能否至少暗示你傳遞給'subscribe(...)'的參數? – Jon

+0

你是否試圖在任何時候清空'currentTask'?另外,你可以添加任何其他你使用'PublishSubject輸入'的地方嗎?我懷疑你可能會濫用這個主題,這是造成你的問題。 – Jon

回答

0

您正確使用Disposable,但我只能假設你正在搞這個問題。 rx中的主題既可以是發佈者也可以是訂閱者......主題不一定要等到subscribe(...)開始發佈項目。因此,我不建議用任何形式的Subject來替換您的AsyncTask

你可以得到類似的,更具有確定性的行爲,這樣做雖然:

Observable<CustomObject> networkObservable = 
      Observable.create(emitter -> 
        { 
         try { 
          CustomObject object = doNetworking(); 
          emitter.onNext(object); 
          emitter.onComplete(); 
         } catch (Exception e) { 
          emitter.onError(e); 
         } 
        } 
      ); 

if(currentTask != null) currentTask.dispose(); 

currentTask = networkObservable.subscribeOn(Schedulers.io()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     // this next subscribe is similar to AsyncTask.execute() as it starts the stream 
     .subscribe(result -> { 
      // do work with result 
      }, throwable -> Log.e(TAG, throwable.toString())); 

另外,還要考慮尋找到SerialDisposable,你不必做那些空/處置檢查

SerialDisposable serialDisposable = new SerialDisposable(); 

原子化:設置該容器上的下一個一次性物品,並處理前一個(如果有的話),或者如果容器已被處置。

Disposable disposable = networkObservable.subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(...); 

serialDisposable.set(disposable); // this will call dispose on the previous task if it exists