2017-06-19 72 views
1

我正試圖在Flux的Fluxes上實現Reactor中的緩衝過程。內部通量的每個發射都按某種屬性分組,並在緩衝區持續時間到期後發射。如何緩衝GroupedFlux的Flux?

以下測試(簡化爲暴露問題)示出所期望的行爲:

private static final long STEP_MILLIS = 50; 

private static final Duration BUFFER_DURATION = Duration.ofMillis(STEP_MILLIS * 4); 

@Test 
public void testBuffer() throws Exception { 
    List<List<String>> buffers = new ArrayList<>(); 

    UnicastProcessor<String> queue = UnicastProcessor.create(); 
    FluxSink<String> sink = queue.sink(); 

    Flux<Flux<String>> fluxOfFlux = queue.map(Flux::just); 

    // Buffering 
    fluxOfFlux.flatMap(Function.identity()) 
     .transform(this::buffer) 
     .subscribe(buffers::add); 

    sink.next("TEST 1"); 

    Thread.sleep(BUFFER_DURATION.toMillis() - STEP_MILLIS); // One "step" before Buffer should close 

    assertTrue(buffers.isEmpty()); 

    sink.next("TEST 2"); 

    assertTrue(buffers.isEmpty()); 

    Thread.sleep(STEP_MILLIS * 2); // One "step" after Buffer should close 

    assertEquals(1, buffers.size()); 
    assertEquals(Arrays.asList("TEST 1", "TEST 2"), buffers.get(0)); 
} 

public Flux<List<String>> buffer(Flux<String> publisher) { 
    return publisher 
     .groupBy(Object::getClass) 
     .map(groupFlux -> groupFlux.take(BUFFER_DURATION).collectList()) 
     .flatMap(Function.identity()); 
} 

該測試工作方式寫的;問題出現時,我嘗試了「flatMap」結合起來,在「改造」操作一個操作:

// Buffering 
    fluxOfFlux.flatMap(flux -> flux.transform(this::buffer)) 
     .subscribe(buffers::add); 

然後測試失敗的投入到隊列中的每個項目會立即發出作爲一個單獨的「緩衝」 。

這是不受歡迎的,因爲在緩衝之前必須使用flatMap消除了並行處理內部通量的能力。

這是用Reactor編寫的,但應該使用Observable和類似名稱的方法與RxJava進行1對1映射。

有人想過我可能做錯了什麼?

回答

1

我想出了組合方法的問題:jist是我想要完成的事情對於寫入的示例數據是不可能的。

使用濃縮的方法,實際上最終得到緩衝的是每個內部Flux本身。由於這些Flux中的每一個在發射單體之後完成,這會導致封閉緩衝區完成並被髮射。這導致了我的應用程序發生了什麼問題。

我不得不通過改變如何我到內通量產生的內通量更上游到第一concatMap然後組來解決這個缺陷在我的應用程序的設計。

0

我仍然從反應堆開始,但是你的代碼似乎並不是無副作用的。保持狀態(在您的情況下使用列表<>緩衝區)可能會導致未來的令人討厭的錯誤。

可以實現只用immutables一樣的,你可以看到波紋管

List<String> tests = Arrays.asList("TEST 1", "TEST 2", "TEST 3", "INTEGRATION 1", "INTEGRATION 2") 

@Test 
public void test() { 
    Flux<List<String>> bufferedFlux = Flux.fromIterable(tests).buffer(Duration.ofSeconds(5)); 

    StepVerifier.withVirtualTime(() -> bufferedFlux) 
      .expectNext(tests) 
      .verifyComplete(); 
} 

@Test 
public void group() { 
    Flux<GroupedFlux<String, String>> flux = Flux 
      .fromIterable(tests) 
      .groupBy(test -> test.replaceAll("[^a-zA-Z]", "")); 

    StepVerifier.create(flux) 
      .expectNextMatches(groupedFlux -> groupedFlux.key().equals("TEST")) 
      .expectNextMatches(groupedFlux -> groupedFlux.key().equals("INTEGRATION")) 
      .verifyComplete(); 


    Flux<String> sum = flux 
      .flatMap(groupedFlux -> groupedFlux 
        .map(test -> test.replaceAll("[^\\d.]", "")) 
        .map(Integer::valueOf) 
        .reduce((t1, t2) -> t1 + t2) 
        .map(total -> groupedFlux.key() + " TOTAL: " + total) 
      ).sort(); 

    StepVerifier.create(sum) 
      .expectNext("INTEGRATION TOTAL: 3") 
      .expectNext("TEST TOTAL: 6") 
      .verifyComplete(); 
}