
Spark with Cassandra: unable to register spark.kryo.registrator

Currently, I am running into some problems trying to run Spark with Cassandra in standalone mode.

Initially, I ran successfully with "local[4]" as the master parameter of the SparkContext.

Then I tried to move to standalone mode. I am using:

Ubuntu: 12.04
Cassandra: 1.2.11
Spark: 0.8.0
Scala: 2.9.3
JDK: Oracle 1.6.0_35
Kryo: 2.21

At first, I got an "unread block data" error. As suggested in other threads, I switched to the Kryo serializer and added Twitter Chill. Then I got "Failed to register spark.kryo.registrator" in my console, along with the following exception:

13/10/28 12:12:36 INFO cluster.ClusterTaskSetManager: Lost TID 0 (task 0.0:0) 
13/10/28 12:12:36 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.EOFException 
java.io.EOFException 
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:109) 
    at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:150) 
    at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
    at java.lang.reflect.Method.invoke(Unknown Source) 
    at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) 
    at java.io.ObjectInputStream.readSerialData(Unknown Source) 
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) 
    at java.io.ObjectInputStream.readObject0(Unknown Source) 
    at java.io.ObjectInputStream.defaultReadFields(Unknown Source) 
    at java.io.ObjectInputStream.readSerialData(Unknown Source) 
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) 
    at java.io.ObjectInputStream.readObject0(Unknown Source) 
    at java.io.ObjectInputStream.defaultReadFields(Unknown Source) 
    at java.io.ObjectInputStream.readSerialData(Unknown Source) 
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) 
    at java.io.ObjectInputStream.readObject0(Unknown Source) 
    at java.io.ObjectInputStream.readObject(Unknown Source) 
    at scala.collection.immutable.$colon$colon.readObject(List.scala:435) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
    at java.lang.reflect.Method.invoke(Unknown Source) 
    at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) 
    at java.io.ObjectInputStream.readSerialData(Unknown Source) 
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) 
    at java.io.ObjectInputStream.readObject0(Unknown Source) 
    at java.io.ObjectInputStream.defaultReadFields(Unknown Source) 
    at java.io.ObjectInputStream.readSerialData(Unknown Source) 
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) 
    at java.io.ObjectInputStream.readObject0(Unknown Source) 
    at java.io.ObjectInputStream.readObject(Unknown Source) 
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39) 
    at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61) 
    at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:129) 
    at java.io.ObjectInputStream.readExternalData(Unknown Source) 
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) 
    at java.io.ObjectInputStream.readObject0(Unknown Source) 
    at java.io.ObjectInputStream.readObject(Unknown Source) 
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39) 
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 

Others have run into the EOFException in Spark before, and the answer there was that the registrator was not registered properly. I followed the Spark guide to register it. My registrator is as follows:

import java.nio.ByteBuffer

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[org.apache.spark.rdd.RDD[(Map[String, ByteBuffer], Map[String, ByteBuffer])]])
    kryo.register(classOf[String], 1)                  // fixed registration id 1
    kryo.register(classOf[Map[String, ByteBuffer]], 2) // fixed registration id 2
  }
}
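
In case it matters, here is a minimal sketch of how I understand Twitter Chill can be wired into such a registrator, via its AllScalaRegistrar (assuming chill 0.3.x on the classpath; the class name ChillRegistrator is my own placeholder):

import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.AllScalaRegistrar
import org.apache.spark.serializer.KryoRegistrator

class ChillRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // AllScalaRegistrar registers the common Scala collection classes
    // (List, Map, Tuple, ...) with Kryo in one call.
    new AllScalaRegistrar().apply(kryo)
  }
}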

And I also set the properties as the guide describes:

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "main.scala.MyRegistrator")
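
For completeness, here is a minimal sketch of how I put the pieces together for standalone mode (the master URL, application name, Spark home, and jar path are placeholders for my actual values); the two properties have to be set before the SparkContext is constructed, otherwise the executors never see them:

import org.apache.spark.SparkContext

object JobSketch {
  def main(args: Array[String]) {
    // Set serializer properties *before* creating the SparkContext.
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "main.scala.MyRegistrator")

    val sc = new SparkContext(
      "spark://master-host:7077",        // standalone master URL (placeholder)
      "CassandraJob",                    // application name (placeholder)
      "/opt/spark",                      // Spark home on the workers (placeholder)
      Seq("target/my-app-assembly.jar")) // jar containing MyRegistrator (placeholder)

    // ... job body ...
    sc.stop()
  }
}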

Can anyone give me some hints about what I am doing wrong? Thanks.

Answer


In my experience, the cause of the "EOFException" and the "unread block data" errors is the same: some libraries are missing when the job runs on the cluster. The weirdest part is that I had already added the libraries to Spark with "sbt assembly", and they actually exist in the jar folder, yet Spark still could not find and load them. Only after I added these libraries to the Spark context did it work. In other words, the libraries need to be shipped to every node by specifying them in the code.
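
A rough sketch of what that looks like (the jar paths and master URL are placeholders for the actual dependencies of the job):

import org.apache.spark.SparkContext

// Ship the dependency jars to every worker through the SparkContext.
val sc = new SparkContext("spark://master-host:7077", "CassandraJob")
sc.addJar("/path/to/my-app-assembly.jar")  // contains MyRegistrator
sc.addJar("/path/to/cassandra-thrift.jar") // Cassandra client classes
sc.addJar("/path/to/chill_2.9.3.jar")      // Twitter Chill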


'SparkContext.addJar()' is the [preferred way of shipping code to the cluster](https://spark.incubator.apache.org/docs/latest/cluster-overview.html#shipping-code-to-the-cluster) in Spark. I wouldn't recommend modifying Spark's own build to add custom libraries (if you tried that and it didn't work, you probably needed to copy the updated Spark assembly to the worker machines; that's inconvenient, which is why I'd stick with 'addJar()'). –


I see, thank you. – cjcrobin