2017-05-12 49 views
2

我試圖根據以下信息編寫反應流:如何有條件地緩衝基於發射事件的分組Observable/Flux?

我們有一個實體事件流,其中每個事件包含其實體的ID和INTENT或COMMIT的類型。假設具有給定ID的COMMIT將始終在具有相同ID的一個或多個INTENT之前。當收到一個INTENT時,它應該按照它的ID進行分組,並且應該打開該組的「緩衝區」。當收到同一組的COMMIT或配置的超時已過時,緩衝區應該「關閉」。應該釋放得到的緩衝區。

請注意,可能在接收到關閉COMMIT之前接收多個INTENT。 (編輯:) bufferDuration應確保在收到打開緩衝區的INTENT後bufferDuration時間後發出任何「已打開」緩衝區,無論是否帶有COMMIT。

我在這個最新的嘗試是:

public EntityEventBufferFactory { 
    private final Duration bufferDuration; 

    public EntityEventBufferFactory(Duration bufferDuration) { 
     this.bufferDuration = bufferDuration; 
    } 

    public Flux<List<EntityEvent>> createGroupBufferFlux(Flux<EntityEvent> eventFlux) { 
     return eventFlux.groupBy(EntityEvent::getId) 
      .map(groupedFlux -> createGroupBuffer(groupedFlux)) 
      .flatMap(Function.identity()); 
    } 

    protected Flux<List<EntityEvent>> createGroupBuffer(Flux<EntityEvent> groupFlux) { 
     return groupFlux.publish().buffer(groupFlux.filter(this::shouldOpenBufferOnEvent), createGroupBufferCloseSelector(groupFlux)); 
    } 

    protected Function<EntityEvent, Publisher<EntityEvent>> createGroupBufferCloseSelector(Flux<EntityEvent> groupFlux) { 
     return event -> Flux.firstEmitting(Flux.just(event).delay(bufferDuration), groupFlux.filter(this::shouldCloseBufferOnEvent).publish()); 
    } 

    protected boolean shouldOpenBufferOnEvent(EntityEvent entityEvent) { 
     return entityEvent.getEventType() == EventType.INTENT; 
    } 

    protected boolean shouldCloseBufferOnEvent(EntityEvent entityEvent) { 
     return entityEvent.getEventType() == EventType.COMMIT; 
    } 
} 

這裏是我試圖獲得通過測試:

@Test 
public void entityEventsCanBeBuffered() throws Exception { 
    FluxProcessor<EntityEvent, EntityEvent> eventQueue = UnicastProcessor.create(); 

    Duration bufferDuration = Duration.ofMillis(250); 

    Flux<List<EntityEvent>> bufferFlux = new EntityEventBufferFactory(bufferDuration).createGroupBufferFlux(eventQueue); 
    bufferFactory.setBufferDuration(bufferDuration); 

    List<List<EntityEvent>> buffers = new ArrayList<>(); 
    bufferFlux.subscribe(buffers::add); 

    EntityEvent intent = new EntityEvent(); 
    intent.setId("SOME_ID"); 
    intent.setEventType(EventType.INTENT); 

    EntityEvent commit = new EntityEvent(); 
    commit.setId(intent.getId()); 
    commit.setEventType(EventType.COMMIT); 

    eventQueue.onNext(intent); 
    eventQueue.onNext(commit); 

    eventQueue.onNext(intent); 
    eventQueue.onNext(commit); 

    Thread.sleep(500); 

    assertEquals(2, buffers.size()); 
    assertFalse(buffers.get(0).isEmpty()); 
    assertFalse(buffers.get(1).isEmpty()); 
} 

有了這個測試中,我得到兩個發射緩衝區,但他們都是空的。你會注意到,挖掘周圍後,我不得不在某些點添加.publish(),以免從反應堆中獲得例外說This processor allows only a single Subscriber。這個問題的答案是RxJava: "java.lang.IllegalStateException: Only one subscriber allowed!",這就是我所採用的方法。

我目前正在使用Reactor,但我認爲這將使用Observable和相同名稱的方法與RxJava進行1對1轉換。

有什麼想法?

回答

1

我認爲這是Rx groupBy的權威使用案例。從文檔:

根據指定的標準對發佈者發出的項目進行分組,並將這些分組的項目發出爲GroupedFlowables。發出的GroupedPublisher在其生命週期中只允許單個訂戶,並且如果該訂戶在源終止之前取消,具有相同密鑰的源的下一個發射將觸發新的GroupedPublisher發射。

在你的情況,這個標準是ID,並在每個GroupedPublisher發出你takeUntil類型爲COMMIT:

source 
.groupBy(EntityEvent::getId) 
.flatMap(group -> 
    group 
    .takeUntil(Flowable.timer(10,TimeUnit.SECONDS)) 
    .takeUntil(this::shouldCloseBufferOnEvent) 
    .toList()) 

編輯:添加時間條件。

+0

這很接近,但缺少的是緩衝區在過去的緩衝區持續時間內的排放。我將編輯我的問題以使其更清楚,但是我需要在接收到第一個INTENT(打開緩衝區)後的'bufferDuration'後發出的任何給定緩衝區(假設沒有收到COMMIT) –

+0

簡單:只需添加另一個終止條件 - 我用一個例子編輯了答案。 –

+0

非常感謝你!我知道我要麼缺少一個信息,要麼我以不太正確的方式思考緩衝過程。原來,這是兩個。我會將適用於我的Reactor代碼片段添加到此答案中。 –

0

謝謝塔索斯巴蘇科斯的輸入。以下反應堆代碼適用於我:

public EntityEventBufferFactory { 
    private final Duration bufferDuration; 

    public EntityEventBufferFactory(Duration bufferDuration) { 
     this.bufferDuration = bufferDuration; 
    } 

    @Override 
    public Flux<List<EntityEvent>> create(Flux<EntityEvent> eventFlux) { 
     return eventFlux.groupBy(EntityEvent::getId) 
      .map(this::createGroupBuffer) 
      .flatMap(Function.identity()); 
    } 

    protected Mono<List<EntityEvent>> createGroupBuffer(Flux<EntityEvent> groupFlux) { 
     return groupFlux.take(bufferDuration) 
      .takeUntil(this::shouldCloseBufferOnEvent) 
      .collectList(); 
    } 

    protected boolean shouldCloseBufferOnEvent(EntityEvent EntityEvent) { 
     return EntityEvent.getEventType() == EventType.COMMIT; 
    } 
}