0
具有以下問題:因爲有一個的partitionid財產(0-10爲例)的事件列表,我想進入的事件根據paritionId被分割,使具有相同partitionId的事件將按照它們收到的順序進行處理。 或多或少均勻分佈,這將導致10個事件(對於每個分區)並行處理。分裂活動,爲了處理
除了創造10單線程調度程序和發送該事件的調度權,有沒有辦法來完成上述利用工程反應堆?
感謝。
具有以下問題:因爲有一個的partitionid財產(0-10爲例)的事件列表,我想進入的事件根據paritionId被分割,使具有相同partitionId的事件將按照它們收到的順序進行處理。 或多或少均勻分佈,這將導致10個事件(對於每個分區)並行處理。分裂活動,爲了處理
除了創造10單線程調度程序和發送該事件的調度權,有沒有辦法來完成上述利用工程反應堆?
感謝。
下面
每個分區擔保其值在原順序處理具有專用線程。
@Test
public void partitioning() throws InterruptedException {
final int N = 10;
Flux<Integer> source = Flux.range(1, 10000).share();
// partition source into publishers
Publisher<Integer>[] publishers = new Publisher[N];
for (int i = 0; i < N; i++) {
final int idx = i;
publishers[idx] = source.filter(v -> v % N == idx);
}
// create ParallelFlux each 'rail' containing single partition
ParallelFlux.from(publishers)
// schedule partitions into different threads
.runOn(Schedulers.newParallel("proc", N))
// process each partition in its own thread, i.e. in order
.map(it -> {
String threadName = Thread.currentThread().getName();
Assert.assertEquals("proc-" + (it % 10 + 1), threadName);
return it;
})
// collect results on single 'rail'
.sequential()
// and on single thread called 'subscriber-1'
.publishOn(Schedulers.newSingle("subscriber"))
.subscribe(it -> {
String threadName = Thread.currentThread().getName();
Assert.assertEquals("subscriber-1", threadName);
});
Thread.sleep(1000);
}
如果您添加一些代碼和使用的概念的解釋,您的答案會更有用。例如,爲什麼你在發佈者上叫'share',爲什麼你要調用'sequential'和'publishOn'? –