2014-04-19 32 views
40

我正在玩Java 8的流,無法理解我得到的性能結果。Java 8的流:爲什麼並行流更慢?

我有2個核心CPU(Intel i73520M),Windows 8 x64和64位Java 8 update 5.我正在做一個串流/並行串流的簡單映射,發現並行版本有點慢。

當我運行這段代碼:

String[] array = new String[1000000]; 
Arrays.fill(array, "AbabagalamagA"); 

Stream<String> stream = Arrays.stream(array); 

long time1 = System.nanoTime(); 

List<String> list = stream.map((x) -> x.toLowerCase()).collect(Collectors.toList()); 

long time2 = System.nanoTime(); 

System.out.println((time2 - time1)/1000000f); 

...我正在某處大約600這個版本的結果,使用平行流:

String[] array = new String[1000000]; 
Arrays.fill(array, "AbabagalamagA"); 

Stream<String> stream = Arrays.stream(array).parallel(); 

long time1 = System.nanoTime(); 

List<String> list = stream.map((x) -> x.toLowerCase()).collect(Collectors.toList()); 

long time2 = System.nanoTime(); 


System.out.println((time2 - time1)/1000000f); 

...給我的東西約900.

考慮到我有2個CPU核心的事實,平行版本不應該更快嗎? 有人能給我一個提示,爲什麼並行版本更慢?

回答

94

這裏有幾個問題是平行的,因爲它是。

第一個問題是,並行解決問題始終涉及比順序執行更多的實際工作。開銷涉及在多個線程之間分割工作並加入或合併結果。像將短字符串轉換爲小寫字母這樣的問題足夠小,以至於它們有可能被平行分割開銷所淹沒。

第二個問題是基準測試Java程序非常微妙,並且很容易得出令人困惑的結果。兩個常見問題是JIT編譯和死代碼消除。短基準通常在JIT編譯之前或期間完成,因此它們不會測量峯值吞吐量,事實上它們可能會測量JIT本身。編譯發生時有些不確定,所以它可能會導致結果變化很大。

對於小的綜合性基準測試,工作負載通常會計算被丟棄的結果。 JIT編譯器非常擅長檢測並消除不產生任何結果的代碼。在這種情況下,這可能不會發生,但如果您對其他合成工作負載進行了修改,則肯定會發生。當然,如果JIT消除了基準測試工作負載,則會使基準測試無效。

我強烈建議使用一個完善的基準測試框架,如JMH而不是手動滾動你自己的一個。 JMH有助於避免常見的基準缺陷(包括這些缺陷),並且設置和運行起來非常簡單。這裏是你的基準轉換爲使用江鈴控股:

package com.stackoverflow.questions; 

import java.util.Arrays; 
import java.util.List; 
import java.util.stream.Collectors; 
import java.util.concurrent.TimeUnit; 

import org.openjdk.jmh.annotations.*; 

public class SO23170832 { 
    @State(Scope.Benchmark) 
    public static class BenchmarkState { 
     static String[] array; 
     static { 
      array = new String[1000000]; 
      Arrays.fill(array, "AbabagalamagA"); 
     } 
    } 

    @GenerateMicroBenchmark 
    @OutputTimeUnit(TimeUnit.SECONDS) 
    public List<String> sequential(BenchmarkState state) { 
     return 
      Arrays.stream(state.array) 
        .map(x -> x.toLowerCase()) 
        .collect(Collectors.toList()); 
    } 

    @GenerateMicroBenchmark 
    @OutputTimeUnit(TimeUnit.SECONDS) 
    public List<String> parallel(BenchmarkState state) { 
     return 
      Arrays.stream(state.array) 
        .parallel() 
        .map(x -> x.toLowerCase()) 
        .collect(Collectors.toList()); 
    } 
} 

我跑這個使用下面的命令:

java -jar dist/microbenchmarks.jar ".*SO23170832.*" -wi 5 -i 5 -f 1 

(該選項表明5次預熱迭代,五次基準迭代和一個分叉JVM)在其運行,JMH發出了很多冗長的消息,這些消息我都沒有。總結結果如下。

Benchmark      Mode Samples   Mean Mean error Units 
c.s.q.SO23170832.parallel  thrpt   5  4.600  5.995 ops/s 
c.s.q.SO23170832.sequential thrpt   5  1.500  1.727 ops/s 

注意,結果是每秒OPS,所以它看起來像並聯運行比順序運行速度的三倍左右。但我的機器只有兩個內核。嗯。每次運行的平均誤差實際上大於平均運行時間! WAT?可怕的事情正在發生。

這給我們帶來了第三個問題。仔細查看工作負載,我們可以看到它爲每個輸入分配一個新的String對象,並且它還將結果收集到一個列表中,這涉及大量的重新分配和複製。我猜想這會導致相當數量的垃圾收集。我們可以通過重新運行啓用了GC消息基準看到這一點:

java -verbose:gc -jar dist/microbenchmarks.jar ".*SO23170832.*" -wi 5 -i 5 -f 1 

此給出的結果一樣:

