2016-09-03 30 views
6

我看到ReactiveX(RxJava)具有操作timeout,這將適用於每一個項目在訂閱流檢查的第一個響應的項目​​。但我只想用超時檢查第一個響應,而不關心以下響應的超時。我如何用RxJava的操作員優雅地實現這個要求?RxJava只有超時

回答

2

一種方式做到這一點如下:

Observable<Response> respStream = respStream(); 
ConnectableObservable<Response> sharedRespStream = respStream.publish(); 

Observable<String> first = sharedRespStream.first().timeout(2, TimeUnit.SECONDS); 
Observable<String> rest = sharedRespStream.skip(1); 
Observable<String> result = first.mergeWith(rest); 

sharedRespStream.connect(); 

result.subscribe(response -> handleResponse(response), error -> handleError(error)); 

的代碼是自我解釋:份額應避免重複請求,申請超時發出的第一個項目,並與項目下列第一個合併。

3

這是一個更實用的方法。這是Scala中,但應該被轉錄到Java:

val myTimeout : Observable[Nothing] = Observable timer (10 seconds) flatMap (_ => Observable error new TimeoutException("I timed out!")) 

myStream amb myTimeout 

amb操作符返回發出第一可觀察的價值。

0

最好的選擇是使用一個timeout overload,它返回每個項目的超時觀察值,並且還有一個用於訂閱(這是你感興趣的)。

observable.timeout(() -> Observable.empty() 
      .delay(10, TimeUnit.SECONDS), o -> Observable.never()) 

我會解釋,第一func0將在訂閱運行,並會發出空觀察到的(發射完成)由您希望的時間延遲。 如果在任何物品到達之前的時間過去了,就會出現你想要的超時時間。 第二個參數FUNC1將決定項目,你有沒有用之間的超時的,所以我們只是傳遞永不(不完整或做任何事情)

另一種選擇是 以下盧西亞諾的建議,你可以做這樣的:

public static class TimeoutFirst<T> implements Transformer<T,T> { 

    private final long timeout; 
    private final TimeUnit unit; 

    private TimeoutFirst(long timeout, TimeUnit unit) { 
     this.timeout = timeout; 
     this.unit = unit; 
    } 

    @Override 
    public Observable<T> call(Observable<T> observable) { 
     return Observable.amb(observable, 
       Observable.timer(timeout, unit).flatMap(aLong -> Observable.error(new TimeoutException("Timeout after " + timeout + " " + unit.name())))); 
    } 
} 

public static <T> Transformer<T, T> timeoutFirst(long timeout, TimeUnit seconds) { 
    return new TimeoutFirst<>(timeout, seconds); 
} 

這是一個非常漂亮的使用amb的解決方案。