我啓用了我的Spark作業的Kryo序列化,啓用了需要註冊的設置,並確保我的所有類型都已註冊。爲什麼Spark在使用Kryo序列化時表現更差?
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(classes)
conf.registerAvroSchemas(avroSchemas: _*)
該作業的掛鐘時間表現惡化了約20%,並且洗牌的字節數增加了近400%。
考慮到Spark documentation的建議,Kryo應該更好,這對我來說似乎真的很讓人驚訝。
KRYO是顯著比Java序列化(通常高達10倍),更快,更緊湊
我手動與我的數據的一個實例調用serialize
方法上斯巴克的org.apache.spark.serializer.KryoSerializer
和org.apache.spark.serializer.JavaSerializer
的實例。結果與Spark文檔中的建議一致:Kryo生成了98個字節; Java產生了993個字節。這真的是一個10倍的改善。
一個可能混淆的因素是被序列化和洗牌的對象實現Avro GenericRecord
接口。我嘗試在SparkConf
中註冊Avro模式,但沒有顯示出改進。
我試着製作新類來洗牌簡單的Scala case class
es的數據,不包括任何Avro機器。它沒有改善洗牌性能或交換的字節數。
星火代碼最終熬煮到以下幾點:
case class A(
f1: Long,
f2: Option[Long],
f3: Int,
f4: Int,
f5: Option[String],
f6: Option[Int],
f7: Option[String],
f8: Option[Int],
f9: Option[Int],
f10: Option[Int],
f11: Option[Int],
f12: String,
f13: Option[Double],
f14: Option[Int],
f15: Option[Double],
f16: Option[Double],
f17: List[String],
f18: String) extends org.apache.avro.specific.SpecificRecordBase {
def get(f: Int) : AnyRef = ???
def put(f: Int, value: Any) : Unit = ???
def getSchema(): org.apache.avro.Schema = A.SCHEMA$
}
object A extends AnyRef with Serializable {
val SCHEMA$: org.apache.avro.Schema = ???
}
case class B(
f1: Long
f2: Long
f3: String
f4: String) extends org.apache.avro.specific.SpecificRecordBase {
def get(field$ : Int) : AnyRef = ???
def getSchema() : org.apache.avro.Schema = B.SCHEMA$
def put(field$ : Int, value : Any) : Unit = ???
}
object B extends AnyRef with Serializable {
val SCHEMA$ : org.apache.avro.Schema = ???
}
def join(as: RDD[A], bs: RDD[B]): (Iterable[A], Iterable[B]) = {
val joined = as.map(a => a.f1 -> a) cogroup bs.map(b => b.f1 -> b)
joined.map { case (_, asAndBs) => asAndBs }
}
你有任何想法可能什麼或如何我能得到更好的性能,應該是從KRYO?
你能張貼例子案例類和工作嗎?回答這個問題要容易得多,然後 –
好點,@T.Gawęd。用簡化代碼更新。 –
你是如何測量你的代碼的? –