2016-11-28 36 views
0

我的問題與Rx和Catch運算符有關。比方說,我有我的可觀察的超時和每次超時發生我想重新創建底層觀察(Catch)並做同樣的事情(添加超時和捕獲)。無限延伸中的無限捕獲

下面我粘貼了示例代碼。爲了這個例子的目的,Timeout每2秒發生一次。從我的觀察來看,這段代碼無法無限工作,不知何故,在娛樂之後,某些事物正在引用舊的可觀察的剩菜。當Catch被調用時,那些剩餘物會累積。

大多數可疑行是最後一行,是某種自引用存在。但我實際上無法想象自己爲什麼它可能是錯的?還有什麼辦法可以用類似的邏輯來創建可以永久工作的可觀察事物嗎?

public static IObservable<string> CreateReliableStream(this IObservable<string> targetObservable, Func<IObservable<string>> recreateObservable) 
    { 
     return targetObservable 
      .Timeout(TimeSpan.FromSeconds(2)) 
      .Catch<string, Exception>(exception => ReconnectOnError(exception, recreateObservable)); 
    } 

    private static IObservable<string> ReconnectOnError(Exception exception, Func<IObservable<string>> recreateObservable) 
    { 
     GC.Collect(); // For debug - make sure all unreferenced object are removed 

     return recreateObservable() 
      .Timeout(TimeSpan.FromSeconds(2)) 
      .Catch<string, Exception>(ex => ReconnectOnError(ex, recreateObservable)); 
    } 
+1

這是一個.NET的問題或Java的問題嗎?我很困惑的rx-java標籤... –

+0

這是更接近Rx的問題,所以rx-net和rx-java都很好。只有代碼示例是在C#中 –

回答

1

我想你只是想用Retry()操作符。

我假設你的初始序列和你的繼續序列是一樣的。

例如

Observable.Return(1).Concat(Observable.Throw<int>(new Exception())) 
    .Retry() 

這將運行在一個緊密的無限循環中。

您的代碼可能最後看起來像個

createObservable() 
    .Timeout(TimeSpan.FromSeconds(2)) 
    .Retry() 
0

你可以做這樣的事情。

//done in Linqpad, where async Main is allowed. 
async void Main() 
{ 
    var source = new Subject<string>(); 
    var backup = new Subject<string>(); 
    var reliableStream = source.CreateReliableStream(() => backup); 
    reliableStream.Subscribe(s => Console.WriteLine($"Next: {s}"), e => Console.WriteLine($"Error: {e.Message}"),() => Console.WriteLine("Completed.")); 

    source.OnNext("sourceAbc"); 
    backup.OnNext("backupAbc"); 
    await Task.Delay(TimeSpan.FromSeconds(2.5)); 

    source.OnNext("sourceDef"); 
    backup.OnNext("backupDef"); 
    await Task.Delay(TimeSpan.FromSeconds(2.5)); 

    //Doesn't yield "Completed" because it's re-subscribing. 
    source.OnCompleted(); 
    backup.OnCompleted(); 

} 

public static class Ex 
{ 
    public static IObservable<string> CreateReliableStream(this IObservable<string> targetObservable, Func<IObservable<string>> recreateObservable) 
    { 
     return targetObservable 
      .Timeout(TimeSpan.FromSeconds(2)) 
      .Catch<string, Exception>(exception => ReconnectOnError(exception, recreateObservable)); 
    } 

    public static IEnumerable<IObservable<T>> InfiniteObservables<T>(Func<IObservable<T>> f) 
    { 
     while(true) 
      yield return f(); 
    } 

    private static IObservable<string> ReconnectOnError(Exception exception, Func<IObservable<string>> recreateObservable) 
    { 
     GC.Collect(); // For debug - make sure all unreferenced object are removed 

     return InfiniteObservables(recreateObservable) 
      .Select(o => o.Timeout(TimeSpan.FromSeconds(2))) 
      .OnErrorResumeNext(); 
    } 
} 

產生以下的輸出:

Next: sourceAbc 
Next: sourceDef 
Next: backupHij 
Next: backupLmn 

我不是這種方法的粉絲雖然。 Rx將錯誤視爲流終止符,並且您正在嘗試將它們視爲替代消息。你最終會像這樣往上游去。