2017-04-05 40 views
4

我有這個代碼來包裝在Rx Java 1中的回調,它編譯得很好,但現在我已經切換到RX Java 2,它不能編譯... Rx Java 2中的等價物是什麼?Rx Java 2:如何包裝回調?

return Observable.fromEmitter(new Action1<AsyncEmitter<Integer>>() { 
      @Override 
      public void call(AsyncEmitter<Integer> emitter) { 

       transObs.setTransferListener(new TransferListener() { 
        @Override 
        public void onStateChanged(int id, TransferState state) { 
         if (state == TransferState.COMPLETED) 
          emitter.onCompleted(); 
        } 

        @Override 
        public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) { 

        } 

        @Override 
        public void onError(int id, Exception ex) { 
         emitter.onError(ex); 
        } 
       }); 

       emitter.setCancellation(new AsyncEmitter.Cancellable() { 
        @Override 
        public void cancel() throws Exception { 

         transObs.cleanTransferListener(); 
        } 
       }); 
      } 
     }, AsyncEmitter.BackpressureMode.BUFFER); 

UPDATE:

我想出了這一點,但你有沒有因爲它的OnCreate中的呼叫處理背壓?

return Observable.create(new ObservableOnSubscribe<List<DigitsUser>>() { 

     @Override 
     public void subscribe(final ObservableEmitter<List<DigitsUser>> emitter) throws Exception { 

      mDigitFriends.findFriends((gotEm, users) -> { 
       emitter.onNext(users); 
      }); 

      emitter.setCancellable(() -> { 
       emitter.onNext(null); 
      }); 
     } 
    }); 
+0

什麼喲男子,但沒有編制?你需要更具體一些,什麼是錯誤,你有什麼嘗試等等,至於你的問題,它可能需要是RxJava2中的Emitter()的等價物,答案是create() – yosriz

+0

1)You甚至不要在任何地方調用'emitter.onNext',難怪沒有數據。 2)你離開取消部分。 3)如果你需要一個Observable,使用Observable! – akarnokd

+0

我更新了我的潛在解決方案 – Mike6679

回答

6

如果您擔心背壓,您應該使用Flowable類。下面是來自RxJava2 Wiki報價:

實際上,1.x的fromEmitter(原fromAsync)已更名 到Flowable.create。

下面是使用可流動類的例子:

return Flowable.create(new FlowableEmitter<List<DigitsUser>>() { 

     @Override 
     public void subscribe(final FlowableEmitter<List<DigitsUser>> emitter) throws Exception { 

      mDigitFriends.findFriends((gotEm, users) -> { 
       emitter.onNext(users); 
      }); 

      emitter.setCancellable(() -> { 
       emitter.onNext(null); 
      }); 
     } 
    }, BackpressureStrategy.BUFFER);