2016-11-14 41 views
1

我在Android應用與RxJava結合不同的業務之後發送的onError(),我想,要麼流成功完成,並提交onNext()項目或10秒後,一個onError將被拋出。RxJava 10secs

我已經試過這樣的事情與timeout

Observable.from(list) 
      .doOnNext(new Action1<List<String>>() { 
       @Override 
       public void call(List<String> list) { 
        //do something here 
       } 
      }) 
      .filter(new Func1<List<String>>, Boolean>() { 
       @Override 
       public Boolean call(List<String> list) { 
        return list != null; 
       } 
      }) 
      .flatMap(new Func1<List<String>>, Observable<MyResponse>>() { 
       @Override 
       public Observable<MyResponse> call(List<String> list) { 
        //flatmap something here 
        return Observable.just(new MyResponse(list)); 
       } 

      }) 
      .flatMap(new Func1<MyResponse, Observable<AnotherResponse>>() { 
       @Override 
       public Observable<AnotherResponse> call(MyResponse myResponse) { 
        //do something here 
        return Observable.just(new AnotherResponse(myResponse)); 
       } 
      }) 
      .timeout(10, TimeUnit.SECONDS) 
      .subscribe(new Subscriber<AnotherResponse>()) { 
       //do Subscription stuff here 
      }); 

但是,這將在任何情況下拋出一個超時,我只是想跳到onError如果上面列出的流量並沒有在10秒成功完成。任何建議如何我可以實現這一目標?

+0

基本上'timeout'會在您的時間限制內沒有發射任何物品時引發錯誤狀態。基本上我認爲你的方法可行。你的鏈條是否完整?如果它沒有完成,那麼這可能是原因,爲什麼執行超時。 – Christopher

+0

@Christopher你是對的,我的連鎖店沒有完成 – Eve

回答

3

克里斯托弗在他的評論中說,你所得到的是錯誤的原因是,timeout將拋出一個TimeoutException每當非成品觀察,(onCompleted當時尚未被稱作)未能出示設定的超時內的下一個onNext

由於我不知道你的源可觀察 - 或者更確切地說,觀測量的flatMap的內線 - 在做我會首先檢查它是否真的應該產生一個onCompleted(大概過至少一個onNext),還是保持開放「通過設計」(可能是源代碼是開放式流,就像您設備的網絡狀態)。如果源代碼本身是通過設計開放的,您可以在第一個onNext之後人爲地引入onCompleted,只需將take(1)添加到您的鏈中即可。

+0

這很好,非常感謝你! :) – Eve

1

也許這可以幫助你。

private Observable<String> myMethod(final List<String> list) { 
    return Observable.create(new Observable.OnSubscribe<String>() { 
     @Override public void call(final Subscriber<? super String> subscriber) { 

      // Your long lasting operation 
      Observable<String> listObservable = Observable.from(list); 

      // This is just my way to make slow operation (I have 10 items in my list) 
      Observable<String> delayedObservable = listObservable.zipWith(Observable.interval(2, TimeUnit.SECONDS), new Func2<String, 
        Long, 
        String>() { 
       @Override public String call(String s, Long aLong) { 
        return s; 
       } 
      }); 
      delayedObservable.subscribe(subscriber); 

      Runnable r = new Runnable() { 
       @Override public void run() { 
        // This thread makes the timeout, it is up to you if you would keep this like this. 
        try { 
         Thread.sleep(10000); 
        } catch (InterruptedException e) { 
         e.printStackTrace(); 
        } 
        subscriber.onError(new TimeoutException()); 
        subscriber.onCompleted(); 
       } 
      }; 
      Thread thread = new Thread(r); 
      thread.start(); 
     } 
    }); 
} 

這對我很有用,在一個非常基本的情況下,我希望它能幫助你。