2017-05-26 79 views
0

我們假設有兩個觀測值o1, o2。第一個接收來自內部進程的事件(經過非常長的計算完成),第二個接收來自REST端點的外部事件(表示另一個外部組件也完成)。事件數據只是一個ID。Rx:通過匹配ID進行連接

現在我想要設計一個工作流程,以便只有當兩個觀察對象中都存在一個ID時,纔會發出新事件(即內部和外部計算完成時)。

設在一個時間點o1包含的ID {1,2,3},那麼我想這些情況來區分:

  1. 正常情況下:例如ID 2抵達o2。兩者的ID現在出現在兩個觀測,輸出「成功:2」
  2. 到期的情況下:經過一段時間的內部計算完成,外部事件並沒有到達。例如。 ID 2存在於o1但不是在o2即使過一小時後,輸出:「過期:2」
  3. 未知情況下:一個ID,例如「未知::3」

我發現groupJoin 4,通過REST端點不存在於o1,也許是因爲在ID已經過期或僅僅是因爲有故障的外部組分,輸出到達o2可能可以做我想要的操作符,這裏甚至是屬性匹配的一個例子:GroupJoin - Joins two streams matching by one of their attributes

然而,看起來這個例子在每次新事件到達時對所有元素執行耗盡(線性時間)掃描。我認爲這將有可能推出我自己的版本,而不是在固定時間檢查地圖,但:我想知道是否有一個規範的方式,甚至是一個開箱即用的功能(因爲我猜這個是一個很常見的用例)。

(當我是新來的Rx,什麼是執行到期的情況下這樣的連接操作的最佳方式)

回答

1

您也標記了rx.net這個問題,所以我會假設在C#中給出答案的奢侈。我不確定這會轉化爲Java,如果這就是你想要的。

Rx的JoinGroupJoin並不是真正的意思:它們是基於時間窗口連接的。您正在尋找通過ID加入。

Rx友好的解決方案將是功能性的。而且由於你需要一些狀態,所以我們可以使用不可變的狀態烘焙成Scan函數。在C#中,有來自Nuget包System.Collections.ImmutableImmutableDictionary<TKey, TItem>。我不確定在Java中是否有相同的部分。

考慮到這些類:

public class CustomEvent 
{ 
    public int Id { get; set; } 
} 

public class Result 
{ 
    public ResultType Type { get; set; } 
    public int Id { get; set; } 
} 

public enum ResultType 
{ 
    Success, 
    Unknown, 
    Expired 
} 

你可以得到一個解決方案是這樣的:

IObservable<CustomEvent> o1; 
IObservable<int> o2; 
TimeSpan expirationTimeDelay = TimeSpan.FromHours(1); 

IObservable<Result> results = Observable.Merge(
    o1.SelectMany(ce => Observable.Merge(
     Observable.Return(new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h => 
      Tuple.Create(h.Add(ce.Id, ce), default(Result), false) 
     )), 
     Observable.Return(new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h => 
      h.ContainsKey(ce.Id) 
       ? Tuple.Create(h.Remove(ce.Id), new Result { Type = ResultType.Expired, Id = ce.Id}, true) 
       : Tuple.Create(h, default(Result), false) 
     )) 
      .Delay(expirationTimeDelay) 
    )), 
    o2.Select(id => new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h => 
     h.ContainsKey(id) 
      ? Tuple.Create(h.Remove(id), new Result { Type = ResultType.Success, Id = id }, true) 
      : Tuple.Create(h, new Result { Type = ResultType.Unknown, Id = id }, true) 
    )) 
) 
.Scan(Tuple.Create(ImmutableDictionary<int, CustomEvent>.Empty, default(Result), false), (t, f) => f(t.Item1)) 
.Where(t => t.Item3) 
.Select(t => t.Item2); 

不可改變的字典是我們的核心國家,並擁有o1「活」事件。累加器函數返回一個包含三個屬性的元組:表示我們核心狀態的不變字典,結果對象和布爾值。布爾對象是一個過濾器,顯示是否應傳播結果對象。

Scan一個有趣的把戲是反轉正常用法:將項目流轉換爲關閉狀態的函數。在我們的例子中,函數的類型是Func,Tuple,Results,Boolean >>(一個接收字典的函數,並返回一個包含三個值的元組)。

這就是我們在這裏所做的:每個o1項目都會彈出兩個函數:一個將項目添加到不可變的字典(並且沒有結果被推送)。另一個功能在一個小時後發佈,以查看事件是否尚未加入。如果加入,則沒有任何反應。如果未加入,則Expired結果會彈出。每個o2項目彈出一個單一功能:檢查項目是否在地圖中。如果存在,普通結果會彈出。如果不存在,則爲未知。

如果你是用Java,而且也沒有容易獲得相當於ImmutableDictionary,那麼你也許可以替代常規HashMap,但你必須要保護它免受與Publish呼叫多個用戶討厭的狀態問題。

+0

這似乎解決我的問題最好。因爲我可能會在Scala中實現它,所以不可變的數據結構是沒有問題的。如果我得到了正確的答案,這應該在不變的時間內運行!謝謝! –

2

我想通過外部對象具有中間狀態做到這一點:

public class ItemJoinCache<T> { 
    private Map<Integer, T> items; 
    public Observable<T> ingestInternal(T item) { 
     // an internal item arrived, do the necessary work 
    } 
    public Observable<T> ingestExternal(T item) { 
     // an external item arrived, do the necessary work 
    } 
} 

externalRestCallThatReturnsObservable() 
.flatMap(myItemJoinCache::ingestExternal) 
... 

internalProcessThatTakesALongTime() 
.flatMap(myItemJoinCache::ingestInternal) 
... 

這樣你就可以做任何你可能需要的處理。

+0

謝謝,這很接近我以某種方式預期的結果。但是,我接受了另一個答案,因爲它更完整。但無論如何,這是一個很好的起點! –

+0

沒問題,這是SO中的正確行爲。謝謝! –

0

您可以隨時將o1減少到scan。當o2發出一個值時,您從o1中獲取最新的集合withLatestFrom並檢查包含。 A timeout可以解決失效部分。 RxJs 5中的示例:

o2 
.withLatestFrom(
    o1.scan((set, val) => set.add(val), new Set), 
    (o2Val, o1Set) => o1Set.has(o2Val) ? "Success" : "Unknown" 
) 
.timeoutWith(3600000, Observable.of("Expire")) 
.subscribe(console.log) 
+0

看起來不錯,但我不知道這是否在不變的時間內運作。我假設「掃描」會導致線性時間運行時間。 –

+0

是的,實際上你正在構建一組來自o1的元素,這需要線性時間。關鍵是你的實際計算髮生在一個元素來自o2時,那麼最後一個可用的Set在o1時間內取得。這個解決方案比另一個更高效,更簡單。 –