2016-09-26 28 views
0

具有以下問題:因爲有一個的partitionid財產(0-10爲例)的事件列表,我想進入的事件根據paritionId被分割,使具有相同partitionId的事件將按照它們收到的順序進行處理。 或多或少均勻分佈,這將導致10個事件(對於每個分區)並行處理。分裂活動,爲了處理

除了創造10單線程調度程序和發送該事件的調度權,有沒有辦法來完成上述利用工程反應堆?

感謝。

回答

0

下面

  • 分裂源流到分區的代碼,
  • 每個分區創建ParallelFlux,一個 「軌道」,
  • 日程表 「軌道」 成單獨的線程,
  • 收集結果

每個分區擔保其值在原順序處理具有專用線程。

@Test 
public void partitioning() throws InterruptedException { 
    final int N = 10; 
    Flux<Integer> source = Flux.range(1, 10000).share(); 
    // partition source into publishers 
    Publisher<Integer>[] publishers = new Publisher[N]; 
    for (int i = 0; i < N; i++) { 
     final int idx = i; 
     publishers[idx] = source.filter(v -> v % N == idx); 
    } 
    // create ParallelFlux each 'rail' containing single partition 
    ParallelFlux.from(publishers) 
      // schedule partitions into different threads 
      .runOn(Schedulers.newParallel("proc", N)) 
      // process each partition in its own thread, i.e. in order 
      .map(it -> { 
       String threadName = Thread.currentThread().getName(); 
       Assert.assertEquals("proc-" + (it % 10 + 1), threadName); 
       return it; 
      }) 
      // collect results on single 'rail' 
      .sequential() 
      // and on single thread called 'subscriber-1' 
      .publishOn(Schedulers.newSingle("subscriber")) 
      .subscribe(it -> { 
       String threadName = Thread.currentThread().getName(); 
       Assert.assertEquals("subscriber-1", threadName); 
      }); 
    Thread.sleep(1000); 
} 
+0

如果您添加一些代碼和使用的概念的解釋,您的答案會更有用。例如,爲什麼你在發佈者上叫'share',爲什麼你要調用'sequential'和'publishOn'? –