2016-05-18 64 views
0

我用下面的代碼從這裏 - 看起來像一個問題,我在清理「重播緩存的Rx緩存 - 重放操作清除

https://gist.github.com/leeoades/4115023

如果我改變了下面的調用和代碼像這樣,我發現Replay中存在錯誤,即它永遠不會被清除。有人可以幫助解決這個問題嗎?

private Cache<string> GetCalculator() 
    { 
     var calculation = Observable.Create<string>(o => 
     { 
      _calculationStartedCount++; 

      return Observable.Timer(_calculationDuration, _testScheduler) 
          .Select(_ => "Hello World!" + _calculationStartedCount) // suffixed the string with count to test the behaviour of Replay clearing 
          .Subscribe(o); 
     }); 

     return new Cache<string>(calculation); 
    } 

[Test] 
    public void After_Calling_GetResult_Calling_ClearResult_and_GetResult_should_perform_calculation_again() 
    { 
     // ARRANGE 
     var calculator = GetCalculator(); 

     calculator.GetValue().Subscribe(); 
     _testScheduler.Start(); 

     // ACT 
     calculator.Clear(); 

     string result = null; 
     calculator.GetValue().Subscribe(r => result = r); 
     _testScheduler.Start(); 

     // ASSERT 
     Assert.That(_calculationStartedCount, Is.EqualTo(2)); 
     Assert.That(result, Is.EqualTo("Hello World!2")); // always returns Hello World!1 and not Hello World!2 
     Assert.IsNotNull(result); 
    } 

回答

2

問題是一個微妙的問題。源序列Timer在發出事件後完成,該事件又在由Replay創建的內部ReplaySubject上調用OnCompleted。當Subject完成時,即使出現新的Observable,它也不再接受任何新值。

當你重新訂閱底層Observable再次執行,但無法重新啓動Subject,因此新Observer只能接收ReplaySubject完成之前的最新值。

最簡單的解決方案很可能只是永遠不會讓源流完整的(未經測試):

public Cache(IObservable<T> source) 
    { 
     //Not sure why you are wrapping this in an Observable.create 
     _source = source.Concat(Observable.Never()) 
          .Replay(1, Scheduler.Immediate); 
    }