我想你在問什麼您如何使用Rx創建實時閱讀模型*。
鑑於我能猜到你的問題,我想你想能夠更新一些當前狀態,每個更新消息。在你的CalculateSum
方法的情況下,你不能只彙總所有消息的Value
屬性,因爲有些將被用來更新/覆蓋現有值。
因此,考慮到這個假設,它看起來像GroupBy
將是你的朋友。如果您首先將可觀察的值序列分割爲子序列,則可以分割並征服問題。
input.GroupBy(i=>i.Id)
如果我們只考慮一個屬於同一個Id的值流,那麼每個值的總和應該是多少?
-1--1--2-
在這種簡單的情況下,答案總是隻是直接傳遞的值。即
input -1--1--2-
result -1--1--2-
然而,當我們看兩個序列生產值之成爲稍硬計算
inputA -1-1-2--------
inputB --1-2-2-3-5-2-
result -122344-5-7-4-
在這裏,我們需要看到的是什麼三角洲是爲序列中的每個值,並推動這一三角洲結果。這可以看作這種
inputA -1-1-2--------
delta -1-0-1--------
inputB --1-2-2-3-5-2-
delta --1-1-0-1-2-(-3)-
result -122344-5-7-4-
造成這種三角投影的你可以寫類似
input.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur, Delta = cur - acc.CurrentValue }))
.Select(acc => acc.Delta);
把一起的代碼看起來是這樣的:
void Main()
{
var testScheduler = new TestScheduler();
var input = testScheduler.CreateColdObservable<Update>(
ReactiveTest.OnNext(010, new Update("a", 1)), //1
ReactiveTest.OnNext(020, new Update("b", 1)), //2
ReactiveTest.OnNext(030, new Update("c", 3)), //5
ReactiveTest.OnNext(040, new Update("a", 1)), //5
ReactiveTest.OnNext(050, new Update("b", 2)), //6
ReactiveTest.OnNext(060, new Update("a", 2)), //7
ReactiveTest.OnNext(070, new Update("b", 2)), //7
ReactiveTest.OnNext(080, new Update("b", 3)), //8
ReactiveTest.OnNext(090, new Update("b", 5)), //10
ReactiveTest.OnNext(100, new Update("b", 2)) //7
);
var currentSum = input.GroupBy(i => i.Id)
.SelectMany(grp => grp.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur.Value, Delta = cur.Value - acc.CurrentValue }))
.Select(acc => acc.Delta)
.Scan((acc, cur) => acc + cur);
var observer = testScheduler.CreateObserver<int>();
var subscription = currentSum.Subscribe(observer);
testScheduler.Start();
subscription.Dispose();
ReactiveAssert.AreElementsEqual(new[]
{
ReactiveTest.OnNext(010, 1),
ReactiveTest.OnNext(020, 2),
ReactiveTest.OnNext(030, 5),
ReactiveTest.OnNext(040, 5),
ReactiveTest.OnNext(050, 6),
ReactiveTest.OnNext(060, 7),
ReactiveTest.OnNext(070, 7),
ReactiveTest.OnNext(080, 8),
ReactiveTest.OnNext(090, 10),
ReactiveTest.OnNext(100, 7)}
,
observer.Messages);
}
// Define other methods and classes here
public struct Update
{
public Update(string id, int value)
{
Id = id;
Value = value;
}
public string Id { get; }
public int Value { get; }
}
如果你想創建多個聚合,那麼每個新的聚合只是一個像上面這樣的查詢。您可以通過在分組之後共享/發佈序列來優化目標,但我會首先確保這是分析所需的。
* CQRS/ES術語中的readmodels。
聚合組密鑰總是等於「Id」,並且一條消息只能屬於一個聚合組? – supertopi
我們可以得到一個mcve嗎? http://stackoverflow.com/help/mcve –
你的意思是'10.000'在一萬或十個小數點後面有三個零嗎? – Enigmativity