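Spark broadcast/serialization error

I have created a CLI driver for a Spark version of a Mahout job called "item similarity", with several tests that all work fine on local[4] Spark standalone. The code even reads from and writes to cluster HDFS. But switching to cluster Spark exposes a problem that seems to be related to broadcast and/or serialization.
The code uses HashBiMap, which is a Guava Java class. Two of them are created for each Mahout drm (a distributed matrix), for bi-directional lookup of row and column IDs. They are created once and then broadcast so they can be accessed everywhere.
When I run this on cluster Spark I get the error below. At one point we used HashMaps, and they seemed to work on the cluster, so I suspect something about HashBiMap. I also suspect it may be related to serialization in the broadcast. Here is a snippet of the code and the error.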
// create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
// broadcast them for access in distributed processes, so they are not recalculated in every task.
// rowIDDictionary is a HashBiMap[String, Int]
val rowIDDictionary = asOrderedDictionary(rowIDs) // this creates the HashBiMap in a non-distributed manner
val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)
val columnIDDictionary = asOrderedDictionary(columnIDs) // this creates the HashBiMap in a non-distributed manner
val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
val indexedInteractions =
  interactions.map { case (rowID, columnID) => // <<<<<<<<<<< this is the stage being submitted before the error
    val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
    val columnIndex = columnIDDictionary_bcast.value.get(columnID).get
    rowIndex -> columnIndex
  }
The error seems to happen while executing interactions.map, when the _bcast vals are accessed. Any ideas on where to start looking?
14/06/26 11:23:36 INFO scheduler.DAGScheduler: Submitting Stage 9 (MappedRDD[17] at map at TextDelimitedReaderWriter.scala:83), which has no missing parents
14/06/26 11:23:36 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 9 (MappedRDD[17] at map at TextDelimitedReaderWriter.scala:83)
14/06/26 11:23:36 INFO scheduler.TaskSchedulerImpl: Adding task set 9.0 with 2 tasks
14/06/26 11:23:36 INFO scheduler.TaskSetManager: Starting task 9.0:0 as TID 16 on executor 0: occam4 (PROCESS_LOCAL)
14/06/26 11:23:36 INFO scheduler.TaskSetManager: Serialized task 9.0:0 as 2418 bytes in 0 ms
14/06/26 11:23:36 INFO scheduler.TaskSetManager: Starting task 9.0:1 as TID 17 on executor 0: occam4 (PROCESS_LOCAL)
14/06/26 11:23:36 INFO scheduler.TaskSetManager: Serialized task 9.0:1 as 2440 bytes in 0 ms
14/06/26 11:23:36 WARN scheduler.TaskSetManager: Lost TID 16 (task 9.0:0)
14/06/26 11:23:36 WARN scheduler.TaskSetManager: Loss was due to java.lang.NullPointerException
java.lang.NullPointerException
at com.google.common.collect.HashBiMap.seekByKey(HashBiMap.java:180)
at com.google.common.collect.HashBiMap.put(HashBiMap.java:230)
at com.google.common.collect.HashBiMap.put(HashBiMap.java:218)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:102)
at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1871)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at org.apache.spark.scheduler.ShuffleMapTask$.deserializeInfo(ShuffleMapTask.scala:69)
at org.apache.spark.scheduler.ShuffleMapTask.readExternal(ShuffleMapTask.scala:138)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1814)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1773)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:349)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
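Yes, Kryo is used everywhere. I don't know why; that decision was made by someone else on the project (who is not around now). I'm quite new to Spark, so this is a bit out of my depth. Guava's HashBiMap implements Serializable; does it still have to be registered with Kryo? I guess I had better study Kryo. – pferrel
It looks like I need to register HashBiMap and have it use Kryo's JavaSerializer. With that it works. It seems odd that I need to register everything I want to broadcast, even when it implements Serializable. – pferrel
So Kryo serialization is a serialization mechanism that is independent of Java serialization. Glad that registering it solved the problem :) – Holden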
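
For anyone hitting the same stack trace, the fix described in the comments above could look roughly like the sketch below. This is not the code that went into Mahout: the names BiMapKryoRegistrator and BiMapKryoConf are made up for illustration, and it assumes Spark, Kryo and Guava are on the classpath. The likely cause of the NullPointerException is that Kryo's default MapSerializer builds the HashBiMap without running its constructor and then calls put() on it, so the internal tables are still null. Registering the class with Kryo's JavaSerializer makes Kryo fall back to plain Java serialization for this one type.

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import com.google.common.collect.HashBiMap
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator (name chosen for this example only).
// It tells Kryo to handle HashBiMap through java.io serialization
// instead of Kryo's default MapSerializer.
class BiMapKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[HashBiMap[_, _]], new JavaSerializer())
  }
}

// Hypothetical helper that builds a SparkConf using the registrator above.
object BiMapKryoConf {
  def sparkConf(): SparkConf = new SparkConf()
    // Use Kryo as the serializer for Spark data.
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Point Spark at the custom registrator by its fully qualified class name.
    .set("spark.kryo.registrator", classOf[BiMapKryoRegistrator].getName)
}
```

The registrator class has to be available on the executors as well as the driver, since each executor builds its own Kryo instance. Other broadcast types that Kryo's default serializers cannot rebuild correctly would need a similar registration.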