UPDATE FROM二○一五年十月三十○日阿卡插播實現比單線程實現較慢
基於羅蘭庫恩Awnser:
阿卡流是使用異步消息參與者之間傳遞給 實現流處理階段。通過異步邊界傳遞數據會產生以下開銷:您的 計算似乎只需要大約160ns(源自 單線程度量),而流式傳輸解決方案每個元素需要大約1μs,其中 占主導地位消息傳遞。
另一個誤解是,說「流」意味着並行:在 代碼都計算在一個單一的演員(地圖 階段)順序運行,因此沒有任何好處,可以預計在原始 單線程解決方案。
爲了從阿卡提供的並行性收益流,你 需要有多個處理階段,每個執行的每個元素
1μs的任務,也看到了文檔。
我做了一些更改。我的代碼現在看起來像:
object MultiThread {
implicit val actorSystem = ActorSystem("Sys")
implicit val materializer = ActorMaterializer()
var counter = 0
var oldProgess = 0
//RunnableFlow: in -> flow -> sink
val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))
val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))
val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform)
val eventToFactorial = Flow[Event].map(SharedFunctions.transform2)
val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder =>
import FlowGraph.Implicits._
val dispatchTuple = builder.add(Balance[(Long, String, Int, Float)](4))
val mergeEvents = builder.add(Merge[Int](4))
dispatchTuple.out(0) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(0)
dispatchTuple.out(1) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(1)
dispatchTuple.out(2) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(2)
dispatchTuple.out(3) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(3)
(dispatchTuple.in, mergeEvents.out)
}
val sink = Sink.foreach[Int]{
v => counter += 1
oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter,
DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
if(counter == SharedFunctions.maxEventCount) endAkka()
}
def endAkka() = {
val duration = new Duration(SharedFunctions.startTime, DateTime.now)
println("Time: " + duration.getMillis + " || Data: " + counter)
actorSystem.shutdown
actorSystem.awaitTermination
System.exit(-1)
}
def main(args: Array[String]) {
println("MultiThread started: " + SharedFunctions.startTime)
in.via(flow).runWith(sink)
// in.via(eventChef).runWith(sink)
}
}
我不知道,如果我得到的東西完全錯誤的,但仍然是我與阿卡流的實現是慢得多(現在更慢如前),但我發現了什麼是:如果我例如通過做一些分工來增加工作量,用阿卡流實施會變得更快。所以,如果我把它做對了(否則糾正我),我的例子似乎有太多的開銷。所以如果代碼需要做大量的工作,你只能從阿卡流中獲益呢?
我是比較新的兩階&阿卡 - 流。我寫了一個小測試項目,創建一些事件,直到一個計數器達到特定的數字。對於每個事件,正在計算事件的一個字段的階乘。我實施了兩次。一次使用akka-stream,一次不使用akka-stream(單線程),並比較運行時間。
我沒想到:當我創建單個事件時,兩個程序的運行時間幾乎相同。但是如果我創建了70,000,000個事件,沒有AKK流的實現要快得多。這裏是我的結果(以下數據是基於24次測量):
- 單事件,而不阿卡流:403(+ - 2)MS
與單事件阿卡流:444(+ -13)ms的
70Mio事件而不阿卡流:11778(+ -70)MS與
- 70Mio事件阿卡回籠:75424(+ - 2959)MS
所以我的問題是:發生了什麼事?爲什麼我的akka-stream實現比較慢?
這裏我的代碼:
實現與阿卡
object MultiThread {
implicit val actorSystem = ActorSystem("Sys")
implicit val materializer = ActorMaterializer()
var counter = 0
var oldProgess = 0
//RunnableFlow: in -> flow -> sink
val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))
val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))
val sink = Sink.foreach[Int]{
v => counter += 1
oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter,
DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
if(counter == SharedFunctions.maxEventCount) endAkka()
}
def endAkka() = {
val duration = new Duration(SharedFunctions.startTime, DateTime.now)
println("Time: " + duration.getMillis + " || Data: " + counter)
actorSystem.shutdown
actorSystem.awaitTermination
System.exit(-1)
}
def main(args: Array[String]) {
import scala.concurrent.ExecutionContext.Implicits.global
println("MultiThread started: " + SharedFunctions.startTime)
in.via(flow).runWith(sink).onComplete(_ => endAkka())
}
}
實現無阿卡
對象SingleThread {
def main(args: Array[String]) {
println("SingleThread started at: " + SharedFunctions.startTime)
println("0%")
val i = createEvent(0)
val duration = new Duration(SharedFunctions.startTime, DateTime.now());
println("Time: " + duration.getMillis + " || Data: " + i)
}
def createEventWorker(oldProgress: Int, count: Int, randDate: Long, name: String, age: Int, myFloat: Float): Int = {
if (count == SharedFunctions.maxEventCount) count
else {
val e = SharedFunctions.transform((randDate, name, age, myFloat))
SharedFunctions.transform2(e)
val p = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, count,
DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
createEventWorker(p, count + 1, 1254785478l, "name", 48, 23.09f)
}
}
def createEvent(count: Int): Int = {
createEventWorker(0, count, 1254785478l, "name", 48, 23.09f)
}
}
SharedFunctions
object SharedFunctions {
val maxEventCount = 70000000
val startTime = DateTime.now
def transform(t : (Long, String, Int, Float)) : Event = new Event(t._1 ,t._2,t._3,t._4)
def transform2(e : Event) : Int = factorial(e.getAgeYrs)
def calculatePercentage(totalValue: Long, currentValue: Long) = Math.round((currentValue * 100)/totalValue)
def printProgress(oldProgress : Int, fileSize: Long, currentSize: Int, t: Long) = {
val cProgress = calculatePercentage(fileSize, currentSize)
if (oldProgress != cProgress) println(s"$oldProgress% | $t ms")
cProgress
}
private def factorialWorker(n1: Int, n2: Int): Int = {
if (n1 == 0) n2
else factorialWorker(n1 -1, n2*n1)
}
def factorial (n : Int): Int = {
factorialWorker(n, 1)
}
}
執行事件
/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Event extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"week2P2\",\"fields\":[{\"name\":\"timestampMS\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ageYrs\",\"type\":\"int\"},{\"name\":\"sizeCm\",\"type\":\"float\"}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
@Deprecated public long timestampMS;
@Deprecated public CharSequence name;
@Deprecated public int ageYrs;
@Deprecated public float sizeCm;
/**
* Default constructor. Note that this does not initialize fields
* to their default values from the schema. If that is desired then
* one should use <code>newBuilder()</code>.
*/
public Event() {}
/**
* All-args constructor.
*/
public Event(Long timestampMS, CharSequence name, Integer ageYrs, Float sizeCm) {
this.timestampMS = timestampMS;
this.name = name;
this.ageYrs = ageYrs;
this.sizeCm = sizeCm;
}
public org.apache.avro.Schema getSchema() { return SCHEMA$; }
// Used by DatumWriter. Applications should not call.
public Object get(int field$) {
switch (field$) {
case 0: return timestampMS;
case 1: return name;
case 2: return ageYrs;
case 3: return sizeCm;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
// Used by DatumReader. Applications should not call.
@SuppressWarnings(value="unchecked")
public void put(int field$, Object value$) {
switch (field$) {
case 0: timestampMS = (Long)value$; break;
case 1: name = (CharSequence)value$; break;
case 2: ageYrs = (Integer)value$; break;
case 3: sizeCm = (Float)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
/**
* Gets the value of the 'timestampMS' field.
*/
public Long getTimestampMS() {
return timestampMS;
}
/**
* Sets the value of the 'timestampMS' field.
* @param value the value to set.
*/
public void setTimestampMS(Long value) {
this.timestampMS = value;
}
/**
* Gets the value of the 'name' field.
*/
public CharSequence getName() {
return name;
}
/**
* Sets the value of the 'name' field.
* @param value the value to set.
*/
public void setName(CharSequence value) {
this.name = value;
}
/**
* Gets the value of the 'ageYrs' field.
*/
public Integer getAgeYrs() {
return ageYrs;
}
/**
* Sets the value of the 'ageYrs' field.
* @param value the value to set.
*/
public void setAgeYrs(Integer value) {
this.ageYrs = value;
}
/**
* Gets the value of the 'sizeCm' field.
*/
public Float getSizeCm() {
return sizeCm;
}
/**
* Sets the value of the 'sizeCm' field.
* @param value the value to set.
*/
public void setSizeCm(Float value) {
this.sizeCm = value;
}
/** Creates a new Event RecordBuilder */
public static Event.Builder newBuilder() {
return new Event.Builder();
}
/** Creates a new Event RecordBuilder by copying an existing Builder */
public static Event.Builder newBuilder(Event.Builder other) {
return new Event.Builder(other);
}
/** Creates a new Event RecordBuilder by copying an existing Event instance */
public static Event.Builder newBuilder(Event other) {
return new Event.Builder(other);
}
/**
* RecordBuilder for Event instances.
*/
public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Event>
implements org.apache.avro.data.RecordBuilder<Event> {
private long timestampMS;
private CharSequence name;
private int ageYrs;
private float sizeCm;
/** Creates a new Builder */
private Builder() {
super(Event.SCHEMA$);
}
/** Creates a Builder by copying an existing Builder */
private Builder(Event.Builder other) {
super(other);
if (isValidValue(fields()[0], other.timestampMS)) {
this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.name)) {
this.name = data().deepCopy(fields()[1].schema(), other.name);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.ageYrs)) {
this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
fieldSetFlags()[2] = true;
}
if (isValidValue(fields()[3], other.sizeCm)) {
this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
fieldSetFlags()[3] = true;
}
}
/** Creates a Builder by copying an existing Event instance */
private Builder(Event other) {
super(Event.SCHEMA$);
if (isValidValue(fields()[0], other.timestampMS)) {
this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.name)) {
this.name = data().deepCopy(fields()[1].schema(), other.name);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.ageYrs)) {
this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
fieldSetFlags()[2] = true;
}
if (isValidValue(fields()[3], other.sizeCm)) {
this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
fieldSetFlags()[3] = true;
}
}
/** Gets the value of the 'timestampMS' field */
public Long getTimestampMS() {
return timestampMS;
}
/** Sets the value of the 'timestampMS' field */
public Event.Builder setTimestampMS(long value) {
validate(fields()[0], value);
this.timestampMS = value;
fieldSetFlags()[0] = true;
return this;
}
/** Checks whether the 'timestampMS' field has been set */
public boolean hasTimestampMS() {
return fieldSetFlags()[0];
}
/** Clears the value of the 'timestampMS' field */
public Event.Builder clearTimestampMS() {
fieldSetFlags()[0] = false;
return this;
}
/** Gets the value of the 'name' field */
public CharSequence getName() {
return name;
}
/** Sets the value of the 'name' field */
public Event.Builder setName(CharSequence value) {
validate(fields()[1], value);
this.name = value;
fieldSetFlags()[1] = true;
return this;
}
/** Checks whether the 'name' field has been set */
public boolean hasName() {
return fieldSetFlags()[1];
}
/** Clears the value of the 'name' field */
public Event.Builder clearName() {
name = null;
fieldSetFlags()[1] = false;
return this;
}
/** Gets the value of the 'ageYrs' field */
public Integer getAgeYrs() {
return ageYrs;
}
/** Sets the value of the 'ageYrs' field */
public Event.Builder setAgeYrs(int value) {
validate(fields()[2], value);
this.ageYrs = value;
fieldSetFlags()[2] = true;
return this;
}
/** Checks whether the 'ageYrs' field has been set */
public boolean hasAgeYrs() {
return fieldSetFlags()[2];
}
/** Clears the value of the 'ageYrs' field */
public Event.Builder clearAgeYrs() {
fieldSetFlags()[2] = false;
return this;
}
/** Gets the value of the 'sizeCm' field */
public Float getSizeCm() {
return sizeCm;
}
/** Sets the value of the 'sizeCm' field */
public Event.Builder setSizeCm(float value) {
validate(fields()[3], value);
this.sizeCm = value;
fieldSetFlags()[3] = true;
return this;
}
/** Checks whether the 'sizeCm' field has been set */
public boolean hasSizeCm() {
return fieldSetFlags()[3];
}
/** Clears the value of the 'sizeCm' field */
public Event.Builder clearSizeCm() {
fieldSetFlags()[3] = false;
return this;
}
@Override
public Event build() {
try {
Event record = new Event();
record.timestampMS = fieldSetFlags()[0] ? this.timestampMS : (Long) defaultValue(fields()[0]);
record.name = fieldSetFlags()[1] ? this.name : (CharSequence) defaultValue(fields()[1]);
record.ageYrs = fieldSetFlags()[2] ? this.ageYrs : (Integer) defaultValue(fields()[2]);
record.sizeCm = fieldSetFlags()[3] ? this.sizeCm : (Float) defaultValue(fields()[3]);
return record;
} catch (Exception e) {
throw new org.apache.avro.AvroRuntimeException(e);
}
}
}
}
只是爲了完整性,您能否給出Event定義。我想嘗試優化您的多線程代碼... –
確定我已經添加了它 –