2016-10-31 33 views
1

我需要如何分發聚集更新一個好主意......彙總觀察到的數據分成多個桶

可以說,我有一個ID的的IObservable發送和接收消息的一個永無止境的流值(5- 10,000 /秒)。現在我想計算大量的聚合(例如總和) 以便定期分配給其他系統 - 讓每個聚合每10秒鐘說一次。 聚合基於元組的Id(字符串),但可能會落入多個聚合(聚合定義應包含哪些id - 因此會重疊)。

會有幾千個聚合定義,所以有人有任何想法如何解決這個問題?

概念:

public struct Update 
{ 
    public string Id { get; } 

    public int Value { get; } 
} 

public class Aggregate 
{ 
    Dictionary<string, Update> latestValues = new Dictionary<string, Update>(); 

    public void AddUpdate(Update update) 
    { 
     latestValues[update.Id] = update; 
    } 

    public int CalculateSum() 
    { 
     return latestValues.Values.Select(v => v.Value).Sum(); 
    } 
} 

UPDATE:

這個問題的目的是爲了簡化真正的問題 - 也許我沒有做一個好工作 - 對不起那個。 假設我有多個產生溫度的IOT設備並定期報告此溫度(更新流)。然後,不同的用戶可以選擇查看設備子集的聚合值(例如,平均值)。因此,一個客戶可能希望看到設備1,2和3的平均值,而另一個客戶可能希望看到設備2,3和4的平均值等等(聚合定義)。

+0

聚合組密鑰總是等於「Id」,並且一條消息只能屬於一個聚合組? – supertopi

+1

我們可以得到一個mcve嗎? http://stackoverflow.com/help/mcve –

+0

你的意思是'10.000'在一萬或十個小數點後面有三個零嗎? – Enigmativity

回答

2

我想你在問什麼您如何使用Rx創建實時閱讀模型*。

鑑於我能猜到你的問題,我想你想能夠更新一些當前狀態,每個更新消息。在你的CalculateSum方法的情況下,你不能只彙總所有消息的Value屬性,因爲有些將被用來更新/覆蓋現有值。

因此,考慮到這個假設,它看起來像GroupBy將是你的朋友。如果您首先將可觀察的值序列分割爲子序列,則可以分割並征服問題。

input.GroupBy(i=>i.Id) 

如果我們只考慮一個屬於同一個Id的值流,那麼每個值的總和應該是多少?

-1--1--2- 

在這種簡單的情況下,答案總是隻是直接傳遞的值。即

input -1--1--2- 
result -1--1--2- 

然而,當我們看兩個序列生產值之成爲稍硬計算

inputA -1-1-2-------- 
inputB --1-2-2-3-5-2- 
result -122344-5-7-4- 

在這裏,我們需要看到的是什麼三角洲是爲序列中的每個值,並推動這一三角洲結果。這可以看作這種

inputA -1-1-2-------- 
delta -1-0-1-------- 

inputB --1-2-2-3-5-2- 
delta --1-1-0-1-2-(-3)- 

result -122344-5-7-4- 

造成這種三角投影的你可以寫類似

input.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur, Delta = cur - acc.CurrentValue })) 
    .Select(acc => acc.Delta); 

把一起的代碼看起來是這樣的:

void Main() 
{ 
    var testScheduler = new TestScheduler(); 
    var input = testScheduler.CreateColdObservable<Update>(
     ReactiveTest.OnNext(010, new Update("a", 1)),  //1 
     ReactiveTest.OnNext(020, new Update("b", 1)),  //2 
     ReactiveTest.OnNext(030, new Update("c", 3)),  //5 
     ReactiveTest.OnNext(040, new Update("a", 1)),  //5 
     ReactiveTest.OnNext(050, new Update("b", 2)),  //6 
     ReactiveTest.OnNext(060, new Update("a", 2)),  //7 
     ReactiveTest.OnNext(070, new Update("b", 2)),  //7 
     ReactiveTest.OnNext(080, new Update("b", 3)),  //8 
     ReactiveTest.OnNext(090, new Update("b", 5)),  //10 
     ReactiveTest.OnNext(100, new Update("b", 2))  //7 

    ); 

    var currentSum = input.GroupBy(i => i.Id) 
     .SelectMany(grp => grp.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur.Value, Delta = cur.Value - acc.CurrentValue })) 
     .Select(acc => acc.Delta) 
     .Scan((acc, cur) => acc + cur); 

    var observer = testScheduler.CreateObserver<int>(); 
    var subscription = currentSum.Subscribe(observer); 
    testScheduler.Start(); 
    subscription.Dispose(); 

    ReactiveAssert.AreElementsEqual(new[] 
     { 
      ReactiveTest.OnNext(010, 1), 
      ReactiveTest.OnNext(020, 2), 
      ReactiveTest.OnNext(030, 5), 
      ReactiveTest.OnNext(040, 5), 
      ReactiveTest.OnNext(050, 6), 
      ReactiveTest.OnNext(060, 7), 
      ReactiveTest.OnNext(070, 7), 
      ReactiveTest.OnNext(080, 8), 
      ReactiveTest.OnNext(090, 10), 
      ReactiveTest.OnNext(100, 7)} 
     , 
     observer.Messages); 
} 

// Define other methods and classes here 
public struct Update 
{ 
    public Update(string id, int value) 
    { 
     Id = id; 
     Value = value; 
    } 
    public string Id { get; } 

    public int Value { get; } 
} 

如果你想創建多個聚合,那麼每個新的聚合只是一個像上面這樣的查詢。您可以通過在分組之後共享/發佈序列來優化目標,但我會首先確保這是分析所需的。

* CQRS/ES術語中的readmodels。

+0

謝謝你的詳盡答案 - 作爲一個獎勵教給我一些關於TestScheduler的額外信息。我會試試這個! – BackendA