
Apache Spark behavior with Cassandra

I am writing a standalone Spark program that gets its data from Cassandra. I followed the examples and created the RDD via newAPIHadoopRDD() and the ColumnFamilyInputFormat class. The RDD is created, but I get a NotSerializableException when I call the RDD's .groupByKey() method:

public static void main(String[] args) { 
    SparkConf sparkConf = new SparkConf(); 
    sparkConf.setMaster("local").setAppName("Test"); 
    JavaSparkContext ctx = new JavaSparkContext(sparkConf); 

    Job job = new Job(); 
    Configuration jobConf = job.getConfiguration(); 
    job.setInputFormatClass(ColumnFamilyInputFormat.class); 

    ConfigHelper.setInputInitialAddress(jobConf, host); 
    ConfigHelper.setInputRpcPort(jobConf, port); 
    ConfigHelper.setOutputInitialAddress(jobConf, host); 
    ConfigHelper.setOutputRpcPort(jobConf, port); 
    ConfigHelper.setInputColumnFamily(jobConf, keySpace, columnFamily, true); 
    ConfigHelper.setInputPartitioner(jobConf,"Murmur3Partitioner"); 
    ConfigHelper.setOutputPartitioner(jobConf,"Murmur3Partitioner"); 

    SlicePredicate predicate = new SlicePredicate(); 
    SliceRange sliceRange = new SliceRange(); 
    sliceRange.setFinish(new byte[0]); 
    sliceRange.setStart(new byte[0]); 
    predicate.setSlice_range(sliceRange); 
    ConfigHelper.setInputSlicePredicate(jobConf, predicate); 

    JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> rdd =
        ctx.newAPIHadoopRDD(jobConf,
            ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
            ByteBuffer.class, SortedMap.class);

    JavaPairRDD<ByteBuffer, Iterable<SortedMap<ByteBuffer, IColumn>>> groupRdd = rdd.groupByKey(); 
    System.out.println(groupRdd.count()); 
} 

The exception:

java.io.NotSerializableException: java.nio.HeapByteBuffer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179)
    at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.Task.run(Task.scala)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)

What I am trying to do is merge all of the columns for each row key into a single entry. I get the same exception when I try to use the reduceByKey() method instead, like this:

JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> reducedRdd = rdd.reduceByKey(
    new Function2<SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>>() {
        public SortedMap<ByteBuffer, IColumn> call(SortedMap<ByteBuffer, IColumn> arg0,
                SortedMap<ByteBuffer, IColumn> arg1) throws Exception {
            SortedMap<ByteBuffer, IColumn> sortedMap = new TreeMap<ByteBuffer, IColumn>(arg0.comparator());
            sortedMap.putAll(arg0);
            sortedMap.putAll(arg1);
            return sortedMap;
        }
    }
);

I am using:

  • spark-1.0.0-bin-hadoop1
  • Cassandra 1.2.12
  • Java 1.6

Does anyone know what the problem is? What is it that fails to serialize?

Thanks,
夏嘉曦

Answer


Your problem is most likely caused by trying to serialize the ByteBuffers. They are not serializable, and you need to convert them into byte arrays before you generate the RDD.
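For illustration, a rough sketch of that kind of conversion (not code from the question itself): map each ByteBuffer key and IColumn value into plain serializable types before any shuffle happens. The class name and the toHex helper below are hypothetical; ByteBufferUtil.getArray() and IColumn.value() are the Cassandra 1.2 utilities assumed here, and hex-encoding the row key also gives it value-based equals()/hashCode(), which groupByKey() relies on.

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SerializableConversion {

    // Hypothetical helper: hex-encode a ByteBuffer so the resulting key has
    // value-based equals()/hashCode(), which groupByKey() depends on.
    static String toHex(ByteBuffer buffer) {
        byte[] bytes = ByteBufferUtil.getArray(buffer);
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    static JavaPairRDD<String, SortedMap<String, byte[]>> toSerializable(
            JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> rdd) {
        return rdd.mapToPair(
            new PairFunction<Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>>,
                             String, SortedMap<String, byte[]>>() {
                public Tuple2<String, SortedMap<String, byte[]>> call(
                        Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>> row) {
                    // Copy every column name/value out of the ByteBuffers into
                    // plain String/byte[] pairs so nothing in the shuffle holds
                    // a non-serializable HeapByteBuffer.
                    SortedMap<String, byte[]> columns = new TreeMap<String, byte[]>();
                    for (Map.Entry<ByteBuffer, IColumn> e : row._2().entrySet()) {
                        columns.put(toHex(e.getKey()), ByteBufferUtil.getArray(e.getValue().value()));
                    }
                    return new Tuple2<String, SortedMap<String, byte[]>>(toHex(row._1()), columns);
                }
            });
    }
}

With something like this in place, toSerializable(rdd).groupByKey() keeps the shuffle free of HeapByteBuffer instances.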

You should try the official DataStax Cassandra driver for Spark, which is available here.
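For reference, a minimal sketch of what reading the same table through the connector might look like; the package and method names used here (com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions, cassandraTable) and the spark.cassandra.connection.host setting are assumptions that depend on which connector version you pair with Spark 1.0.0.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.datastax.spark.connector.japi.CassandraRow;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

public class ConnectorSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setMaster("local")
            .setAppName("Test")
            .set("spark.cassandra.connection.host", "127.0.0.1"); // assumed host
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // cassandraTable() hands back CassandraRow objects, which are
        // serializable, so grouping and reducing do not hit the
        // NotSerializableException from the question.
        JavaRDD<CassandraRow> rows = javaFunctions(ctx)
            .cassandraTable("keySpace", "columnFamily"); // placeholder names from the question

        System.out.println(rows.count());
    }
}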


Hi Jacek, thank you for your reply. That is exactly what my solution was. – user3770713