訂閱

2012-06-26 146 views
0

的處置時訪問當前的窗口,我有以下代碼:訂閱

var observable = ... subscribe to event here ... 

var windows = observable.Window(TimeSpan.FromSeconds(240)); 

aggregatedWindows = windows.SelectMany(
    window => window.Aggregate(new Context(), AggregateContext)); 

subscription = aggregatedWindows.Subscribe(OnWindow); 

... later 

subscription.Dispose(); 

試想一個場景,當我在處理窗口,有人已要求我的應用程序應該關閉的中間。我要處置這個訂閱,這將停止正在處理的事件,但是我也將失去最後一個窗口的信息。

我不知道什麼是最好的方式來處理,這是...

,因爲它是通過聚合函數傳遞我可以存儲本地狀態與上看到窗口(但這似乎是錯誤的)。 ..

任何幫助將不勝感激!

回答

0

您可以對窗口進行操作,而不是保留聚合訂閱 - 這是您最初希望保持連接到最後一個窗口的時間,並在超時時使用超時斷開連接劃分。

這裏使用了一個單獨的類,因爲使用Create使它成爲自動分離 - 在處理調用完成後立即斷開觀察者。因此,從根本上來說,Dispose的含義就是這裏所做的改變。

public static IObservable<T> DeferDisconnection<T>(this IObservable<T> observable, TimeSpan timeout) 
    { 
     return new ClosingObservable<T>(observable, timeout); 
    } 


    public class ClosingObservable<T> : IObservable<T> 
    { 

     private readonly IConnectableObservable<T> Source; 
     private readonly IDisposable Subscription; 
     private readonly TimeSpan Timeout; 

     public ClosingObservable(IObservable<T> observable, TimeSpan timeout) 
     { 
      Timeout = timeout; 
      Source = observable.Publish(); 
      Subscription = Source.Connect(); 
     } 

     public IDisposable Subscribe(IObserver<T> observer) 
     { 
      Source.Subscribe(observer); 

      return Disposable.Create(() => Source.Select(_ => new Unit()) 
               .Amb(Observable.Timer(Timeout).Select(_ => new Unit())) 
               .Subscribe(_ => Subscription.Dispose()) 
               ); 
     } 
    } 

測試:

  var disposable = 
      Observable.Interval(TimeSpan.FromSeconds(2)) 
         .Do(Console.WriteLine) 
         .DeferDisconnection(TimeSpan.FromSeconds(5)) 
         .Subscribe(); 

      Console.ReadLine(); 

      disposable.Dispose(); 

      Console.ReadLine(); 
+0

我很困惑如何解決這個問題。看起來DeferDisconnection在處理之前會稍微等一下,但是如果窗口很大,這會導致應用等待很長時間才能關閉。那是對的嗎? – jonnii

+0

@jonnii這就是爲什麼有一個超時參數 - 它在下一個值到達時斷開,或者它需要太長時間,這是超時參數。 – Asti

-1

這工作,進行部分窗口證實被顯示在後面。

class Program 
{ 
    public class Context 
    { 
     public int count; 
    } 

    static Context AggregateContext(Context c, long i) 
    { 
     c.count++; 
     return c; 
    } 

    static void OnWindow(Context c) { Console.WriteLine(c.count); } 

    static void Main(string[] args) 
    { 
     var canceled = new Subject<bool>(); 

     var observable = Observable.Interval(TimeSpan.FromSeconds(.1)).TakeUntil(canceled); 

     var windows = observable.Window(TimeSpan.FromSeconds(3)); 

     var aggregatedWindows = windows.SelectMany(
      window => window.Aggregate(new Context(), AggregateContext)); 

     var subscription = aggregatedWindows.Subscribe(OnWindow); 

     Thread.Sleep(TimeSpan.FromSeconds(10)); 

     canceled.OnNext(true); 
     subscription.Dispose(); 

     Console.WriteLine(@"Output should have been something like 30,30,30,30,10"); 
     Console.ReadLine(); 
    } 
} 
+0

爲什麼這會降低投票率? – jonnii