2017-03-09 38 views
2

我有一個IObservable<T>其中T看起來像的Rx分組節流

public class Notification 
{ 
    public int Id { get; set; } 
    public int Version { get; set; } 
} 

通知以可變的時間間隔以及用於不同的通知,其中,所述版本號獲得與每個每通知ID更新遞增生成。

什麼是合適的方法來限制特定時間段內的觀察值,然後使用最新版本字段接收不同的通知?

到目前爲止,我想出了這個節流和分組,但無法弄清楚如何實際返回IObservable<Notification>

public static IObservable<int> ThrottledById(this IObservable<Notification> observable) 
{ 
    return observable 
     .GroupByUntil(n => n.Id, x => Observable.Timer(TimeSpan.FromSeconds(1))) 
     .Select(group => group.Key); 
} 

編輯: 樣品輸入/輸出(節氣門的延遲:3):

1. { id: 1, v: 1 } 
2. { id: 1, v: 2 } { id: 2, v: 1 } 
3. { id: 1, v: 3 } 
-----------------------------------> notify { id:1, v: 3 }, notify { id:2, v: 1 } 
4. 
5. { id: 2, v: 2 } 
6. 
-----------------------------------> notify { id:2, v: 2 } 
7. { id: 1, v: 4 } 
8. { id: 1, v: 5 } { id: 2, v: 3 } 
9. { id: 1, v: 6 } 
-----------------------------------> notify { id:1, v: 6 }, notify { id: 2, v: 3 } 
... 
... 
+0

你可以添加與所需的輸出一些樣品的輸入? – Shlomo

回答

0

這似乎產生所需輸出

IObservable<Notification> GroupByIdThrottle(IObservable<Notification> producer, IScheduler scheduler) 
    { 
     return Observable.Create<Notification>(observer => 
     { 
      return producer 
       .GroupByUntil(
        item => item.Id, 
        item => Observable.Timer(TimeSpan.FromSeconds(3), scheduler)) 
       .SelectMany(result => 
       { 
        return result.Aggregate<Notification, Notification>(null, (dict, item) => 
        { 
         return item; 
        }); 
       }) 
       .Subscribe(observer); 

     }); 
    } 

的想法是,總使得它所以只有每個組的最後一個值才能使其活躍,並且一旦定時器擊中3秒,分組流就完成了。

我拆散這個

https://social.msdn.microsoft.com/Forums/en-US/7ebf68e8-99af-44e2-b80d-0292cb5546bc/group-by-with-buffer-reactive-extensions?forum=rx

爲理念

+1

適合我。聚合良好的提示。有一點點變化,SelectMany(g => g.ToArray()),然後選擇最後一個,但它通過我的單元測試到目前爲止。 – Stefan

2

這種做法完全符合您的期望:如果你想收集的所有通知與n個相同的ID

public static IObservable<Notification> ThrottledById(this IObservable<Notification> observable) 
{ 
    return observable.Buffer(TimeSpan.FromSeconds(3)) 
     .SelectMany(x => 
      x.GroupBy(y => y.Id) 
      .Select(y => y.Last())); 
} 

幾秒鐘後,他們的第一個揭露最後一個,然後接近,基於GroupByUntil,是你所需要的。

public static IObservable<Notification> ThrottledById(this IObservable<Notification> observable) 
{ 
    return observable.GroupByUntil(x => x.Id, x => Observable.Timer(TimeSpan.FromSeconds(3))) 
     .SelectMany(x => x.LastAsync()); 
} 

你的樣品輸入/輸出會看起來如此:

1. { id: 1, v: 1 } 
2. { id: 1, v: 2 } { id: 2, v: 1 } 
3. { id: 1, v: 3 } 
-----------------------------------> notify { id:1, v: 3 } 
4. 
-----------------------------------> notify { id:2, v: 1 } 
5. { id: 2, v: 2 } 
6. 
7. { id: 1, v: 4 } 
-----------------------------------> notify { id:2, v: 2 } 
8. { id: 1, v: 5 } { id: 2, v: 3 } 
9. { id: 1, v: 6 } 
-----------------------------------> notify { id:1, v: 6 } 
10. 
-----------------------------------> notify { id: 2, v: 3 } 
... 
...