2017-04-01 43 views
0

我試圖通過亞馬遜的S3 Android SDK上傳文件。我已經使用了RX Java,但我不確定如何將此方法轉換爲返回Observable的方法,因爲我想將此方法的結果鏈接到另一個Observable調用。它讓我感到困惑,因爲它不會立即返回,並且在OnError或OnState更改之前無法返回。我如何以RX方式處理這些情況?Rx Java Android:如何將此回調塊轉換爲Observer

public void uploadFile(TransferObserver transferObserver){ 

    transferObserver.setTransferListener(new TransferListener() { 
     @Override 
     public void onStateChanged(int id, TransferState state) { 

     } 

     @Override 
     public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) { 

     } 

     @Override 
     public void onError(int id, Exception ex) { 

     } 
    }); 

} 

如果有人能與RX的Java 2和lambda表達式回答將是巨大的,因爲我剛上來短在這一個

+0

相關:http://stackoverflow.com/a/41870888/697313 –

回答

0

@Yosriz我不能讓你的代碼編譯,但你沒有幫助我這麼基於你的答案在這裏一點是我現在有:

return Observable.fromEmitter(new Action1<AsyncEmitter<Integer>>() { 
      @Override 
      public void call(AsyncEmitter<Integer> emitter) { 

       transObs.setTransferListener(new TransferListener() { 
        @Override 
        public void onStateChanged(int id, TransferState state) { 
         if (state == TransferState.COMPLETED) 
          emitter.onCompleted(); 
        } 

        @Override 
        public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) { 

        } 

        @Override 
        public void onError(int id, Exception ex) { 
         emitter.onError(ex); 
        } 
       }); 

       emitter.setCancellation(new AsyncEmitter.Cancellable() { 
        @Override 
        public void cancel() throws Exception { 

         transObs.cleanTransferListener(); 
        } 
       }); 
      } 
     }, AsyncEmitter.BackpressureMode.DROP); 
+1

您需要在dispose()方法中釋放transObs的偵聽器。我想用Completable更好,因爲你不關心onNext()。 –

1

這通常是異步/回調workd無功之間的橋樑正確的方法,但使用Observable.create()現在氣餒,因爲它需要先進的知識在訂單t o說得對。
您應該使用較新的創造方法Observable.fromEmitter(),這看起來不太一樣:

return Observable.fromEmitter(new Action1<Emitter<Integer>>() { 
     @Override 
     public void call(Emitter<Integer> emitter) { 

      transObs.setTransferListener(new TransferListener() { 
       @Override 
       public void onStateChanged(int id, TransferState state) { 
        if (state == TransferState.COMPLETED) 
         emitter.onCompleted(); 
       } 

       @Override 
       public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) { 

       } 

       @Override 
       public void onError(int id, Exception ex) { 
        emitter.onError(ex); 
       } 
      }); 
      emitter.setCancellation(new Cancellable() { 
       @Override 
       public void cancel() throws Exception { 
        // Deal with unsubscription: 
        // 1. unregister the listener to avoid memory leak 
        // 2. cancel the upload 
       } 
      }); 
     } 
    }, Emitter.BackpressureMode.DROP); 

什麼在這裏加入是:處理unsusbcription:取消上傳,並註銷,以避免內存泄漏,並指定背壓策略。
你可以閱讀更多here

其他注意事項:

  • ,如果你有興趣與進步,你可以在onProgressChanged()調用onNext()的進步和轉換可觀測到Observable<Integer>
  • 如果不是,你可能要考慮使用Completable這是可觀測的沒有onNext()排放量,但只有onCompleted()這可以適合你的情況,如果你不感興趣的進展跡象。