[GC (Allocation Failure) 512K->432K(130560K), 0.0024130 secs] 
[GC (Allocation Failure) 944K->520K(131072K), 0.0015740 secs] 
[GC (Allocation Failure) 1544K->777K(131072K), 0.0032490 secs] 
[GC (Allocation Failure) 1801K->1027K(132096K), 0.0023940 secs] 
# Run progress: 0.00% complete, ETA 00:00:20 
# VM invoker: /Users/src/jdk/jdk8-b132.jdk/Contents/Home/jre/bin/java 
# VM options: -verbose:gc 
# Fork: 1 of 1 
[GC (Allocation Failure) 512K->424K(130560K), 0.0015460 secs] 
[GC (Allocation Failure) 933K->552K(131072K), 0.0014050 secs] 
[GC (Allocation Failure) 1576K->850K(131072K), 0.0023050 secs] 
[GC (Allocation Failure) 3075K->1561K(132096K), 0.0045140 secs] 
[GC (Allocation Failure) 1874K->1059K(132096K), 0.0062330 secs] 
# Warmup: 5 iterations, 1 s each 
# Measurement: 5 iterations, 1 s each 
# Threads: 1 thread, will synchronize iterations 
# Benchmark mode: Throughput, ops/time 
# Benchmark: com.stackoverflow.questions.SO23170832.parallel 
# Warmup Iteration 1: [GC (Allocation Failure) 7014K->5445K(132096K), 0.0184680 secs] 
[GC (Allocation Failure) 7493K->6346K(135168K), 0.0068380 secs] 
[GC (Allocation Failure) 10442K->8663K(135168K), 0.0155600 secs] 
[GC (Allocation Failure) 12759K->11051K(139776K), 0.0148190 secs] 
[GC (Allocation Failure) 18219K->15067K(140800K), 0.0241780 secs] 
[GC (Allocation Failure) 22167K->19214K(145920K), 0.0208510 secs] 
[GC (Allocation Failure) 29454K->25065K(147456K), 0.0333080 secs] 
[GC (Allocation Failure) 35305K->30729K(153600K), 0.0376610 secs] 
[GC (Allocation Failure) 46089K->39406K(154624K), 0.0406060 secs] 
[GC (Allocation Failure) 54766K->48299K(164352K), 0.0550140 secs] 
[GC (Allocation Failure) 71851K->62725K(165376K), 0.0612780 secs] 
[GC (Allocation Failure) 86277K->74864K(184320K), 0.0649210 secs] 
[GC (Allocation Failure) 111216K->94203K(185856K), 0.0875710 secs] 
[GC (Allocation Failure) 130555K->114932K(199680K), 0.1030540 secs] 
[GC (Allocation Failure) 162548K->141952K(203264K), 0.1315720 secs] 
[Full GC (Ergonomics) 141952K->59696K(159232K), 0.5150890 secs] 
[GC (Allocation Failure) 105613K->85547K(184832K), 0.0738530 secs] 
1.183 ops/s 

注:與#開始的行都是正常的江鈴控股輸出線。其餘的都是GC消息。這只是五次熱身迭代中的第一次,這是五次基準迭代之前的事情。在其餘的迭代期間,GC消息繼續以同樣的方式繼續。我認爲可以肯定地說,衡量的業績是由GC開銷主導的,並且不應該相信所報告的結果。

此時尚不清楚該怎麼做。這純粹是一種綜合工作量。與分配和複製相比,這顯然涉及非常少的CPU時間來完成實際工作。很難說你在這裏測量的是什麼。一種方法是提出一種不同的工作負載,在某種意義上說它更「真實」。另一種方法是在基準測試期間更改堆和GC參數以避免GC。

+12

+1非常全面的答案和一個關於如何正確運行*和解釋*微基準的好教程! – assylias

8

使用多個線程處理您的數據有一些初始設置成本,例如,初始化線程池。這些成本可能超過使用這些線程的收益,特別是如果運行時已經很低。此外,如果存在爭用,例如其他線程運行,後臺進程等,並行處理的性能可以進一步降低。

這個問題對於並行處理來說並不新鮮。本文給出了一些細節的Java 8 parallel()的光線和更多的事情要考慮:http://java.dzone.com/articles/think-twice-using-java-8

14

在做基準,你應該注意的JIT編譯器,以及定時行爲可以改變,當JIT踢英寸如果我爲測試程序添加預熱階段,那麼並行版本比順序版本要快一點。以下是結果:

Warmup... 
Benchmark... 
Run 0: sequential 0.12s - parallel 0.11s 
Run 1: sequential 0.13s - parallel 0.08s 
Run 2: sequential 0.15s - parallel 0.08s 
Run 3: sequential 0.12s - parallel 0.11s 
Run 4: sequential 0.13s - parallel 0.08s 

以下是完整的源代碼,我用於此測試。

public static void main(String... args) { 
    String[] array = new String[1000000]; 
    Arrays.fill(array, "AbabagalamagA"); 
    System.out.println("Warmup..."); 
    for (int i = 0; i < 100; ++i) { 
     sequential(array); 
     parallel(array); 
    } 
    System.out.println("Benchmark..."); 
    for (int i = 0; i < 5; ++i) { 
     System.out.printf("Run %d: sequential %s - parallel %s\n", 
      i, 
      test(() -> sequential(array)), 
      test(() -> parallel(array))); 
    } 
} 
private static void sequential(String[] array) { 
    Arrays.stream(array).map(String::toLowerCase).collect(Collectors.toList()); 
} 
private static void parallel(String[] array) { 
    Arrays.stream(array).parallel().map(String::toLowerCase).collect(Collectors.toList()); 
} 
private static String test(Runnable runnable) { 
    long start = System.currentTimeMillis(); 
    runnable.run(); 
    long elapsed = System.currentTimeMillis() - start; 
    return String.format("%4.2fs", elapsed/1000.0); 
}