您也標記了rx.net這個問題,所以我會假設在C#中給出答案的奢侈。我不確定這會轉化爲Java,如果這就是你想要的。
Rx的Join
和GroupJoin
並不是真正的意思:它們是基於時間窗口連接的。您正在尋找通過ID加入。
Rx友好的解決方案將是功能性的。而且由於你需要一些狀態,所以我們可以使用不可變的狀態烘焙成Scan
函數。在C#中,有來自Nuget包System.Collections.Immutable
的ImmutableDictionary<TKey, TItem>
。我不確定在Java中是否有相同的部分。
考慮到這些類:
public class CustomEvent
{
public int Id { get; set; }
}
public class Result
{
public ResultType Type { get; set; }
public int Id { get; set; }
}
public enum ResultType
{
Success,
Unknown,
Expired
}
你可以得到一個解決方案是這樣的:
IObservable<CustomEvent> o1;
IObservable<int> o2;
TimeSpan expirationTimeDelay = TimeSpan.FromHours(1);
IObservable<Result> results = Observable.Merge(
o1.SelectMany(ce => Observable.Merge(
Observable.Return(new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h =>
Tuple.Create(h.Add(ce.Id, ce), default(Result), false)
)),
Observable.Return(new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h =>
h.ContainsKey(ce.Id)
? Tuple.Create(h.Remove(ce.Id), new Result { Type = ResultType.Expired, Id = ce.Id}, true)
: Tuple.Create(h, default(Result), false)
))
.Delay(expirationTimeDelay)
)),
o2.Select(id => new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h =>
h.ContainsKey(id)
? Tuple.Create(h.Remove(id), new Result { Type = ResultType.Success, Id = id }, true)
: Tuple.Create(h, new Result { Type = ResultType.Unknown, Id = id }, true)
))
)
.Scan(Tuple.Create(ImmutableDictionary<int, CustomEvent>.Empty, default(Result), false), (t, f) => f(t.Item1))
.Where(t => t.Item3)
.Select(t => t.Item2);
不可改變的字典是我們的核心國家,並擁有o1
「活」事件。累加器函數返回一個包含三個屬性的元組:表示我們核心狀態的不變字典,結果對象和布爾值。布爾對象是一個過濾器,顯示是否應傳播結果對象。
與Scan
一個有趣的把戲是反轉正常用法:將項目流轉換爲關閉狀態的函數。在我們的例子中,函數的類型是Func,Tuple,Results,Boolean >>(一個接收字典的函數,並返回一個包含三個值的元組)。
這就是我們在這裏所做的:每個o1
項目都會彈出兩個函數:一個將項目添加到不可變的字典(並且沒有結果被推送)。另一個功能在一個小時後發佈,以查看事件是否尚未加入。如果加入,則沒有任何反應。如果未加入,則Expired結果會彈出。每個o2
項目彈出一個單一功能:檢查項目是否在地圖中。如果存在,普通結果會彈出。如果不存在,則爲未知。
如果你是用Java,而且也沒有容易獲得相當於ImmutableDictionary
,那麼你也許可以替代常規HashMap
,但你必須要保護它免受與Publish
呼叫多個用戶討厭的狀態問題。
這似乎解決我的問題最好。因爲我可能會在Scala中實現它,所以不可變的數據結構是沒有問題的。如果我得到了正確的答案,這應該在不變的時間內運行!謝謝! –