2011-05-10 32 views
2

我的問題類似於this的問題,但總是略有不同。使用Rx節流非異步呼叫的回調

我目前正在使用一個簡單模式來異步接收底層數據更改通知的項目。

類Foo負責訂閱這些數據的變化,並提供了一種用於類通過註冊實現給定接口的類的一個實例註冊他們在這些變化的興趣:

public Guid SubscribeToChanges(ISomeCallbackInterface callback) 

類BAR器具這個回調和寄存器本身:

public class Bar : ISomeCallbackInterface 
{ 
    ... 
    //initialise instance of Foo class and subscribe to updates 
    _foo.SubscribeToChanges(this); 
    ... 

    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data) 
    { 
     ... 
    } 
} 

理想情況下,我們想扼殺這些回調,我們可以例如獲得從X到Y改變一個特定的數據的值,然後返回到X無線回調瘦一秒的空間。查看Rx文檔,通過使用DistinctUntilChanged操作符,這將是微不足道的。

但是,問題是如何創建一個IObservable集合,然後我可以將運算符應用於上面給出的回調實現。該文檔非常清楚如何從標準.Net事件或Begin .../End ...方法對創建IObservable。

更新:正如Richard Szalay在他的評論中指出的那樣,我需要使用DistinctUntilChanged和Throttle一起來實現我所需要的功能。

再次感謝理查德,我也應該提到我需要能夠退訂回調。在當前模型中,我只是在Foo的實例上調用Unscubscribe(Guid subscriptionToken)。

+0

如果一個值從x變成y到x,那麼'DistinctUntilChanged'會爲所有三個值發出值。 – 2011-05-10 21:13:17

+0

@Richard Szalay - 好點。我忘了提及我需要與Throttle操作符配合使用DistinctUntilChanged。將更新我的問題。 – 2011-05-11 07:05:10

回答

3

要做到這一點的唯一方法我想到的是自定義實現ISomeCallbackInterfaceObservable.Create。不過,它感覺鞋解決方案:

public static IObservable<Tuple<string,IEnumerable<Tuple<string, int, object>>> 
    FromCustomCallbackPattern(ISomeCallbackPublisher publisher) 
{ 
    return Observable.CreateWithDisposable<T>(observer => 
    { 
     var subject = new Subject< 
      Tuple<string,IEnumerable<Tuple<string, int, object>>>(); 

     var callback = new ObservableSomeCallback(subject); 

     Guid subscription = publisher.SubscribeToChanges(callback); 

     return new CompositeDisposable(
      subject.Subscribe(observer), 
      Disposable.Create(() => publisher.UnsubscribeFromChanges(subscription)) 
     ); 
    }); 
} 

private class ObservableSomeCallback : ISomeCallbackInterface 
{ 
    private IObserver<Tuple<string,IEnumerable<Tuple<string, int, object>>> observer; 

    public ObservableSomeCallback(
     IObserver<Tuple<string,IEnumerable<Tuple<string, int, object>>> observer) 
    { 
     this.observer = observer; 
    } 

    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data) 
    { 
     this.observer.OnNext(new Tuple< 
      string,IEnumerable<Tuple<string, int, object>>(id, data)); 
    } 
} 
+0

我會剛剛使用一個主題,你越來越喜歡:) – 2011-05-11 00:55:26

+0

非常感謝。我會嘗試這個建議。也感謝提供這個事實,我錯過了我將需要取消訂閱回調的事實 - 我忘了包括這一點! – 2011-05-11 07:03:06