我試圖根據以下信息編寫反應流:如何有條件地緩衝基於發射事件的分組Observable/Flux?
我們有一個實體事件流,其中每個事件包含其實體的ID和INTENT或COMMIT的類型。假設具有給定ID的COMMIT將始終在具有相同ID的一個或多個INTENT之前。當收到一個INTENT時,它應該按照它的ID進行分組,並且應該打開該組的「緩衝區」。當收到同一組的COMMIT或配置的超時已過時,緩衝區應該「關閉」。應該釋放得到的緩衝區。
請注意,可能在接收到關閉COMMIT之前接收多個INTENT。 (編輯:) bufferDuration
應確保在收到打開緩衝區的INTENT後bufferDuration
時間後發出任何「已打開」緩衝區,無論是否帶有COMMIT。
我在這個最新的嘗試是:
public EntityEventBufferFactory {
private final Duration bufferDuration;
public EntityEventBufferFactory(Duration bufferDuration) {
this.bufferDuration = bufferDuration;
}
public Flux<List<EntityEvent>> createGroupBufferFlux(Flux<EntityEvent> eventFlux) {
return eventFlux.groupBy(EntityEvent::getId)
.map(groupedFlux -> createGroupBuffer(groupedFlux))
.flatMap(Function.identity());
}
protected Flux<List<EntityEvent>> createGroupBuffer(Flux<EntityEvent> groupFlux) {
return groupFlux.publish().buffer(groupFlux.filter(this::shouldOpenBufferOnEvent), createGroupBufferCloseSelector(groupFlux));
}
protected Function<EntityEvent, Publisher<EntityEvent>> createGroupBufferCloseSelector(Flux<EntityEvent> groupFlux) {
return event -> Flux.firstEmitting(Flux.just(event).delay(bufferDuration), groupFlux.filter(this::shouldCloseBufferOnEvent).publish());
}
protected boolean shouldOpenBufferOnEvent(EntityEvent entityEvent) {
return entityEvent.getEventType() == EventType.INTENT;
}
protected boolean shouldCloseBufferOnEvent(EntityEvent entityEvent) {
return entityEvent.getEventType() == EventType.COMMIT;
}
}
這裏是我試圖獲得通過測試:
@Test
public void entityEventsCanBeBuffered() throws Exception {
FluxProcessor<EntityEvent, EntityEvent> eventQueue = UnicastProcessor.create();
Duration bufferDuration = Duration.ofMillis(250);
Flux<List<EntityEvent>> bufferFlux = new EntityEventBufferFactory(bufferDuration).createGroupBufferFlux(eventQueue);
bufferFactory.setBufferDuration(bufferDuration);
List<List<EntityEvent>> buffers = new ArrayList<>();
bufferFlux.subscribe(buffers::add);
EntityEvent intent = new EntityEvent();
intent.setId("SOME_ID");
intent.setEventType(EventType.INTENT);
EntityEvent commit = new EntityEvent();
commit.setId(intent.getId());
commit.setEventType(EventType.COMMIT);
eventQueue.onNext(intent);
eventQueue.onNext(commit);
eventQueue.onNext(intent);
eventQueue.onNext(commit);
Thread.sleep(500);
assertEquals(2, buffers.size());
assertFalse(buffers.get(0).isEmpty());
assertFalse(buffers.get(1).isEmpty());
}
有了這個測試中,我得到兩個發射緩衝區,但他們都是空的。你會注意到,挖掘周圍後,我不得不在某些點添加.publish()
,以免從反應堆中獲得例外說This processor allows only a single Subscriber
。這個問題的答案是RxJava: "java.lang.IllegalStateException: Only one subscriber allowed!",這就是我所採用的方法。
我目前正在使用Reactor,但我認爲這將使用Observable和相同名稱的方法與RxJava進行1對1轉換。
有什麼想法?
這很接近,但缺少的是緩衝區在過去的緩衝區持續時間內的排放。我將編輯我的問題以使其更清楚,但是我需要在接收到第一個INTENT(打開緩衝區)後的'bufferDuration'後發出的任何給定緩衝區(假設沒有收到COMMIT) –
簡單:只需添加另一個終止條件 - 我用一個例子編輯了答案。 –
非常感謝你!我知道我要麼缺少一個信息,要麼我以不太正確的方式思考緩衝過程。原來,這是兩個。我會將適用於我的Reactor代碼片段添加到此答案中。 –