2017-01-09 67 views
14

我啓用了我的Spark作業的Kryo序列化,啓用了需要註冊的設置,並確保我的所有類型都已註冊。爲什麼Spark在使用Kryo序列化時表現更差?

val conf = new SparkConf() 
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
conf.set("spark.kryo.registrationRequired", "true") 
conf.registerKryoClasses(classes) 
conf.registerAvroSchemas(avroSchemas: _*) 

該作業的掛鐘時間表現惡化了約20%,並且洗牌的字節數增加了近400%。

考慮到Spark documentation的建議,Kryo應該更好,這對我來說似乎真的很讓人驚訝。

KRYO是顯著比Java序列化(通常高達10倍),更快,更緊湊

我手動與我的數據的一個實例調用serialize方法上斯巴克的org.apache.spark.serializer.KryoSerializerorg.apache.spark.serializer.JavaSerializer的實例。結果與Spark文檔中的建議一致:Kryo生成了98個字節; Java產生了993個字節。這真的是一個10倍的改善。

一個可能混淆的因素是被序列化和洗牌的對象實現Avro GenericRecord接口。我嘗試在SparkConf中註冊Avro模式,但沒有顯示出改進。

我試着製作新類來洗牌簡單的Scala case class es的數據,不包括任何Avro機器。它沒有改善洗牌性能或交換的字節數。

星火代碼最終熬煮到以下幾點:

case class A(
    f1: Long, 
    f2: Option[Long], 
    f3: Int, 
    f4: Int, 
    f5: Option[String], 
    f6: Option[Int], 
    f7: Option[String], 
    f8: Option[Int], 
    f9: Option[Int], 
    f10: Option[Int], 
    f11: Option[Int], 
    f12: String, 
    f13: Option[Double], 
    f14: Option[Int], 
    f15: Option[Double], 
    f16: Option[Double], 
    f17: List[String], 
    f18: String) extends org.apache.avro.specific.SpecificRecordBase { 
    def get(f: Int) : AnyRef = ??? 
    def put(f: Int, value: Any) : Unit = ??? 
    def getSchema(): org.apache.avro.Schema = A.SCHEMA$ 
} 
object A extends AnyRef with Serializable { 
    val SCHEMA$: org.apache.avro.Schema = ??? 
} 

case class B(
    f1: Long 
    f2: Long 
    f3: String 
    f4: String) extends org.apache.avro.specific.SpecificRecordBase { 
    def get(field$ : Int) : AnyRef = ??? 
    def getSchema() : org.apache.avro.Schema = B.SCHEMA$ 
    def put(field$ : Int, value : Any) : Unit = ??? 
} 
object B extends AnyRef with Serializable { 
    val SCHEMA$ : org.apache.avro.Schema = ??? 
} 

def join(as: RDD[A], bs: RDD[B]): (Iterable[A], Iterable[B]) = { 
    val joined = as.map(a => a.f1 -> a) cogroup bs.map(b => b.f1 -> b) 
    joined.map { case (_, asAndBs) => asAndBs } 
} 

你有任何想法可能什麼或如何我能得到更好的性能,應該是從KRYO?

+0

你能張貼例子案例類和工作嗎?回答這個問題要容易得多,然後 –

+0

好點,@T.Gawęd。用簡化代碼更新。 –

+0

你是如何測量你的代碼的? –

回答

0

由於您的基數RDD很高,廣播/廣播散列連接似乎不幸被禁止。

您最好是在加入之前coalesce()您的RDD。你在洗牌時間看到高偏斜嗎?如果是這樣,你可能想與shuffle = true合併。

最後,如果您擁有嵌套結構(例如JSON)的RDD,那麼有時您可以繞過洗牌。查看幻燈片和/或視頻here以獲取更詳細的解釋。

1

如果您的單個記錄大小太小,並且有大量記錄可能會使您的工作變慢。請嘗試增加緩衝區大小並查看是否有任何改進。

嘗試下面的一個,如果沒有這樣做..

val conf = new SparkConf() 
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
    // Now it's 24 Mb of buffer by default instead of 0.064 Mb 
    .set("spark.kryoserializer.buffer.mb","24") 

編號:https://ogirardot.wordpress.com/2015/01/09/changing-sparks-default-java-serialization-to-kryo/

相關問題