2017-06-15 34 views
0

不會不工作的方法,我試圖整合阻擋消費者在反應堆鋁SR1一個通量用戶。我想使用並行的調度程序來同時執行阻塞操作。通量的publishOn預期

我已經實現一個主類來描述我的意圖:

package etienne.peiniau; 

import org.reactivestreams.Subscriber; 
import org.reactivestreams.Subscription; 
import reactor.core.publisher.Flux; 
import reactor.core.scheduler.Schedulers; 
import reactor.util.function.Tuple2; 

public class Main { 

    public static void main(String[] args) throws InterruptedException { 
     Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) 
       .elapsed() 
       .publishOn(Schedulers.parallel()) 
       .subscribe(new Subscriber<Tuple2<Long, Integer>>() { 
        @Override 
        public void onSubscribe(Subscription subscription) { 
         System.out.println("[" + Thread.currentThread().getName() + "] Subscription"); 
         subscription.request(Long.MAX_VALUE); 
        } 

        @Override 
        public void onNext(Tuple2<Long, Integer> t2) { 
         System.out.println("[" + Thread.currentThread().getName() + "] " + t2); 
         try { 
          Thread.sleep(1000); // long operation 
         } catch (InterruptedException e) { 
          e.printStackTrace(); 
         } 
        } 

        @Override 
        public void onError(Throwable throwable) { 
         System.err.println("[" + Thread.currentThread().getName() + "] Error: " + throwable.getMessage()); 
        } 

        @Override 
        public void onComplete() { 
         System.out.println("[" + Thread.currentThread().getName() + "] Complete"); 
        } 
       }); 
     // Waiting for the program to complete 
     System.out.println("[" + Thread.currentThread().getName() + "] Main"); 
     Thread.sleep(100000); 
    } 

} 

這段代碼的輸出如下:

[main] Subscription 
[main] Main 
[parallel-1] [3,1] 
[parallel-1] [1000,2] 
[parallel-1] [1001,3] 
[parallel-1] [1000,4] 
[parallel-1] [1000,5] 
[parallel-1] [1000,6] 
[parallel-1] [1001,7] 
[parallel-1] [1000,8] 
[parallel-1] [1000,9] 
[parallel-1] [1000,10] 
[parallel-1] [1000,11] 
[parallel-1] [1001,12] 
[parallel-1] [1000,13] 
[parallel-1] [1000,14] 
[parallel-1] [1000,15] 
[parallel-1] [1000,16] 
[parallel-1] [1001,17] 
[parallel-1] [1000,18] 
[parallel-1] [1000,19] 
[parallel-1] [1000,20] 
[parallel-1] Complete 

我的問題是,長時間操作總是執行上線程並行-1和每1秒。

我試着手動增加並行度或使用彈性調度程序,但結果是一樣的。

我在想,publishOn方法用於該用途的情況下,被專門設計的。你能否告訴我我是否誤解了某些東西?

回答

2

其實它按預期工作,你可以看到,凡在並行處理所有的值 - 經過時間幾乎是相同的,但你總是在同一個線程內收到的元素,這樣,每次等待1秒的時間。

我想,在簡單Flux並行並不意味着更多的線程,這意味着在並行做的工作。如果你喜歡例如運行代碼:

Flux.fromIterable(IntStream.range(0, 20).boxed().collect(Collectors.toList())) 
      .map(i -> { 
       System.out.println("map [" + Thread.currentThread().getName() + "] " + i); 
       return i; 
      }) 
      .elapsed() 
      .publishOn(Schedulers.single()) 
      .subscribeOn(Schedulers.single()) 
      .subscribe(t2 -> { 
       System.out.println("subscribe [" + Thread.currentThread().getName() + "] " + t2); 
      }); 

你會看到的結果:

map [single-1] 0 
map [single-1] 1 
... 
subscribe [single-1] [4,0] 
subscribe [single-1] [0,1] 
... 

而且你可以看到,首先它做map所有元素,然後consume他們。但是,如果你改變例如.publishOn(Schedulers.parallel())您將看到:

map [single-1] 3 
subscribe [parallel-1] [5,0] 
map [single-1] 4 
subscribe [parallel-1] [0,1] 
map [single-1] 5 
... 

所以這一次在做並行線程兩種操作。我不確定我是否正確理解一切。

有並行執行特定ParallelFlux。在下面的例子一切都將在不同的線程來完成

Flux.fromIterable(IntStream.range(0, 20).boxed().collect(Collectors.toList())) 
     .elapsed() 
     .parallel() 
     .runOn(Schedulers.parallel()) 
     .subscribe(t2 -> { 
      System.out.println("[" + Thread.currentThread().getName() + "] " + t2); 
      try { 
       Thread.sleep(1000); // long operation 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     }, throwable -> { 
      System.err.println("[" + Thread.currentThread().getName() + "] Error: " + throwable.getMessage()); 
     },() -> { 
      System.out.println("[" + Thread.currentThread().getName() + "] Complete"); 
     }, subscription -> { 
      System.out.println("[" + Thread.currentThread().getName() + "] Subscription"); 
      subscription.request(Long.MAX_VALUE); 
     }); 

結果是這樣的:

[parallel-1] [8,0] 
[parallel-2] [0,1] 
[parallel-3] [0,2] 
[parallel-4] [0,3] 
[parallel-1] [0,4] 
... 

所以它使用幾個線程來處理結果。從我的角度來看,它確實是平行的。

還要注意的是,如果你使用的方法.subscribe(Subscriber<? super T> s)所有結果將在連續的方式被消耗掉,並使用它們並行你應該使用:

public void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> 
      onError, Runnable onComplete, Consumer<? super Subscription> onSubscribe) 

或任何其他重載方法與Consumer<? super T> onNext,...參數

+0

@etiennepeiniau我猜在簡單的Flux中,它不是「並行==更多的線程」。我將編輯我的答案並添加一個示例。 –