2015-12-30 37 views
3

我已經創建了一個固定的線程池來處理每300毫秒發送一個事件,並假定該進程需要1000毫秒。假設多線程將工作,但只有一個線程被重用。RxJava - 爲什麼執行者只使用一個線程

如果我設置的sleepTime小於300毫秒,處理線程改變,但是沒用。

問題:我能做些什麼來使它併發?爲什麼程序重用線程?

預先感謝您

public static void main(String[] args) throws InterruptedException { 
    long sleepTime = 1000; 
    ExecutorService e = Executors.newFixedThreadPool(3); 

    Observable.interval(300, TimeUnit.MILLISECONDS) 
    .subscribeOn(Schedulers.computation()) 
    .flatMap(new Func1<Long, Observable<Long>>() { 
     @Override 
     public Observable<Long> call(Long pT) { 
      return Observable.just(pT).subscribeOn(Schedulers.from(e)); 
     } 
    }) 
    .doOnNext(new Action1<Long>() { 

     @Override 
     public void call(Long pT) { 
      try { 
       Thread.sleep(sleepTime); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 

     } 
    }) 
    .subscribe(new Action1<Long>() { 

     @Override 
     public void call(Long pT) { 
      System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName()); 

     } 
    }); 


    Thread.sleep(50000); 
    e.shutdownNow(); 

} 

日誌

i am 0in thread:pool-1-thread-1 
i am 1in thread:pool-1-thread-1 
i am 2in thread:pool-1-thread-1 
i am 3in thread:pool-1-thread-1 
i am 4in thread:pool-1-thread-1 
i am 5in thread:pool-1-thread-1 
i am 6in thread:pool-1-thread-1 
i am 7in thread:pool-1-thread-1 
i am 8in thread:pool-1-thread-1 
i am 9in thread:pool-1-thread-1 
i am 10in thread:pool-1-thread-1 
i am 11in thread:pool-1-thread-1 
+0

請注意:您可以使用jvisualvm來更加可靠地計算出進度計劃和使用哪些線程:http://docs.oracle.com/javase/6/docs/technotes/tools/ share/jvisualvm.html –

+0

@ReutSharabani在eclipse Debug視圖中,我可以看到生成的線程,但程序只重用一個線程。 – Rockman12352

回答

0

據我瞭解在你的代碼,生產者在比用戶更快的速度生產。但是Observable<Long> interval(long interval, TimeUnit unit)實際上不支持Backpressure。文檔指出

該操作員不支持背壓,因爲它使用時間。如果下游的 需要較慢,則應該減慢計時器或使用類似於{@link #onBackpressureDrop}的東西 。

如果你的處理真的比生產速度較慢,你可以在你的用戶代碼做的就是這樣的事情

.subscribe(new Action1<Long>() { 

    @Override 
    public void call(Long pT) { 
     e.submit(new Runnable() { 
      System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName()); 

     } 
    } 
}); 
+0

當然,我可以像你說的那樣把任務提交給不同的線程。但我想用Scheduler自然地做到這一點。 – Rockman12352

+0

@ Rockman12352我同意,但是據我所知,Observable將執行整個執行過程(從生產者到訂閱者)以單線程發射。每個生產者的「長」數據的含義,它將在單線程中調用所有訂閱者。我可能在這裏是錯的,但這就是我迄今得到的結果 – Wins

0

相反

.subscribeOn(Schedulers.computation()) 

嘗試

.observeOn(Schedulers.computation()) 

這個我以前玩過機智的例子h與Rx的併發工作相當不錯,例如

public class ObservableZip { 

private Scheduler scheduler; 
private Scheduler scheduler1; 
private Scheduler scheduler2; 

@Test 
public void testAsyncZip() { 
    scheduler = Schedulers.newThread(); 
    scheduler1 = Schedulers.newThread(); 
    scheduler2 = Schedulers.newThread(); 
    long start = System.currentTimeMillis(); 
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2) 
                         .concat(s3)) 
       .subscribe(result -> showResult("Async:", start, result)); 
} 




public void showResult(String transactionType, long start, String result) { 
    System.out.println(result + " " + 
           transactionType + String.valueOf(System.currentTimeMillis() - start)); 
} 


public Observable<String> obAsyncString() { 
    return Observable.just("") 
        .observeOn(scheduler) 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> "Hello"); 
} 

public Observable<String> obAsyncString1() { 
    return Observable.just("") 
        .observeOn(scheduler1) 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> " World"); 
} 

public Observable<String> obAsyncString2() { 
    return Observable.just("") 
        .observeOn(scheduler2) 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> "!"); 
    } 

} 
+0

它不起作用。在你的例子中,3個observable來自不同的線程,所以它自然是多線程的。但在我的情況下,我想把它分成一個池。 – Rockman12352

+0

但是每次調用Schedulers.computation()都不會給你一個新的線程? – paul

+0

是的,我會嘗試另一臺電腦 – Rockman12352

0

我在GitHub上找到了答案!

內部observable確實發射在多線程上,但接下來做的不是。如果我想讓它平行,我應該在內部可觀察的情況下做。

+0

你有鏈接或例子嗎? – Christoph

相關問題