2017-08-20 75 views

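I wrote a merge sort to test the performance of asynchronous computation with Scala's Future[T]. My CPU has 4 cores, so I expected the asynchronous version to be roughly 4× faster than the synchronous one, since it uses the full CPU capacity (and because the subtasks are all the same size, stall time should be small). However, the results show that the async merge sort is hardly any faster than the plain merge sort. Is the overhead of Scala's Future[T] really that heavy?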

Did I write the concurrent code incorrectly, or is this just the overhead of Future[T]? Can anyone help explain?

package kai.concurrent 

import scala.concurrent.duration.Duration 
import scala.concurrent.{Await, Future} 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.util.Random 

object MergeSort {
  // Below this size the async version falls back to the sequential sort.
  lazy val regressThreadhold = 10000

  // Merges two sorted sequences; acc collects the result in reverse order.
  def mergeSortedList[T](a: Seq[T], b: Seq[T])(implicit ord: Ordering[T]): Seq[T] = {
    def loop(a: Seq[T], b: Seq[T], acc: Seq[T]): Seq[T] = {
      if (a.isEmpty && b.isEmpty) acc
      else if (a.isEmpty) b.reverse ++: acc
      else if (b.isEmpty) a.reverse ++: acc
      else if (ord.lt(a.head, b.head)) loop(a.tail, b, a.head +: acc)
      else loop(a, b.tail, b.head +: acc)
    }

    loop(a, b, Seq()).reverse
  }

  // Each half is sorted in its own Future; the halves are merged once both complete.
  def mergeSortAsync0[T](x: Seq[T])(implicit ord: Ordering[T]): Future[Seq[T]] =
    if (x.size <= regressThreadhold) Future(mergeSort(x)) else {
      val (left, right) = x.splitAt(x.size/2)
      val Seq(leftSorted, rightSorted) = Seq(left, right).map(seq => Future(mergeSortAsync0(seq)).flatten)
      leftSorted.zip(rightSorted).map(pair => mergeSortedList(pair._1, pair._2))
    }

  def mergeSortAsync[T](x: Seq[T])(implicit ord: Ordering[T]): Seq[T] =
    Await.result(mergeSortAsync0(x), Duration.Inf)

  def mergeSort[T](x: Seq[T])(implicit ord: Ordering[T]): Seq[T] =
    if (x.size <= 1) x else {
      val (left, right) = x.splitAt(x.size/2)
      val (leftSorted, rightSorted) = (mergeSort(left), mergeSort(right))
      mergeSortedList(leftSorted, rightSorted)
    }
}

object MergeSortTest extends App {

  // The asker's own timing helper (not shown); it presumably adds the `withWallTimePrinted` syntax used below.
  import kai.util.ProfileUtil.TimeResult

  val seq: Vector[Double] = (1 to 1000000).map(i => Random.nextDouble()).toVector
  val seqMergeSortAsync = MergeSort.mergeSortAsync(seq) withWallTimePrinted "mergeSortAsync"
  val seqMergeSort = MergeSort.mergeSort(seq) withWallTimePrinted "mergeSort"
  val seqSort = seq.sorted withWallTimePrinted "sorted"
  println(seqSort == seqMergeSort && seqMergeSort == seqMergeSortAsync)
}

Output:

mergeSortAsync elapsed time: 3186 ms 

mergeSort elapsed time: 3300 ms 

sorted elapsed time: 581 ms 

true 
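For context on the 4× expectation, here is a back-of-the-envelope bound under an idealized cost model. The model is an assumption of this sketch, not something the thread measures: merging m elements costs m units, sequentially sorting a leaf of s elements costs s·log2(s) units, and scheduling, allocation and GC cost nothing. The sizes are rounded to powers of two (2^20 ≈ 1M elements, leaves of 2^13 = 8192, just under the 10000 threshold), and `SpeedupBound`/`idealSpeedup` are hypothetical names.

```scala
object SpeedupBound {
  // Idealized cost model (an assumption, not a measurement): merging m elements
  // costs m units, sequentially sorting a leaf of s elements costs s * log2(s)
  // units, and scheduling, allocation and GC cost nothing. n = 2^logN elements
  // are split in halves down to leaves of 2^logLeaf elements.
  def idealSpeedup(logN: Int, logLeaf: Int, cores: Int): Double = {
    val n = 1L << logN
    val leaf = 1L << logLeaf
    // Sequential merge sort: logN levels, each touching all n elements.
    val sequential = n * logN
    // Merge level k (k = 0 is the final merge) has 2^k merges of n / 2^k elements;
    // on `cores` workers they need ceil(2^k / cores) rounds.
    val mergeTime = (0 until (logN - logLeaf)).map { k =>
      val tasks = 1L << k
      val rounds = (tasks + cores - 1) / cores
      rounds * (n >> k)
    }.sum
    // The n / leaf leaves are sorted sequentially, `cores` at a time.
    val leaves = n / leaf
    val leafTime = (leaves + cores - 1) / cores * leaf * logLeaf
    sequential.toDouble / (mergeTime + leafTime)
  }

  def main(args: Array[String]): Unit = {
    // 2^20 ≈ 1M elements, leaves of 2^13 = 8192 (below the 10000 threshold), 4 cores
    println(f"ideal speedup: ${idealSpeedup(20, 13, 4)}%.2f")
  }
}
```

Under these assumptions the bound works out to 20/6 ≈ 3.3×, not 4×, because the final merge runs on a single core and the level below it on only two. The measured 3300 ms vs 3186 ms (≈1.04×) is far below even that, so costs the model leaves out, such as allocating immutable collections, GC and JIT warm-up, are likely to matter too.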

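Did you average the timings over multiple calls? The JVM may not have warmed up. Also, make sure the code actually hits all the cores, e.g. by watching Activity Monitor on Mac OS X.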
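Short of a full JMH setup, the comment's two suggestions can be sketched as a hand-rolled helper. `averageMillis` is hypothetical, not part of the asker's `kai.util.ProfileUtil`: it discards a few warm-up runs, averages the rest, and the program prints how many processors the JVM reports. This still ignores GC pauses, dead-code elimination and run-to-run variance, which is why a harness such as JMH is preferable.

```scala
import scala.util.Random

object NaiveTiming {
  // Hypothetical helper (not the asker's `withWallTimePrinted`): runs `body`
  // `warmup` times untimed so the JIT can compile the hot paths, then returns
  // the mean wall-clock time in milliseconds over `runs` timed executions.
  def averageMillis[A](warmup: Int, runs: Int)(body: => A): Double = {
    require(warmup >= 0 && runs > 0, "warmup must be >= 0 and runs > 0")
    for (_ <- 1 to warmup) body
    val start = System.nanoTime()
    for (_ <- 1 to runs) body
    (System.nanoTime() - start) / 1e6 / runs
  }

  def main(args: Array[String]): Unit = {
    // The global ExecutionContext sizes its pool from this value by default.
    println(s"available processors: ${Runtime.getRuntime.availableProcessors}")
    val data = Vector.fill(1000000)(Random.nextDouble())
    val ms = averageMillis(warmup = 5, runs = 10)(data.sorted)
    println(f"sorted: $ms%.1f ms per run")
  }
}
```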

Answer


I reproduced your test using sbt-jmh and ran it through JMH. I used the predefined scala.concurrent.ExecutionContext.Implicits.global as the underlying execution context for the test.
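One experiment the thread does not run: since the global ExecutionContext sizes itself to the machine by default, one way to see how the async sort scales with thread count is to run the same algorithm on explicit fixed-size pools of 1, 2 and 4 threads. This is a sketch under stated assumptions, not the answer's benchmark: to keep it self-contained, it specializes the question's code to `List[Double]`, and `PoolScaling`, `merge` and `timeWithThreads` are hypothetical names. The timings are rough wall-clock numbers, not JMH results.

```scala
import java.util.concurrent.Executors
import scala.annotation.tailrec
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.util.Random

object PoolScaling {
  val threshold = 10000 // same cut-off as regressThreadhold in the question

  // Merges two ascending lists; acc holds the merged prefix in reverse order.
  def merge(a: List[Double], b: List[Double]): List[Double] = {
    @tailrec
    def loop(a: List[Double], b: List[Double], acc: List[Double]): List[Double] =
      (a, b) match {
        case (Nil, _) => acc reverse_::: b
        case (_, Nil) => acc reverse_::: a
        case (x :: xs, y :: ys) =>
          if (x < y) loop(xs, b, x :: acc) else loop(a, ys, y :: acc)
      }
    loop(a, b, Nil)
  }

  def mergeSort(x: List[Double]): List[Double] =
    if (x.lengthCompare(1) <= 0) x
    else {
      val (l, r) = x.splitAt(x.length / 2)
      merge(mergeSort(l), mergeSort(r))
    }

  // Mirrors the question's structure: each half is sorted in its own Future and the
  // results are merged once both are ready. Nothing inside the pool blocks, so even
  // a one-thread pool makes progress.
  def mergeSortAsync(x: List[Double])(implicit ec: ExecutionContext): Future[List[Double]] =
    if (x.length <= threshold) Future(mergeSort(x))
    else {
      val (l, r) = x.splitAt(x.length / 2)
      val lf = Future(mergeSortAsync(l)).flatten
      val rf = Future(mergeSortAsync(r)).flatten
      lf.zip(rf).map { case (a, b) => merge(a, b) }
    }

  // Sorts `data` on a fixed pool of `threads` workers and returns the wall time in ms.
  def timeWithThreads(threads: Int, data: List[Double]): Long = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val start = System.nanoTime()
      Await.result(mergeSortAsync(data), Duration.Inf)
      (System.nanoTime() - start) / 1000000
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val data = List.fill(1000000)(Random.nextDouble())
    // Several rounds, so later rounds run on JIT-compiled code.
    for (round <- 1 to 3; threads <- Seq(1, 2, 4))
      println(s"round $round, $threads thread(s): ${timeWithThreads(threads, data)} ms")
  }
}
```

If the 4-thread timings are not clearly lower than the 1-thread timings once warmed up, the bottleneck is probably not the number of cores in use.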

Results:

[info] Benchmark                          Mode  Cnt  Score   Error  Units
[info] MergeSortTest.benchMergeSortAsync  avgt   25  1.534 ± 0.212   s/op
[info] MergeSortTest.benchMergeSortSync   avgt   25  2.325 ± 0.437   s/op
[info] MergeSortTest.benchScalaSort       avgt   25  0.382 ± 0.006   s/op

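As you can see, the parallel version runs about 1.5× faster than the sequential one, while Scala's built-in sort is about 6× faster than the sequential merge sort.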

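Keep in mind that there are many factors to consider when writing micro-benchmarks like this one. It's usually best to let JMH deal with the subtleties that the JVM runtime introduces.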

plugins.sbt:

addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.27") 

build.sbt:

enablePlugins(JmhPlugin) 

Test code:

import java.util.concurrent.TimeUnit 

import org.openjdk.jmh.annotations._ 

import scala.concurrent.duration.Duration 
import scala.concurrent.{Await, Future} 
import scala.util.Random 
import scala.concurrent.ExecutionContext.Implicits.global 

/** 
    * Created by Yuval.Itzchakov on 21/08/2017. 
    */ 
@State(Scope.Thread) 
@Warmup(iterations = 3, time = 1) 
@Measurement(iterations = 5, timeUnit = TimeUnit.MILLISECONDS) 
@BenchmarkMode(Array(Mode.AverageTime)) 
@Fork(5) 
class MergeSortTest { 

  var seq: Seq[Double] = _

  @Setup
  def setup(): Unit = {
    seq = (1 to 1000000).map(i => Random.nextDouble()).toVector
  }

  lazy val regressThreadhold = 10000

  def mergeSortedList[T](a: Seq[T], b: Seq[T])(implicit ord: Ordering[T]): Seq[T] = {
    def loop(a: Seq[T], b: Seq[T], acc: Seq[T]): Seq[T] = {
      if (a.isEmpty && b.isEmpty) acc
      else if (a.isEmpty) b.reverse ++: acc
      else if (b.isEmpty) a.reverse ++: acc
      else if (ord.lt(a.head, b.head)) loop(a.tail, b, a.head +: acc)
      else loop(a, b.tail, b.head +: acc)
    }

    loop(a, b, Seq()).reverse
  }

  def mergeSortAsync0[T](x: Seq[T])(implicit ord: Ordering[T]): Future[Seq[T]] =
    if (x.size <= regressThreadhold) Future(mergeSort(x)) else {
      val (left, right) = x.splitAt(x.size/2)
      val Seq(leftSorted, rightSorted) = Seq(left, right).map(seq => Future(mergeSortAsync0(seq)).flatten)
      leftSorted.zip(rightSorted).map(pair => mergeSortedList(pair._1, pair._2))
    }

  def mergeSortAsync[T](x: Seq[T])(implicit ord: Ordering[T]): Seq[T] =
    Await.result(mergeSortAsync0(x), Duration.Inf)

  def mergeSort[T](x: Seq[T])(implicit ord: Ordering[T]): Seq[T] =
    if (x.size <= 1) x else {
      val (left, right) = x.splitAt(x.size/2)
      val (leftSorted, rightSorted) = (mergeSort(left), mergeSort(right))
      mergeSortedList(leftSorted, rightSorted)
    }

  @Benchmark
  def benchMergeSortSync(): Unit = {
    mergeSort(seq)
  }

  @Benchmark
  def benchMergeSortAsync(): Unit = {
    mergeSortAsync(seq)
  }

  @Benchmark
  def benchScalaSort(): Unit = {
    seq.sorted
  }
}

Thank you very much!


Also, async isn't about performance, it's about scalability :)
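The last comment's point can be illustrated with a small sketch. The `delayed` helper, the 10,000 count and the 100 ms delay are illustrative assumptions, not from the thread. Here, 10,000 pending operations are represented as Futures completed by a single scheduler thread, instead of 10,000 threads each blocked in a sleep. The gain is in how many concurrent operations a fixed set of threads can serve, not in making a CPU-bound sort faster.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ScalabilitySketch {
  // A single daemon thread drives every timer below, so thousands of pending
  // operations do not each hold a blocked thread.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "delay-scheduler")
      t.setDaemon(true)
      t
    }
  })

  // Hypothetical helper: a Future that completes with `value` after `delayMs` ms,
  // without blocking any thread while it waits.
  def delayed[A](value: A, delayMs: Long): Future[A] = {
    val p = Promise[A]()
    scheduler.schedule(new Runnable { def run(): Unit = p.success(value) }, delayMs, TimeUnit.MILLISECONDS)
    p.future
  }

  def main(args: Array[String]): Unit = {
    // 10,000 concurrent 100 ms "waits", composed without blocking until the final Await.
    val all = Future.sequence((1 to 10000).map(i => delayed(i, 100L)))
    val total = Await.result(all.map(_.sum), 10.seconds)
    println(total) // 50005000
  }
}
```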