2017-07-10 14 views

回答

2

你可以建立你自己的循環,基於onErrorHandleWith

def retryLimited[A](fa: Observable[A], maxRetries: Int) 
    (p: Throwable => Boolean): Observable[A] = { 

    // If we have no retries left, return the source untouched 
    if (maxRetries <= 0) fa else 
    fa.onErrorHandleWith { err => 
     // If predicate holds, do recursive call 
     if (p(err)) 
     retryLimited(fa, maxRetries - 1)(p) 
     else 
     Observable.raiseError(err) 
    } 
} 

如果你不喜歡簡單的功能(我),你總是可以公開一些擴展方法作爲一種替代方案:

implicit class ObservableExtensions[A](val self: Observable[A]) 
    extends AnyVal { 

    def onErrorRetryLimited(maxRetries: Int) 
    (p: Throwable => Boolean): Observable[A] = 
    retryLimited(self, maxRetries)(p) 
} 

說明@JVS答案是在精神正常,但可是有問題的,因爲它保持共享可變狀態,這對於冷觀察者來說是不好的。所以請注意,如果你做這樣的事情會發生什麼:

val source = Observable.suspend { 
    if (Random.nextInt() % 10 != 0) 
    Observable.raiseError(new RuntimeException("dummy")) 
    else 
    Observable(1, 2, 3) 
} 

val listT = source 
    .onErrorRestartIf(limitedRetries(AtomicInt(maxRetries), shouldRestart)) 
    .toListL 

listT.runAsync // OK 
listT.runAsync // Ooops, shared state, we might not have retries left 

在Observable的運營商中要小心可變的共享狀態。你當然可以這樣工作,但你必須意識到:-)

+0

感謝您指出這個潛在的問題! – JVS

0

警告:這使用共享可變狀態,對於冷觀察對象可能不正確。見亞歷山德魯的回答。

定義一個函數來做到這一點:

def limitedRetries(maxRetries: AtomicInt, shouldRetryOnException: Throwable => Boolean): Throwable => Boolean = 
    ex => maxRetries.decrementAndGet() > 0 && shouldRetryOnException(ex) 

而且在onErrorRestartIf

.onErrorRestartIf(limitedRetries(AtomicInt(maxRetries), shouldRestart)) 

僅供參考使用這個功能,使用monix AtomicInt這裏...

import monix.execution.atomic.AtomicInt