2017-07-29 23 views
0

IM使用BlockingObservable存在難題。我有一個情況,我需要回調完成,並在完成後,我可以做一個翻新API調用來獲取數據。具體而言,我需要首先初始化支付網關sdk,然後在成功完成初始化之後,我將進行翻新調用。以下是我迄今爲止:rxjava2 - 如何阻止直到回撥完成,然後進行翻新呼叫?

Observable.fromCallable(new Callable<PaymentStrategy>() { 

        @Override 
        public PaymentStrategy call() throws Exception { 
         return gatewayFactory.getPaymentStrategy("US"); 
        }}).flatMap(new Function<PaymentStrategy, ObservableSource<PaymentStrategy>>() { 
        @Override 
        public ObservableSource<PaymentStrategy> apply(@NonNull final PaymentStrategy paymentStrategy) throws Exception { 
         return Observable.fromCallable(new Callable<PaymentStrategy>() { 
          @Override 
          public PaymentStrategy call() throws Exception { 


     /*here is important. i want it to block until init actually 
    gets a call back. when it does the subscriber will call 
onComplete and the observable should move forward at that point*/ 


           paymentStrategy.init(paymentInitSubscriber); 
           return paymentStrategy; 
          } 
         }); 
        }}).observeOn(AndroidSchedulers.mainThread()) 
         .subscribeOn(AndroidSchedulers.mainThread()) 
         .subscribe(paymentInitSubscriber); 

似乎rxjava2沒有toBlocking()調用,但我沒有找到一個toBlockingFirst()等,並BlockingObservable類。但我不知道如何完成任務。所以要清楚,當我打電話給paymentStrategy.init()時,我需要observable纔會阻止,直到調用訂閱者onComplete或onNext。我將訂閱者作爲參數傳遞,以便回調知道在完成時調用它。有任何想法嗎 ?

回答

0

我發現我最好使用Flowable並提供一個發射器。然後我可以發射像onNext和的onComplete等

final PaymentStrategy paymentStrategy = gatewayFactory.getPaymentStrategy("US"); 

      FlowableOnSubscribe flowableOnSubscribe = new FlowableOnSubscribe() { 
       @Override 
       public void subscribe(FlowableEmitter e) throws Exception { 
        FlowableEmitter initdownFlowableEmitter = e; 
        paymentStrategy.init(e); 
    /* above i pass in the emitter and i can call onNext when the call back i want completes. this pushes the stream forward to the next one below of the retrofit call */ 
       } 
      }; 

      final Flowable flowable = Flowable.create(flowableOnSubscribe, BackpressureStrategy.BUFFER); 
      return flowable.flatMap(new Function() { 
       @Override 
       public Object apply(@NonNull Object o) throws Exception { 
        PaymentApi service = mRetrofit.create(PaymentApi.class); 
        return service.getCards(); 
       } 
      }).toObservable(); 

事件和改型類不要忘記使用可流動,而不是可觀察到的。

似乎fromAsync已更名爲Flowable.create按照GitHub的筆記here

實際上,1.x的fromEmitter(原fromAsync)已更名爲Flowable.create。