2017-09-05

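Why is mapping class A to class B with monix or akka-streams so slow?

I have benchmarked the mapping of a List[ClassA] to a List[ClassB] with monix and akka-streams, but I don't understand why it is so slow.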

I tried different ways of doing the mapping; here are the results measured with JMH:

[info] Benchmark                                     Mode  Cnt    Score    Error  Units
[info] MappingBenchmark.akkaLoadBalanceMap             ss   20  742,626 ±  4,853  ms/op
[info] MappingBenchmark.akkaMapAsyncFold               ss   20  480,460 ±  8,493  ms/op
[info] MappingBenchmark.akkaMapAsyncFoldAsync          ss   20  331,398 ± 10,490  ms/op
[info] MappingBenchmark.akkaMapFold                    ss   20  713,500 ±  7,394  ms/op
[info] MappingBenchmark.akkaMapFoldAsync               ss   20  313,275 ±  8,716  ms/op
[info] MappingBenchmark.map                            ss   20    0,567 ±  0,175  ms/op
[info] MappingBenchmark.monixBatchedObservables        ss   20  259,736 ±  5,939  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeft          ss   20  456,310 ±  5,225  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeftAsync     ss   20  795,345 ±  5,443  ms/op
[info] MappingBenchmark.monixMapFoldLeft               ss   20  247,172 ±  5,342  ms/op
[info] MappingBenchmark.monixMapFoldLeftAsync          ss   20  478,840 ± 25,249  ms/op
[info] MappingBenchmark.monixTaskGather                ss   20    6,707 ±  2,176  ms/op
[info] MappingBenchmark.parMap                         ss   20    1,257 ±  0,831  ms/op

Here is the code:

package benches 

import java.util.concurrent.TimeUnit 

import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape} 
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source} 
import org.openjdk.jmh.annotations._ 

import scala.concurrent.{Await, Future} 
import scala.concurrent.duration.Duration 

@OutputTimeUnit(TimeUnit.MILLISECONDS) 
@BenchmarkMode(Array(Mode.SingleShotTime)) 
@Warmup(iterations = 20) 
@Measurement(iterations = 20) 
@Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g")) 
@Threads(1) 
class MappingBenchmark { 
    import monix.eval._ 
    import monix.reactive._ 
    import monix.execution.Scheduler.Implicits.global 

    def list: List[ClassA] = (1 to 10000).map(ClassA).toList 
    // val l = (1 to 135368).map(Offre).toList 

    // ##### SCALA ##### // 

    @Benchmark 
    def map: List[ClassB] = list.map(o => ClassB(o, o)) 

    @Benchmark 
    def parMap: List[ClassB] = list.par.map(o => ClassB(o, o)).toList 

    // ##### MONIX ##### // 

    @Benchmark 
    def monixTaskGather: List[ClassB] = { 
    val task: Task[List[ClassB]] = Task.gatherUnordered(list.map(o => Task(ClassB(o,o)))) 
    Await.result(task.runAsync, Duration.Inf) 
    } 

    @Benchmark 
    def monixBatchedObservables: List[ClassB] = { 
    val task: Task[List[ClassB]] = 
     Observable.fromIterable(list) 
     .bufferIntrospective(256) 
     .flatMap{items => 
      val tasks = items.map(o => Task(ClassB(o,o))) 
      val batches = tasks.sliding(10,10).map(b => Task.gatherUnordered(b)) 
      val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten) 
      Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i)) 
     }.consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _)) 
    Await.result(task.runAsync, Duration.Inf) 
    } 

    @Benchmark 
    def monixMapFoldLeft: List[ClassB] = { 
    val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _)) 
    Await.result(task.runAsync, Duration.Inf) 
    } 

    @Benchmark 
    def monixMapFoldLeftAsync: List[ClassB] = { 
    val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o))) 
    Await.result(task.runAsync, Duration.Inf) 
    } 

    @Benchmark 
    def monixMapAsyncFoldLeft: List[ClassB] = { 
    val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _)) 
    Await.result(task.runAsync, Duration.Inf) 
    } 

    @Benchmark 
    def monixMapAsyncFoldLeftAsync: List[ClassB] = { 
    val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o))) 
    Await.result(task.runAsync, Duration.Inf) 
    } 

    // ##### AKKA-STREAM ##### // 

    @Benchmark 
    def akkaMapFold: List[ClassB] = { 
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right) 
    runAkkaGraph(graph) 
    } 

    @Benchmark 
    def akkaMapFoldAsync: List[ClassB] = { 
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right) 
    runAkkaGraph(graph) 
    } 

    @Benchmark 
    def akkaMapAsyncFold: List[ClassB] = { 
    def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right) 
    runAkkaGraph(graph) 
    } 

    @Benchmark 
    def akkaMapAsyncFoldAsync: List[ClassB] = { 
    def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right) 
    runAkkaGraph(graph) 
    } 

    @Benchmark 
    def akkaLoadBalanceMap: List[ClassB] = { 
    def graph: RunnableGraph[Future[List[ClassB]]] = { 
     val sink: Sink[ClassB, Future[List[ClassB]]] = Sink.fold(List[ClassB]())(_ :+ _) 
     RunnableGraph.fromGraph[Future[List[ClassB]]](GraphDSL.create(sink) { implicit builder => 
     sink => 
      import GraphDSL.Implicits._ 
      val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4)) 
      val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4)) 
      val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o)) 
      Source(list) ~> balance 
      (1 to 4).foreach{ i => 
      balance ~> mapClassB.async ~> merge 
      } 
      merge ~> sink 
      ClosedShape 
     }) 
    } 
    runAkkaGraph(graph) 
    } 

    private def runAkkaGraph(g:RunnableGraph[Future[List[ClassB]]]): List[ClassB] = { 
    implicit val actorSystem = ActorSystem("app") 
    implicit val actorMaterializer = ActorMaterializer() 
    val eventualBs = g.run() 
    val res = Await.result(eventualBs, Duration.Inf) 
    actorSystem.terminate() 
    res 
    } 
} 

case class ClassA(a:Int) 
case class ClassB(o:ClassA, o2:ClassA) 

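The benchmark results get worse as the initial collection gets bigger.

I would like to know what my mistake is.

Thanks for sharing your knowledge!

Best regards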

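I can't run the code ATM, but your plain Scala variant is O(n) while the monix/akka variants are O(n^2), because inside the fold you append to the end of a singly linked list.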

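Thanks for the tips @OlegPyzhcov. The problem is indeed related to the fold. I was appending to a List, which is really not the right way to use it. I have switched to the prepend operator instead. I will add an answer for this.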
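
The complexity argument from the comments can be sketched with the standard library alone. This is a minimal illustration, not the benchmark itself: `In` and `Out` are hypothetical stand-ins for `ClassA` and `ClassB`, and the three helper names are made up for this example.

```scala
import scala.collection.mutable.ListBuffer

object FoldSketch {
  // Hypothetical stand-ins for the question's ClassA / ClassB.
  final case class In(a: Int)
  final case class Out(o: In, o2: In)

  val input: List[In] = (1 to 10000).map(In(_)).toList

  // O(n^2) overall: `:+` copies the whole accumulated List on every step.
  def appendFold(xs: List[In]): List[Out] =
    xs.foldLeft(List.empty[Out])((acc, x) => acc :+ Out(x, x))

  // O(n) overall: `::` is constant time, but the accumulator ends up in
  // reverse order, so a single final reverse restores the input order.
  def prependFold(xs: List[In]): List[Out] =
    xs.foldLeft(List.empty[Out])((acc, x) => Out(x, x) :: acc).reverse

  // O(n) overall: a mutable buffer appends in constant time and keeps order.
  def bufferFold(xs: List[In]): List[Out] =
    xs.foldLeft(ListBuffer.empty[Out])((buf, x) => buf += Out(x, x)).toList
}
```

All three return the same list; only the cost differs. Note that a fold which prepends without a final `reverse` hands back the elements in reverse order, which may or may not matter for the use case.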

Answers

0

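I have updated the code, and the benchmark results are really much better than before. The difference comes from the List operator. The first version was using append instead of prepend. Since List is a linked list, it has to traverse all the elements in order to append a new one. Out of laziness I wanted to use the _ placeholder syntax, but I should not have.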

package benches 

import java.util.concurrent.TimeUnit 

import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape} 
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source} 
import org.openjdk.jmh.annotations._ 

import scala.concurrent.{Await, Future} 
import scala.concurrent.duration.Duration 
import scala.collection.immutable.Seq 

@OutputTimeUnit(TimeUnit.MILLISECONDS) 
@BenchmarkMode(Array(Mode.SingleShotTime)) 
@Warmup(iterations = 20) 
@Measurement(iterations = 20) 
@Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g")) 
@Threads(1) 
class MappingBenchmark { 
    import monix.eval._ 
    import monix.reactive._ 
    import monix.execution.Scheduler.Implicits.global 

    def list: Seq[ClassA] = (1 to 10000).map(ClassA).toList 
    // val l = (1 to 135368).map(Offre).toList 

    // ##### SCALA ##### // 

    def foldClassB = (l:List[ClassB], o:ClassB) => o +: l 

    @Benchmark 
    def map: Seq[ClassB] = list.map(o => ClassB(o, o)) 

    @Benchmark 
    def parMap: Seq[ClassB] = list.par.map(o => ClassB(o, o)).toList 

    // ##### MONIX ##### // 

    @Benchmark 
    def monixTaskGather: Seq[ClassB] = { 
    val task: Task[Seq[ClassB]] = Task.gatherUnordered(list.map(o => Task(ClassB(o,o)))) 
    Await.result(task.runAsync, Duration.Inf) 
    } 

    @Benchmark 
    def monixBatchedObservables: Seq[ClassB] = { 
    val task: Task[Seq[ClassB]] = 
     Observable.fromIterable(list) 
     .bufferIntrospective(256) 
     .flatMap{items => 
      val tasks = items.map(o => Task(ClassB(o,o))) 
      val batches = tasks.sliding(10,10).map(b => Task.gatherUnordered(b)) 
      val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten) 
      Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i)) 
     }.consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB)) 
    Await.result(task.runAsync, Duration.Inf) 
    } 

    @Benchmark 
    def monixMapFoldLeft: Seq[ClassB] = { 
    val task: Task[Seq[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB)) 
    Await.result(task.runAsync, Duration.Inf) 
    } 

    @Benchmark 
    def monixMapFoldLeftAsync: Seq[ClassB] = { 
    val task: Task[Seq[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(o +: l))) 
    Await.result(task.runAsync, Duration.Inf) 
    } 

    @Benchmark 
    def monixMapAsyncFoldLeft: Seq[ClassB] = { 
    val task: Task[Seq[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB)) 
    Await.result(task.runAsync, Duration.Inf) 
    } 

    @Benchmark 
    def monixMapAsyncFoldLeftAsync: Seq[ClassB] = { 
    val task: Task[Seq[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(o +: l))) 
    Await.result(task.runAsync, Duration.Inf) 
    } 

    // ##### AKKA-STREAM ##### // 

    @Benchmark 
    def akkaMapFold: Seq[ClassB] = { 
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.fold(List[ClassB]())(foldClassB))(Keep.right) 
    runAkkaGraph(graph) 
    } 

    @Benchmark 
    def akkaMapFoldAsync: Seq[ClassB] = { 
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(o +: l)))(Keep.right) 
    runAkkaGraph(graph) 
    } 

    @Benchmark 
    def akkaMapSeq: Seq[ClassB] = { 
    val graph = Source(list).map(o => ClassB(o,o)).toMat(Sink.seq)(Keep.right) 
    runAkkaGraph(graph) 
    } 

    @Benchmark 
    def akkaMapAsyncFold: Seq[ClassB] = { 
    def graph: RunnableGraph[Future[Seq[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.fold(List[ClassB]())(foldClassB))(Keep.right) 
    runAkkaGraph(graph) 
    } 

    @Benchmark 
    def akkaMapAsyncFoldAsync: Seq[ClassB] = { 
    def graph: RunnableGraph[Future[Seq[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(o +: l)))(Keep.right) 
    runAkkaGraph(graph) 
    } 

    @Benchmark 
    def akkaMapAsyncSeq: Seq[ClassB] = { 
    val graph = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).toMat(Sink.seq)(Keep.right) 
    runAkkaGraph(graph) 
    } 

    @Benchmark 
    def akkaLoadBalanceMap: Seq[ClassB] = { 
    def graph: RunnableGraph[Future[Seq[ClassB]]] = { 
     val sink: Sink[ClassB, Future[Seq[ClassB]]] = Sink.fold(List[ClassB]())(foldClassB) 
     RunnableGraph.fromGraph[Future[Seq[ClassB]]](GraphDSL.create(sink) { implicit builder => 
     sink => 
      import GraphDSL.Implicits._ 
      val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4)) 
      val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4)) 
      val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o)) 
      Source(list) ~> balance 
      (1 to 4).foreach{ i => 
      balance ~> mapClassB.async ~> merge 
      } 
      merge ~> sink 
      ClosedShape 
     }) 
    } 
    runAkkaGraph(graph) 
    } 

    @Benchmark 
    def akkaLoadBalanceMapSeq: Seq[ClassB] = { 
    def graph: RunnableGraph[Future[Seq[ClassB]]] = { 
     val sink: Sink[ClassB, Future[Seq[ClassB]]] = Sink.seq 
     RunnableGraph.fromGraph[Future[Seq[ClassB]]](GraphDSL.create(sink) { implicit builder => 
     sink => 
      import GraphDSL.Implicits._ 
      val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4)) 
      val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4)) 
      val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o)) 
      Source(list) ~> balance 
      (1 to 4).foreach{ i => 
      balance ~> mapClassB.async ~> merge 
      } 
      merge ~> sink 
      ClosedShape 
     }) 
    } 
    runAkkaGraph(graph) 
    } 

    private def runAkkaGraph(g:RunnableGraph[Future[Seq[ClassB]]]): Seq[ClassB] = { 
    implicit val actorSystem = ActorSystem("app") 
    implicit val actorMaterializer = ActorMaterializer() 
    val eventualBs = g.run() 
    val res = Await.result(eventualBs, Duration.Inf) 
    actorSystem.terminate() 
    res 
    } 
} 

case class ClassA(a:Int) 
case class ClassB(o:ClassA, o2:ClassA) 

The results with this updated class are:

[info] Benchmark                                     Mode  Cnt    Score    Error  Units
[info] MappingBenchmark.akkaLoadBalanceMap             ss   20   19,052 ±  3,779  ms/op
[info] MappingBenchmark.akkaLoadBalanceMapSeq          ss   20   16,115 ±  3,232  ms/op
[info] MappingBenchmark.akkaMapAsyncFold               ss   20   20,862 ±  3,127  ms/op
[info] MappingBenchmark.akkaMapAsyncFoldAsync          ss   20   26,994 ±  4,010  ms/op
[info] MappingBenchmark.akkaMapAsyncSeq                ss   20   19,399 ±  7,089  ms/op
[info] MappingBenchmark.akkaMapFold                    ss   20   12,132 ±  4,111  ms/op
[info] MappingBenchmark.akkaMapFoldAsync               ss   20   22,652 ±  3,802  ms/op
[info] MappingBenchmark.akkaMapSeq                     ss   20   10,894 ±  3,114  ms/op
[info] MappingBenchmark.map                            ss   20    0,625 ±  0,193  ms/op
[info] MappingBenchmark.monixBatchedObservables        ss   20    9,175 ±  4,080  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeft          ss   20   11,724 ±  4,458  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeftAsync     ss   20   14,174 ±  6,962  ms/op
[info] MappingBenchmark.monixMapFoldLeft               ss   20    1,057 ±  0,960  ms/op
[info] MappingBenchmark.monixMapFoldLeftAsync          ss   20    9,638 ±  4,910  ms/op
[info] MappingBenchmark.monixTaskGather                ss   20    7,065 ±  2,428  ms/op
[info] MappingBenchmark.parMap                         ss   20    1,392 ±  0,923  ms/op

It still seems to be faster to do the mapping with a plain Scala map before running the stream, when that is possible.

3

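Just a note on asynchronous / parallel processing: in general, when you process things in parallel, you end up with quite a lot of CPU overhead for synchronizing the results.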

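The overhead can actually be so large that it cancels out the time gains you get from multiple CPU cores working in parallel. You should also get familiar with Amdahl's Law. Look at the numbers: with a parallel portion of 75%, the maximum speedup is only 4x, no matter how many processors you add. With a parallel portion of 50%, the maximum speedup is only 2x.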
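
As a small worked example of Amdahl's Law, the speedup for a parallel fraction `p` on `n` processors is `1 / ((1 - p) + p / n)`, and it approaches `1 / (1 - p)` as `n` grows. The object and method names below are made up for this sketch.

```scala
object AmdahlSketch {
  // Amdahl's Law: speedup with parallel fraction p (0 <= p < 1) on n processors.
  def speedup(p: Double, n: Int): Double =
    1.0 / ((1.0 - p) + p / n)

  // Upper bound on the speedup as the number of processors grows without limit.
  def limit(p: Double): Double =
    1.0 / (1.0 - p)

  def main(args: Array[String]): Unit = {
    for (p <- List(0.5, 0.75, 0.95); n <- List(2, 4, 16, 1024))
      println(f"p = $p%.2f, n = $n%4d, speedup = ${speedup(p, n)}%.3f, bound = ${limit(p)}%.1f")
  }
}
```

With `p = 0.75` the bound is 4x, and 4 processors already give only about 2.29x; with `p = 0.5` the bound is 2x. Adding cores beyond a handful buys very little unless the sequential part shrinks.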

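And this is only the theoretical limit, because you also have shared-memory synchronization between processors, which can get really messy. Basically, processors are optimized for sequential execution. Once you introduce concurrency concerns, you need to force ordering with memory barriers, which defeats many CPU optimizations. You can therefore end up with a negative speedup, as seen in your tests. So you are testing asynchronous / parallel mapping, but the test does basically nothing; you might as well test with the identity function, it would be almost the same thing. In other words, the test you are doing and its results are pretty much useless in practice.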
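
The point about trivial per-element work can be illustrated with plain `scala.concurrent.Future`, without monix or akka-streams. This is only a rough sketch under assumptions: `In` and `Out` are hypothetical stand-ins for `ClassA` and `ClassB`, and the wall-clock timer is a crude illustration, not a replacement for JMH.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object OverheadSketch {
  // Hypothetical stand-ins for the question's ClassA / ClassB.
  final case class In(a: Int)
  final case class Out(o: In, o2: In)

  val input: List[In] = (1 to 10000).map(In(_)).toList

  // Strict, single-threaded mapping: one small allocation per element.
  def strictMap(xs: List[In]): List[Out] = xs.map(x => Out(x, x))

  // Same work, but every element becomes its own Future, so each step also
  // pays for task scheduling and a hand-off between threads.
  def futureMap(xs: List[In]): List[Out] =
    Await.result(Future.traverse(xs)(x => Future(Out(x, x))), Duration.Inf)

  // Crude wall-clock timer in milliseconds, for illustration only.
  def timeMs[T](body: => T): (T, Double) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1e6)
  }

  def main(args: Array[String]): Unit = {
    val (a, tStrict) = timeMs(strictMap(input))
    val (b, tFuture) = timeMs(futureMap(input))
    println(f"strict map: $tStrict%.3f ms, one Future per element: $tFuture%.3f ms, same result: ${a == b}")
  }
}
```

Both variants produce the same list. Since building an `Out` is nearly free, whatever extra time the second variant takes is scheduling and synchronization cost rather than useful work.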

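As a side note, this is also why I never liked the idea of "parallel collections". The concept is flawed, because you can only use parallel collections for purely CPU-bound work (i.e. no I/O, nothing actually asynchronous). That would arguably make them fine for doing some calculations, except that:

  1. for many purposes, using parallel collections is slower than using the normal operators on a single CPU, and
  2. if you really do have CPU-bound work and need to use your hardware resources to the max, then "parallel collections" in their current incarnation are actually the wrong abstraction, because "hardware" these days includes GPUs

In other words, parallel collections do not use hardware resources efficiently, because they completely ignore GPUs, and they are entirely unsuitable for mixed CPU and I/O tasks, because they lack async support.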

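I feel the need to mention this because people too often think that sprinkling some "parallel" fairy dust on their code will make it run faster, but many times it won't.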

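Parallelism works great when you have I/O-bound tasks (mixed with CPU-bound tasks, of course), because in that case the CPU overhead matters much less: the processing time is dominated by the I/O.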
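
A minimal sketch of the I/O-bound case, again using only `scala.concurrent`. The `fakeIo` function is a hypothetical placeholder that merely sleeps for 50 ms to imitate a network or disk call; a real workload would behave differently.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object IoBoundSketch {
  // Hypothetical stand-in for an I/O call: it only sleeps, then returns a value.
  def fakeIo(id: Int): Int = {
    blocking(Thread.sleep(50)) // `blocking` hints that the pool may add threads
    id * 2
  }

  // One call after another: total time is roughly the sum of all waits.
  def sequential(ids: List[Int]): List[Int] = ids.map(fakeIo)

  // All calls started up front: the waits overlap instead of adding up.
  def parallel(ids: List[Int]): List[Int] =
    Await.result(Future.traverse(ids)(id => Future(fakeIo(id))), Duration.Inf)

  def main(args: Array[String]): Unit = {
    val ids = (1 to 8).toList
    val t0 = System.nanoTime()
    val seqResult = sequential(ids)
    val t1 = System.nanoTime()
    val parResult = parallel(ids)
    val t2 = System.nanoTime()
    println(s"sequential: ${(t1 - t0) / 1000000} ms, parallel: ${(t2 - t1) / 1000000} ms, same result: ${seqResult == parResult}")
  }
}
```

Here the scheduling overhead per task is small compared with the simulated wait, which is the situation the paragraph above describes.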

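PS: a plain map over Scala collections should be faster because it is strict and (depending on the collection type) uses array-backed buffers, so it does not trash the CPU caches. Monix's .map has the same overhead as Scala's Iterable.map, in other words near-zero overhead, but it is applied lazily and introduces some boxing overhead, which we cannot get rid of because the JVM does not specialize generics.

It is damn fast in practice, though ;-)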

Thanks for this interesting answer. The benchmark matches exactly what you describe.
