2012-06-25 36 views
2

我有一個webservice,我想在布爾值設置爲true後立即查詢更新,然後每5分鐘重新檢查一次。無效擴展,立即觸發OnNext,然後在一段時間內觸發

這是我觀察到的電流:

 _queryDisposable = Observable 
      .Interval(TimeSpan.FromMinutes(5)) 
      .ObserveOn(Scheduler.ThreadPool) 
      .Where(i => IsProcessing)  // IsProcessing is the bool value 
      .Subscribe(GetFeeds, OnError, OnComplete); 

這可觀察到的將檢查IsProcessing布爾是真的還是假的,每5分鐘,然後調用GetFeeds如果這是真的。我能想到的,以獲得預期的效果

唯一的辦法就是讓IsProcessing屬性支持的領域和過程與觀測2類似如下:

private bool _isProcessing; 

    public bool IsProcessing 
    { 
     get { return _isProcessing; } 
     set 
     { 
      if (_isProcessing == value) 
       return; 

      _isProcessing = value; 

      if (!value) 
      { 
       if(_queryDisposable != null) 
        _queryDisposable.Dispose(); 
       _queryDisposable = null; 
      } 
      else 
      { 
       Observable 
        .Range(0,1) 
        .ObserveOn(Scheduler.ThreadPool) 
        .Subscribe(GetFeedsSafe, OnError, OnComplete); 

       _queryDisposable = Observable 
        .Interval(TimeSpan.FromMinutes(5)) 
        .ObserveOn(Scheduler.ThreadPool) 
        .Subscribe(GetFeedsSafe, OnError, OnComplete); 
      } 
     } 
    } 

我不認爲這是一個非常優雅解決方案,所以我想知道是否有更好的方法來達到這種效果?

回答

3

我是否錯過了一些東西,或者不打電話直接在啓動Observable之前打電話給自己?

Subscribe(GetFeeds, OnError, OnComplete); 
    // Or perhaps, if you want it to be async, 
    new TaskFactory().StartNew(()=>Subscribe(GetFeeds, OnError, OnComplete)); 

    _queryDisposable = Observable 
     .Interval(TimeSpan.FromMinutes(5)) 
     .ObserveOn(Scheduler.ThreadPool) 
     .Where(i => IsProcessing)  // IsProcessing is the bool value 
     .Subscribe(GetFeeds, OnError, OnComplete); 

編輯:純淨的Rx,從http://social.msdn.microsoft.com/Forums/sa/rx/thread/c4acaf34-3136-4206-a6f9-ef5afba74b2b

Observable.Interval(TimeSpan.FromMinutes(5)) 
     .StartWith(-1L) 
     .ObserveOn(Scheduler.ThreadPool) 
     .Where(i => IsProcessing)  // IsProcessing is the bool value 
     .Subscribe(GetFeeds, OnError, OnComplete); 
+0

大聲笑,有效的點!我試圖看看是否有一種純粹的Rx方式來執行諸如「Fire,然後在TimeSpan間隔上觸發」之類的操作。 –

+0

@joe_coolish找到了一個示例,請參閱編輯。 –

+0

標準做法是在訂閱之前將'ObserveOn'作爲最後一個操作符,除非其中一個後面的操作符必須在特定的調度程序上運行。 –

0

你有沒有想過使得IsProcessing可變火的道具改變事件,然後可以聽?

var processing = Observable 
    .FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged") 
    .Where(tr => tr.EventArgs.Property == "IsProcessing" && ((Type)tr.Sender).IsProcessing); 

_queryDisposable = Observable 
    .Interval(TimeSpan.FromMinutes(5)) 
    .ObserveOn(Scheduler.ThreadPool) 
    .And(processing) // Will only fire when both sequences have an available value. 
    .Subscribe(GetFeeds, OnError, OnComplete); 

這也保證,如果是處理,它只會作爲觀察到的永遠只能提供一次一個單一值(每IsProcessing改爲true)的處理火一次,所以你不必擔心如果它在5分鐘後仍在處理,它會再次發射。

2

要開始,你立即仍然測序產生在給定的時間價值,你有兩個選擇:

  1. Observable.Interval(TimeSpan.FromMinutes(5))StartWith(-1)
  2. 可觀測.Timer(TimeSpan.FromMinutes(0),TimeSpan.FromMinutes(5))

接下來我假設你要取消計時器,當IsProcessing值設置爲false,你也想重新開始時的順序它被設置回true。無論何時設置IsProcessing值,上述兩個答案都會運行5分鐘的時間間隔。如果設置IsProcessing = true,則序列將開始,如果一分鐘後您設置IsProcessing = false,然後IsProcessing = true,則在再次查詢之前,您仍然需要等待4分鐘。我認爲這不是你想要的?

所以我們希望這是從屬性更改引發的。 @SPFiredrake在建議INPC和將事件轉化爲可觀測序列方面做得很好。有很多小片段可以幫助你做到這一點。假設我們使用擴展方法PropertyChanges,我們可以編寫這個非常簡單的查詢。

var isProcessingValues = this.PropertyChanges(x=>x.IsProcessing); 
isProcessingValues 
    .Where(isProcessing=>isProcessing) 
    .SelectMany(_=> 
     Observable.Timer(TimeSpan.Zero, TimeSpan.FromMinutes(5)) 
        .TakeUntil(isProcessingValues.Where(isProcessing=>!isProcessing)) 
    ) 
    .Select(_=>GetFeeds()) //What does GetFeeds actually return? 
    .SubscribeOn(Scheduler.NewThread) 
    .Subscribe(feedData=>Console.WriteLine(feedData), 
     ex=>Console.WriteLine (ex), 
     ()=>Console.WriteLine (completed)); 

通過這個慢慢步進:

  1. 當IsProcessing屬性更改,推新值序列(isProcessingValues)
  2. 當一個序列值是true,開始產生一個計時器序列現在和每5分鐘的價值。
  3. 如果isProcessingValues序列產生的每一個計時器刻度時間(當IsProcess被設置爲真和每隔5分鐘,同時它是真實的)假
  4. 一個值的計時器序列應停止,使對GetFeeds呼叫
  5. 確保我們的序列在不同的線程上訂閱(不阻止)。如果這不是要求,請刪除。
  6. 從Feed中獲取數據並對其執行操作。