我正試圖在Flux的Fluxes上實現Reactor中的緩衝過程。內部通量的每個發射都按某種屬性分組,並在緩衝區持續時間到期後發射。如何緩衝GroupedFlux的Flux?
以下測試(簡化爲暴露問題)示出所期望的行爲:
private static final long STEP_MILLIS = 50;
private static final Duration BUFFER_DURATION = Duration.ofMillis(STEP_MILLIS * 4);
@Test
public void testBuffer() throws Exception {
List<List<String>> buffers = new ArrayList<>();
UnicastProcessor<String> queue = UnicastProcessor.create();
FluxSink<String> sink = queue.sink();
Flux<Flux<String>> fluxOfFlux = queue.map(Flux::just);
// Buffering
fluxOfFlux.flatMap(Function.identity())
.transform(this::buffer)
.subscribe(buffers::add);
sink.next("TEST 1");
Thread.sleep(BUFFER_DURATION.toMillis() - STEP_MILLIS); // One "step" before Buffer should close
assertTrue(buffers.isEmpty());
sink.next("TEST 2");
assertTrue(buffers.isEmpty());
Thread.sleep(STEP_MILLIS * 2); // One "step" after Buffer should close
assertEquals(1, buffers.size());
assertEquals(Arrays.asList("TEST 1", "TEST 2"), buffers.get(0));
}
public Flux<List<String>> buffer(Flux<String> publisher) {
return publisher
.groupBy(Object::getClass)
.map(groupFlux -> groupFlux.take(BUFFER_DURATION).collectList())
.flatMap(Function.identity());
}
該測試工作方式寫的;問題出現時,我嘗試了「flatMap」結合起來,在「改造」操作一個操作:
// Buffering
fluxOfFlux.flatMap(flux -> flux.transform(this::buffer))
.subscribe(buffers::add);
然後測試失敗的投入到隊列中的每個項目會立即發出作爲一個單獨的「緩衝」 。
這是不受歡迎的,因爲在緩衝之前必須使用flatMap消除了並行處理內部通量的能力。
這是用Reactor編寫的,但應該使用Observable和類似名稱的方法與RxJava進行1對1映射。
有人想過我可能做錯了什麼?