2016-03-02 47 views
6

我正在嘗試使用RxJS進行簡單的短期調查。它需要每delay秒向服務器上的位置path發出一次請求,一旦達到以下兩個條件之一就結束:回調isComplete(data)返回true或已嘗試服務器超過maxTries。這裏是基本的代碼:RxJS 5.0「do while」like mechanism

newShortPoll(path, maxTries, delay, isComplete) { 
    return Observable.interval(delay) 
    .take(maxTries) 
    .flatMap((tryNumber) => http.get(path)) 
    .doWhile((data) => !isComplete(data)); 
    } 

然而,doWhile不RxJS 5.0存在,因此條件的地方只能儘量服務器maxTries作品,由於採取()調用,但isComplete條件不工作。我怎樣才能做到這一點,所以觀察值將next()值,直到isComplete返回true,此時它將next()該值和complete()。

我應該注意,takeWhile()不適合我這裏工作。它不會返回最後一個值,這實際上是最重要的,因爲那是當我們知道它已完成時。

謝謝!

+0

可能的重複:http://stackoverflow.com/questions/36007911/rxjs-poll-until-interval-done-or-correct-data-received –

+0

這不是一個重複的問題是要求替換'doWhile'。 –

回答

0

我們可以創建一個實用函數來創建第二個Observable,它發出內部Observable發出的每個項目;但是,我們將調用onCompleted功能一旦我們的條件是:

function takeUntilInclusive(inner$, predicate) { 
    return Rx.Observable.create(observer => { 
     var subscription = inner$.subscribe(item => { 
      observer.onNext(item); 

      if (predicate(item)) { 
       observer.onCompleted(); 
      } 
     }, observer.onError, observer.onCompleted); 


     return() => { 
      subscription.dispose(); 
     } 
    }); 
} 

下面是使用我們的新的實用方法,快速的片段:

const inner$ = Rx.Observable.range(0, 4); 
const data$ = takeUntilInclusive(inner$, (x) => x > 2); 
data$.subscribe(x => console.log(x)); 

// >> 0 
// >> 1 
// >> 2 
// >> 3 

這個答案是基於關閉:RX Observable.TakeWhile checks condition BEFORE each element but I need to perform the check after

+0

奇怪的是,這種解決方案似乎工作了大約一半的時間。有時候,最後一批數據會將其傳遞給訂閱者,有時候並不會。任何想法爲什麼?切換到switchMap也沒有修復它。 –

0

您可以使用retryfirst運算符來實現此目的。

// helper observable that can return incomplete/complete data or fail. 
 
var server = Rx.Observable.create(function (observer) { 
 
    var x = Math.random(); 
 

 
    if(x < 0.1) { 
 
    observer.next(true); 
 
    } else if (x < 0.5) { 
 
    observer.error("error"); 
 
    } else { 
 
    observer.next(false); 
 
    } 
 
    observer.complete(); 
 

 
    return function() { 
 
    }; 
 
}); 
 
    
 
function isComplete(data) { 
 
    return data; 
 
} 
 
    
 
var delay = 1000; 
 
Rx.Observable.interval(delay) 
 
    .switchMap(() => { 
 
    return server 
 
     .do((data) => { 
 
     console.log('Server returned ' + data); 
 
     },() => { 
 
     console.log('Server threw'); 
 
     }) 
 
     .retry(3); 
 
    }) 
 
    .first((data) => isComplete(data)) 
 
    .subscribe(() => { 
 
    console.log('Got completed value'); 
 
    },() => { 
 
    console.log('Got error'); 
 
    });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>