2015-10-29

UPDATE (2015-10-30): the Akka Streams implementation is still slower than the single-threaded implementation


Based on Roland Kuhn's answer:

Akka Streams use asynchronous message passing between Actors to implement stream processing stages. Passing data across an asynchronous boundary has an overhead that you are seeing here: your computation seems to take only about 160ns (derived from the single-threaded measurement) while the streaming solution takes roughly 1µs per element, which is dominated by the message passing.

Another misconception is that saying "stream" implies parallelism: in your code all computation runs sequentially in a single Actor (the map stage), therefore no benefit can be expected over the primitive single-threaded solution.

In order to benefit from the parallelism afforded by Akka Streams you need to have multiple processing stages that each perform tasks of more than 1µs per element; see also the docs.

I made some changes based on this. My code now looks like this:

object MultiThread {
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgress = 0

  // RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478L, "name", 48, 23.09f)))

  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform)

  val eventToFactorial = Flow[Event].map(SharedFunctions.transform2)

  val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder =>
    import FlowGraph.Implicits._

    val dispatchTuple = builder.add(Balance[(Long, String, Int, Float)](4))
    val mergeEvents = builder.add(Merge[Int](4))

    dispatchTuple.out(0) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(0)
    dispatchTuple.out(1) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(1)
    dispatchTuple.out(2) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(2)
    dispatchTuple.out(3) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(3)

    (dispatchTuple.in, mergeEvents.out)
  }

  val sink = Sink.foreach[Int] { v =>
    counter += 1
    oldProgress = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, counter,
      DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
    if (counter == SharedFunctions.maxEventCount) endAkka()
  }

  def endAkka() = {
    val duration = new Duration(SharedFunctions.startTime, DateTime.now)
    println("Time: " + duration.getMillis + " || Data: " + counter)
    actorSystem.shutdown
    actorSystem.awaitTermination
    System.exit(-1)
  }

  def main(args: Array[String]) {
    println("MultiThread started: " + SharedFunctions.startTime)
    in.via(flow).runWith(sink)
    // in.via(eventChef).runWith(sink)
  }
}

I don't know if I've gotten something completely wrong, but my Akka Streams implementation is still much slower (now even slower than before). What I did find out, though: if I increase the workload per element, for example by doing some division, the Akka Streams implementation becomes faster relative to the single-threaded one. So, if I understand it correctly (otherwise please correct me), there seems to be too much overhead in my example. So do you only benefit from Akka Streams when the code has to do heavy work per element?
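Not the author's code, just a minimal stdlib-only sketch (no Akka) of the overhead-vs-work tradeoff described above. The hypothetical `work` function stands in for transform + factorial; the usual remedy when per-element work is tiny is batching, so that the coordination cost is paid once per chunk instead of once per element:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

object BatchingSketch {
  // Tiny per-element work, standing in for transform + factorial.
  def work(n: Int): Int = n * 2

  // Naive: one Future per element -- scheduling overhead dominates
  // when the work is this small.
  def perElement(xs: Seq[Int]): Seq[Int] =
    Await.result(Future.sequence(xs.map(x => Future(work(x)))), Duration.Inf)

  // Batched: one Future per chunk, amortising the coordination cost
  // over batchSize elements.
  def batched(xs: Seq[Int], batchSize: Int): Seq[Int] =
    Await.result(
      Future.sequence(xs.grouped(batchSize).toSeq.map(chunk => Future(chunk.map(work)))),
      Duration.Inf
    ).flatten

  def main(args: Array[String]): Unit = {
    val xs = 1 to 1000
    assert(perElement(xs) == batched(xs, 100))
    println("both variants produce the same result")
  }
}
```

Both variants compute the same result; the difference is only in how many asynchronous hand-offs occur per element.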




I'm relatively new to both Scala and akka-stream. I wrote a small test project that generates events until a counter reaches a specific number. For each event, the factorial of one of the event's fields is computed. I implemented it twice, once with akka-stream and once without (single-threaded), and compared the runtimes.

What I did not expect: when I create a single event, the runtimes of both programs are nearly the same. But if I create 70,000,000 events, the implementation without akka-stream is much faster. Here are my results (the figures below are based on 24 measurements):


  • Single event without akka-streams: 403 (+-2) ms
  • Single event with akka-streams: 444 (+-13) ms
  • 70 million events without akka-streams: 11778 (+-70) ms
  • 70 million events with akka-streams: 75424 (+-2959) ms

So my question is: what is going on here? Why is my akka-stream implementation slower?

Here is my code:

Implementation with Akka:

object MultiThread {
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgress = 0

  // RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478L, "name", 48, 23.09f)))

  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  val sink = Sink.foreach[Int] { v =>
    counter += 1
    oldProgress = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, counter,
      DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
    if (counter == SharedFunctions.maxEventCount) endAkka()
  }

  def endAkka() = {
    val duration = new Duration(SharedFunctions.startTime, DateTime.now)
    println("Time: " + duration.getMillis + " || Data: " + counter)
    actorSystem.shutdown
    actorSystem.awaitTermination
    System.exit(-1)
  }

  def main(args: Array[String]) {
    import scala.concurrent.ExecutionContext.Implicits.global
    println("MultiThread started: " + SharedFunctions.startTime)
    in.via(flow).runWith(sink).onComplete(_ => endAkka())
  }
}

Implementation without Akka:

object SingleThread {

  def main(args: Array[String]) {
    println("SingleThread started at: " + SharedFunctions.startTime)
    println("0%")
    val i = createEvent(0)
    val duration = new Duration(SharedFunctions.startTime, DateTime.now())
    println("Time: " + duration.getMillis + " || Data: " + i)
  }

  def createEventWorker(oldProgress: Int, count: Int, randDate: Long, name: String, age: Int, myFloat: Float): Int = {
    if (count == SharedFunctions.maxEventCount) count
    else {
      val e = SharedFunctions.transform((randDate, name, age, myFloat))
      SharedFunctions.transform2(e)
      val p = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, count,
        DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
      createEventWorker(p, count + 1, 1254785478L, "name", 48, 23.09f)
    }
  }

  def createEvent(count: Int): Int = {
    createEventWorker(0, count, 1254785478L, "name", 48, 23.09f)
  }
}

SharedFunctions

object SharedFunctions {
  val maxEventCount = 70000000
  val startTime = DateTime.now

  def transform(t: (Long, String, Int, Float)): Event = new Event(t._1, t._2, t._3, t._4)
  def transform2(e: Event): Int = factorial(e.getAgeYrs)

  def calculatePercentage(totalValue: Long, currentValue: Long) = Math.round((currentValue * 100) / totalValue)

  def printProgress(oldProgress: Int, fileSize: Long, currentSize: Int, t: Long) = {
    val cProgress = calculatePercentage(fileSize, currentSize)
    // print the new value, not the stale one
    if (oldProgress != cProgress) println(s"$cProgress% | $t ms")
    cProgress
  }

  private def factorialWorker(n1: Int, n2: Int): Int = {
    if (n1 == 0) n2
    else factorialWorker(n1 - 1, n2 * n1)
  }

  def factorial(n: Int): Int = {
    factorialWorker(n, 1)
  }
}
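One detail worth flagging in `SharedFunctions` (an observation of mine, not from the original post): `factorial` works on `Int`, and `13!` already exceeds `Int.MaxValue`, so `factorial(48)` silently wraps around. That is fine as synthetic busy-work, but the result is not the true factorial. A standalone sketch with the same tail-recursive shape illustrates the overflow:

```scala
object FactorialOverflow {
  // Same tail-recursive shape as SharedFunctions.factorialWorker.
  @annotation.tailrec
  private def worker(n: Int, acc: Int): Int =
    if (n == 0) acc else worker(n - 1, acc * n)

  def factorial(n: Int): Int = worker(n, 1)

  // BigInt version for comparison -- no overflow.
  def factorialBig(n: Int): BigInt =
    (1 to n).foldLeft(BigInt(1))(_ * _)

  def main(args: Array[String]): Unit = {
    println(factorial(12))     // 479001600 -- still fits in an Int
    println(factorial(13))     // wrapped: Int arithmetic overflowed
    println(factorialBig(13))  // 6227020800 -- the true value
  }
}
```

For the benchmark this does not matter, since the amount of work per element is the same either way.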

The Event implementation:

/** 
* Autogenerated by Avro 
* 
* DO NOT EDIT DIRECTLY 
*/ 

@SuppressWarnings("all") 
@org.apache.avro.specific.AvroGenerated 
public class Event extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { 
    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"week2P2\",\"fields\":[{\"name\":\"timestampMS\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ageYrs\",\"type\":\"int\"},{\"name\":\"sizeCm\",\"type\":\"float\"}]}"); 
    public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } 
    @Deprecated public long timestampMS; 
    @Deprecated public CharSequence name; 
    @Deprecated public int ageYrs; 
    @Deprecated public float sizeCm; 

    /** 
    * Default constructor. Note that this does not initialize fields 
    * to their default values from the schema. If that is desired then 
    * one should use <code>newBuilder()</code>. 
    */ 
    public Event() {} 

    /** 
    * All-args constructor. 
    */ 
    public Event(Long timestampMS, CharSequence name, Integer ageYrs, Float sizeCm) { 
    this.timestampMS = timestampMS; 
    this.name = name; 
    this.ageYrs = ageYrs; 
    this.sizeCm = sizeCm; 
    } 

    public org.apache.avro.Schema getSchema() { return SCHEMA$; } 
    // Used by DatumWriter. Applications should not call. 
    public Object get(int field$) { 
    switch (field$) { 
    case 0: return timestampMS; 
    case 1: return name; 
    case 2: return ageYrs; 
    case 3: return sizeCm; 
    default: throw new org.apache.avro.AvroRuntimeException("Bad index"); 
    } 
    } 
    // Used by DatumReader. Applications should not call. 
    @SuppressWarnings(value="unchecked") 
    public void put(int field$, Object value$) { 
    switch (field$) { 
    case 0: timestampMS = (Long)value$; break; 
    case 1: name = (CharSequence)value$; break; 
    case 2: ageYrs = (Integer)value$; break; 
    case 3: sizeCm = (Float)value$; break; 
    default: throw new org.apache.avro.AvroRuntimeException("Bad index"); 
    } 
    } 

    /** 
    * Gets the value of the 'timestampMS' field. 
    */ 
    public Long getTimestampMS() { 
    return timestampMS; 
    } 

    /** 
    * Sets the value of the 'timestampMS' field. 
    * @param value the value to set. 
    */ 
    public void setTimestampMS(Long value) { 
    this.timestampMS = value; 
    } 

    /** 
    * Gets the value of the 'name' field. 
    */ 
    public CharSequence getName() { 
    return name; 
    } 

    /** 
    * Sets the value of the 'name' field. 
    * @param value the value to set. 
    */ 
    public void setName(CharSequence value) { 
    this.name = value; 
    } 

    /** 
    * Gets the value of the 'ageYrs' field. 
    */ 
    public Integer getAgeYrs() { 
    return ageYrs; 
    } 

    /** 
    * Sets the value of the 'ageYrs' field. 
    * @param value the value to set. 
    */ 
    public void setAgeYrs(Integer value) { 
    this.ageYrs = value; 
    } 

    /** 
    * Gets the value of the 'sizeCm' field. 
    */ 
    public Float getSizeCm() { 
    return sizeCm; 
    } 

    /** 
    * Sets the value of the 'sizeCm' field. 
    * @param value the value to set. 
    */ 
    public void setSizeCm(Float value) { 
    this.sizeCm = value; 
    } 

    /** Creates a new Event RecordBuilder */ 
    public static Event.Builder newBuilder() { 
    return new Event.Builder(); 
    } 

    /** Creates a new Event RecordBuilder by copying an existing Builder */ 
    public static Event.Builder newBuilder(Event.Builder other) { 
    return new Event.Builder(other); 
    } 

    /** Creates a new Event RecordBuilder by copying an existing Event instance */ 
    public static Event.Builder newBuilder(Event other) { 
    return new Event.Builder(other); 
    } 

    /** 
    * RecordBuilder for Event instances. 
    */ 
    public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Event> 
    implements org.apache.avro.data.RecordBuilder<Event> { 

    private long timestampMS; 
    private CharSequence name; 
    private int ageYrs; 
    private float sizeCm; 

    /** Creates a new Builder */ 
    private Builder() { 
     super(Event.SCHEMA$); 
    } 

    /** Creates a Builder by copying an existing Builder */ 
    private Builder(Event.Builder other) { 
     super(other); 
     if (isValidValue(fields()[0], other.timestampMS)) { 
     this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS); 
     fieldSetFlags()[0] = true; 
     } 
     if (isValidValue(fields()[1], other.name)) { 
     this.name = data().deepCopy(fields()[1].schema(), other.name); 
     fieldSetFlags()[1] = true; 
     } 
     if (isValidValue(fields()[2], other.ageYrs)) { 
     this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs); 
     fieldSetFlags()[2] = true; 
     } 
     if (isValidValue(fields()[3], other.sizeCm)) { 
     this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm); 
     fieldSetFlags()[3] = true; 
     } 
    } 

    /** Creates a Builder by copying an existing Event instance */ 
    private Builder(Event other) { 
      super(Event.SCHEMA$); 
     if (isValidValue(fields()[0], other.timestampMS)) { 
     this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS); 
     fieldSetFlags()[0] = true; 
     } 
     if (isValidValue(fields()[1], other.name)) { 
     this.name = data().deepCopy(fields()[1].schema(), other.name); 
     fieldSetFlags()[1] = true; 
     } 
     if (isValidValue(fields()[2], other.ageYrs)) { 
     this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs); 
     fieldSetFlags()[2] = true; 
     } 
     if (isValidValue(fields()[3], other.sizeCm)) { 
     this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm); 
     fieldSetFlags()[3] = true; 
     } 
    } 

    /** Gets the value of the 'timestampMS' field */ 
    public Long getTimestampMS() { 
     return timestampMS; 
    } 

    /** Sets the value of the 'timestampMS' field */ 
    public Event.Builder setTimestampMS(long value) { 
     validate(fields()[0], value); 
     this.timestampMS = value; 
     fieldSetFlags()[0] = true; 
     return this; 
    } 

    /** Checks whether the 'timestampMS' field has been set */ 
    public boolean hasTimestampMS() { 
     return fieldSetFlags()[0]; 
    } 

    /** Clears the value of the 'timestampMS' field */ 
    public Event.Builder clearTimestampMS() { 
     fieldSetFlags()[0] = false; 
     return this; 
    } 

    /** Gets the value of the 'name' field */ 
    public CharSequence getName() { 
     return name; 
    } 

    /** Sets the value of the 'name' field */ 
    public Event.Builder setName(CharSequence value) { 
     validate(fields()[1], value); 
     this.name = value; 
     fieldSetFlags()[1] = true; 
     return this; 
    } 

    /** Checks whether the 'name' field has been set */ 
    public boolean hasName() { 
     return fieldSetFlags()[1]; 
    } 

    /** Clears the value of the 'name' field */ 
    public Event.Builder clearName() { 
     name = null; 
     fieldSetFlags()[1] = false; 
     return this; 
    } 

    /** Gets the value of the 'ageYrs' field */ 
    public Integer getAgeYrs() { 
     return ageYrs; 
    } 

    /** Sets the value of the 'ageYrs' field */ 
    public Event.Builder setAgeYrs(int value) { 
     validate(fields()[2], value); 
     this.ageYrs = value; 
     fieldSetFlags()[2] = true; 
     return this; 
    } 

    /** Checks whether the 'ageYrs' field has been set */ 
    public boolean hasAgeYrs() { 
     return fieldSetFlags()[2]; 
    } 

    /** Clears the value of the 'ageYrs' field */ 
    public Event.Builder clearAgeYrs() { 
     fieldSetFlags()[2] = false; 
     return this; 
    } 

    /** Gets the value of the 'sizeCm' field */ 
    public Float getSizeCm() { 
     return sizeCm; 
    } 

    /** Sets the value of the 'sizeCm' field */ 
    public Event.Builder setSizeCm(float value) { 
     validate(fields()[3], value); 
     this.sizeCm = value; 
     fieldSetFlags()[3] = true; 
     return this; 
    } 

    /** Checks whether the 'sizeCm' field has been set */ 
    public boolean hasSizeCm() { 
     return fieldSetFlags()[3]; 
    } 

    /** Clears the value of the 'sizeCm' field */ 
    public Event.Builder clearSizeCm() { 
     fieldSetFlags()[3] = false; 
     return this; 
    } 

    @Override 
    public Event build() { 
     try { 
     Event record = new Event(); 
     record.timestampMS = fieldSetFlags()[0] ? this.timestampMS : (Long) defaultValue(fields()[0]); 
     record.name = fieldSetFlags()[1] ? this.name : (CharSequence) defaultValue(fields()[1]); 
     record.ageYrs = fieldSetFlags()[2] ? this.ageYrs : (Integer) defaultValue(fields()[2]); 
     record.sizeCm = fieldSetFlags()[3] ? this.sizeCm : (Float) defaultValue(fields()[3]); 
     return record; 
     } catch (Exception e) { 
     throw new org.apache.avro.AvroRuntimeException(e); 
     } 
    } 
    } 
} 

Just for completeness, could you give the Event definition? I'd like to try to optimize your multithreaded code... –


Sure, I've added it. –

Answers


In addition to Roland's explanation, which I fully agree with, it should be understood that Akka Streams are not just a concurrency framework. Streams also provide backpressure: events are only produced by the Source when there is demand to process them in the Sink. This communication of demand adds some overhead at each processing step.
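A rough stdlib-only analogy for the backpressure/demand mechanism described above (my sketch, not part of the answer): a bounded queue blocks the producer until the consumer has made room, so elements are only produced at the rate they can be consumed.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BackpressureSketch {
  def run(total: Int, capacity: Int): Long = {
    // Capacity bounds how far the producer can run ahead of the consumer.
    val queue = new ArrayBlockingQueue[Int](capacity)
    var sum = 0L

    val producer = new Thread(() => {
      for (i <- 1 to total) queue.put(i) // blocks when the queue is full
    })
    val consumer = new Thread(() => {
      for (_ <- 1 to total) sum += queue.take() // blocks when the queue is empty
    })

    producer.start(); consumer.start()
    producer.join(); consumer.join()
    sum
  }

  def main(args: Array[String]): Unit =
    println(run(100, 4)) // sums 1..100; the producer never ran more than 4 ahead
}
```

Akka Streams signals demand with asynchronous messages rather than blocking, but the effect on pacing is comparable, and that signalling is part of the per-element cost.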

Therefore, your single-threaded vs. multi-threaded comparison is not "apples to apples".

If you want raw multi-threaded execution performance, then Futures/Actors are a better choice.
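A minimal stdlib-only sketch (my code, no Akka) of what the Futures route might look like for this workload: split the event count into chunks and process each chunk in its own Future, with no per-element message passing. The hypothetical `work` stands in for `SharedFunctions.transform` + `transform2`:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

object FuturesVariant {
  // Hypothetical stand-in for SharedFunctions.transform + transform2.
  def work(tuple: (Long, String, Int, Float)): Int = tuple._3 * 2

  // One Future per chunk; assumes eventCount is divisible by parallelism.
  def run(eventCount: Int, parallelism: Int): Long = {
    val chunk = eventCount / parallelism
    val futures = (0 until parallelism).map { _ =>
      Future {
        var i = 0
        var acc = 0L
        while (i < chunk) {
          acc += work((1254785478L, "name", 48, 23.09f))
          i += 1
        }
        acc
      }
    }
    Await.result(Future.sequence(futures), Duration.Inf).sum
  }

  def main(args: Array[String]): Unit =
    println(run(1000000, 4))
}
```

Because each Future owns a whole chunk, the coordination cost is paid `parallelism` times in total rather than once per element.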


Yes. Good use cases for streams involve real-time streaming or streaming large amounts of data: parsing a huge video file without reading it all into memory, fetching millions of records over a network connection, or using a file watcher to wait for files before parsing them. Streams either limit complexity in settings where top speed is not required or not achievable (data arrives over a long period, or requires millions of network calls), or help manage complexity when there is a large amount of data. –


Akka Streams use asynchronous message passing between Actors to implement stream processing stages. Passing data across an asynchronous boundary has an overhead that you are seeing here: your computation seems to take only about 160ns (derived from the single-threaded measurement) while the streaming solution takes roughly 1µs per element, which is dominated by the message passing.

Another misconception is that saying "stream" implies parallelism: in your code all computation runs sequentially in a single Actor (the map stage), therefore no benefit can be expected over the primitive single-threaded solution.

In order to benefit from the parallelism afforded by Akka Streams you need to have multiple processing stages that each perform tasks of more than 1µs per element; see also the docs.
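A quick back-of-the-envelope check (my arithmetic, not from the answer): these per-element figures line up well with the totals measured for 70 million elements.

```scala
object OverheadEstimate {
  val elements = 70000000L

  // Per-element costs from the answer: ~160ns of actual computation,
  // ~1us per element for the streaming version (message passing dominates).
  val singleThreadedNsPerElem = 160L
  val streamingNsPerElem = 1000L

  def totalMs(nsPerElem: Long): Long = elements * nsPerElem / 1000000L

  def main(args: Array[String]): Unit = {
    println(s"single-threaded estimate: ${totalMs(singleThreadedNsPerElem)} ms (measured: 11778 ms)")
    println(s"streaming estimate: ${totalMs(streamingNsPerElem)} ms (measured: 75424 ms)")
  }
}
```

The estimates (11200 ms and 70000 ms) land within about 10% of the measured 11778 ms and 75424 ms, which supports the claim that message-passing overhead, not the factorial itself, dominates the streaming run.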


Not sure why this isn't the accepted answer: it predates the accepted one and is in fact consistent with it; the accepted answer only addresses a secondary point. – doug