2017-01-27 69 views
4

已經有使用無(Database polling with Reactive Extensions如何使用Reactive Extensions進行狀態輪詢?

我有一個類似的問題在數據庫查詢一個很好的問題,但有一個轉折:我需要從以前的結果,養活一個值到下一個請求。基本上,我想查詢此:

interface ResultSet<T> 
{ 
    int? CurrentAsOfHandle {get;} 
    IList<T> Results {get;} 
} 

Task<ResultSet<T>> GetNewResultsAsync<T>(int? previousRequestHandle); 

的想法是,這將返回自上次請求


  1. 的每一分鐘,我想打電話給GetNewResultsAsync
  2. 我所有新項目想要將先前呼叫的CurrentAsOf作爲參數傳遞給previousRequest參數
  3. 下一次呼叫GetNewResultsAsync應該上一個

基本上後實際發生一分鐘後,有沒有比一個更好的辦法:

return Observable.Create<IMessage>(async (observer, cancellationToken) => 
{ 
    int? currentVersion = null; 

    while (!cancellationToken.IsCancellationRequested) 
    { 
     MessageResultSet messageResultSet = await ReadLatestMessagesAsync(currentVersion); 

     currentVersion = messageResultSet.CurrentVersionHandle; 

     foreach (IMessage resultMessage in messageResultSet.Messages) 
      observer.OnNext(resultMessage); 

     await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken); 
    } 
}); 

另外請注意,這個版本允許messageResultSet,同時等待收集下一次迭代(例如,我想也許我可以用Scan以前的結果集對象傳遞到下一個迭代)

回答

1

你的問題基本上熱度CES這樣的:有一個Scan函數簽名:

IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, 
    TAccumulate initialValue, Func<TAccumulate, TSource, TAccumulate> accumulator) 

但是,你需要像

IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, 
    TAccumulate initialValue, Func<TAccumulate, TSource, IObservable<TAccumulate>> accumulator) 

...其中累加器函數返回一個可觀察的,和掃描功能自動將其簡化爲傳入下一個呼叫。

這裏是一個窮人版的功能實現的Scan

public static IObservable<TAccumulate> MyScan<TSource, TAccumulate>(this IObservable<TSource> source, 
    TAccumulate initialValue, Func<TAccumulate, TSource, TAccumulate> accumulator) 
{ 
    return source 
     .Publish(_source => _source 
      .Take(1) 
      .Select(s => accumulator(initialValue, s)) 
      .SelectMany(m => _source.MyScan(m, accumulator).StartWith(m)) 
     ); 
} 

鑑於此,我們可以改變這一點納入還原功能:

public static IObservable<TAccumulate> MyObservableScan<TSource, TAccumulate>(this IObservable<TSource> source, 
    TAccumulate initialValue, Func<TAccumulate, TSource, IObservable<TAccumulate>> accumulator) 
{ 
    return source 
     .Publish(_source => _source 
      .Take(1) 
      .Select(s => accumulator(initialValue, s)) 
      .SelectMany(async o => (await o.LastOrDefaultAsync()) 
       .Let(m => _source 
        .MyObservableScan(m, accumulator) 
        .StartWith(m) 
       ) 
      ) 
      .Merge() 
     ); 
} 

//Wrapper to accommodate easy Task -> Observable transformations 
public static IObservable<TAccumulate> MyObservableScan<TSource, TAccumulate>(this IObservable<TSource> source, 
    TAccumulate initialValue, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator) 
{ 
    return source.MyObservableScan(initialValue, (a, s) => Observable.FromAsync(() => accumulator(a, s))); 
} 

//Function to prevent re-evaluation in functional scenarios 
public static U Let<T, U>(this T t, Func<T, U> selector) 
{ 
    return selector(t); 
} 

現在,我們有這個奇特MyObservableScan運營商,我們可以相對輕鬆地解決您的問題:

var o = Observable.Interval(TimeSpan.FromMinutes(1)) 
    .MyObservableScan<long, ResultSet<string>>(null, (r, _) => Methods.GetNewResultsAsync<string>(r?.CurrentAsOfHandle)) 

請注意,在測試中,我注意到如果累加器函數在源中的時間間隔比間隔更長,那麼observable將終止。我不知道爲什麼。如果有人能夠改正,非常有必要。

0

我自從發現有一個重載Observable.Generate幾乎沒有訣竅。主要缺點是它不適用於async

public static IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler);

null傳遞我的初始狀態。作爲我的條件傳遞x => true(調查不休)。在iterate裏面,我根據傳入的狀態進行實際輪詢。然後在timeSelector中返回輪詢間隔。

所以:

var resultSets = Observable.Generate<ResultSet<IMessage>, IEnumerable<IMessage>>(
    //initial (empty) result 
    new ResultSet<IMessage>(), 

    //poll endlessly (until unsubscription) 
    rs => true, 

    //each polling iteration 
    rs => 
    { 
     //get the version from the previous result (which could be that initial result) 
     int? previousVersion = rs.CurrentVersionHandle; 

     //here's the problem, though: it won't work with async methods :(
     MessageResultSet messageResultSet = ReadLatestMessagesAsync(currentVersion).Result; 

     return messageResultSet; 
    }, 

    //we only care about spitting out the messages in a result set 
    rs => rs.Messages, 

    //polling interval 
    TimeSpan.FromMinutes(1), 

    //which scheduler to run each iteration 
    TaskPoolScheduler.Default); 

return resultSets 
    //project the list of messages into a sequence 
    .SelectMany(messageList => messageList); 
+0

另一個小缺點是整個結果集住在通過下一次迭代。問題中的版本允許'消息'部分被垃圾收集,因爲我們只需要'版本'。 –

相關問題