2014-04-03 32 views
0

有人可以展示如何通過Reactor框架執行map/reduce操作的代碼示例嗎?我想說我有Collection<Map>。我想:你如何用Reactor框架執行map reduce操作?

  1. 變換每個Map實例Foo型併發的對象(每個實例是完全獨立的另一個 - 也沒有必要給每個串行/反覆轉換)。

  2. 當它們全部被轉換時,我想調用一個方法onReduce(Collection<Foo> foos) - 該參數包含所有生成的Foo實例。

回答

1

在我看來,你不需要reduce的。該collectconsume是你:

@Test 
public void testCollect() { 
    Stream<String> stream = Streams.defer(Arrays.asList("1", "2", "3", "4", "5")).get(); 
    stream.map(Integer::parseInt) 
      .collect() 
      .consume(integers -> assertThat(integers, Matchers.contains(1, 2, 3, 4, 5))); 
} 

該樣本(Java 8)演示瞭如何發送List<String>到反應堆的Stream,將每個itemStringcollect他們到List<Integer>process對結果List

UPDATE

注意collect(5)不需要:Stream從遞延Collection適用batchSize。 最近推出了.collect(int batchSize)

+0

謝謝,我會給這個鏡頭 - 我不知道'collect'語義。 –

+0

只是試圖有幫助:[wiki頁面]上的一些代碼示例(https://github.com/reactor/reactor/wiki/Streams)將非常感激,並會消除我需要問這個問題。 –

+0

我很困惑:你將'collect'方法顯示爲一個參數('5'),但是[Stream API](http://reactor.github.io/docs/api/reactor/core/composable/) Stream.html)不會將'collect'顯示爲接受參數。想法/ –

相關問題