2015-10-01 122 views
9

正常情況下,當使用Java 8的parallelStream()時,結果是通過默認的公共fork-join池(即ForkJoinPool.commonPool())執行。Java的parallelStream()與自定義池與呼叫者工作竊取?

但是,這顯然是不可取的,但是,如果有一項工作遠離CPU限制,例如,可能會在很多時候等待IO。在這種情況下,人們會想要使用一個單獨的池,根據其他標準來確定大小(例如,多長時間的任務可能實際使用CPU)。

有沒有明顯獲取parallelStream()使用不同的池的方法,但有一種方法,如詳細here

不幸的是,該方法需要從fork-join池線程調用並行流的終端操作。這樣做的缺點是,如果目標分支連接池完全忙於現有工作,那麼整個執行過程就會等待,而完全沒有任何作用。因此該池可能成爲比單線程執行更糟的瓶頸。相比之下,當以「普通」方式使用parallelStream()時,將使用ForkJoinPool.common.externalHelpComplete()或ForkJoinPool.common.tryExternalUnpush()來讓池外的調用線程幫助處理。

有誰知道的方式來得到parallelStream()使用非默認的fork-join池有一個調用線程從的fork-join池的幫助之外在這項工作中的處理(但不是fork-join池的其餘部分)?

+3

我不明白你的_這個缺點是,如果目標叉加入池完全忙於現有的work_。你不會爲這個並行流調用創建一個新的池嗎? –

+1

更糟。當你在任務中調用'get'而不是在公共池中時,它仍然會調用'ForkJoinPool.common.tryExternalUnpush()',但是當然不會在公共池隊列中找到任務。 – Holger

+0

要回答這個問題,不,我不會爲這個調用創建一個新的線程池。相反,我會在許多類似的調用中共享另一個線程池,其中一些可能會重疊,其中一些可能比其他更長/更大的任務等。 –

回答

2

您可以在泳池上使用awaitQuiescence來幫忙。然而,你不能選擇你將要幫助的任務,它只會從池中取出下一個待處理的任務,因此,如果有更多的待處理任務,你可能會在完成你自己的任務之前結束執行。

ForkJoinPool forkJoinPool = new ForkJoinPool(1); 
// make all threads busy: 
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE)); 
// submit our task (may contain your stream operation) 
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread()); 
// help out 
while(!task.isDone()) // use zero timeout to execute one task only 
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS); 
System.out.println(Thread.currentThread()==task.get()); 

將打印true

ForkJoinPool forkJoinPool = new ForkJoinPool(1); 
// make all threads busy: 
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE)); 
// overload: 
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE)); 
// submit our task (may contain your stream operation) 
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread()); 
// help out 
while(!task.isDone()) 
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS); 
System.out.println(Thread.currentThread()==task.get()); 

將永遠掛,因爲它試圖執行第二封堵任務。不過,只要沒有無限的任務(上面的例子是極端的,只是爲了演示),它會讓啓動線程幫助處理池的待處理任務,這將提高自己的任務執行的機會。


但要注意,在叉的全部關係/ join框架和Stream API是一個實現細節反正。

+0

我已經得出結論說我可以這樣做,但正如你所指出的那樣,這很可能意味着我最終將幫助完成其他任務,而不是幫助自己完成任務。這是一種非首發。 此外,我得到fork-join是一個實現細節,但需要更好地控制parallelStream(),例如,像parallelStream(forkJoinPool)一樣簡單。 –

+1

好吧,用'parallelStream(ForkJoinPool)'這樣的方法,它不再是一個實現細節了...... – Holger

+0

不,但是沒有參數的parallelStream()會做什麼仍然是一個實現細節。這隻會給你一個選擇,在你需要的情況下進行一些控制。 –