2015-06-12 38 views
0

我有一個Spark (version 1.3.1)應用。其中,我想一個Java bean RDDJavaRDD<Message>轉換成數據幀,它與不同的,不同的數據類型(整數,字符串列表,地圖,雙人間)許多領域。scala.MatchError:在Dataframes

但是,當我執行我的代碼。

messages.foreachRDD(new Function2<JavaRDD<Message>,Time,Void>(){ 
      @Override 
      public Void call(JavaRDD<Message> arg0, Time arg1) throws Exception { 
       SQLContext sqlContext = SparkConnection.getSqlContext(); 
       DataFrame df = sqlContext.createDataFrame(arg0, Message.class); 
       df.registerTempTable("messages"); 

我得到這個錯誤

/06/12 17:27:40 INFO JobScheduler: Starting job streaming job 1434110260000 ms.0 from job set of time 1434110260000 ms 
15/06/12 17:27:40 ERROR JobScheduler: Error running job streaming job 1434110260000 ms.1 
scala.MatchError: interface java.util.List (of class java.lang.Class) 
    at org.apache.spark.sql.SQLContext$$anonfun$getSchema$1.apply(SQLContext.scala:1193) 
    at org.apache.spark.sql.SQLContext$$anonfun$getSchema$1.apply(SQLContext.scala:1192) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) 
    at org.apache.spark.sql.SQLContext.getSchema(SQLContext.scala:1192) 
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:437) 
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:465) 

回答

5

如果Message有許多不同的領域,如List和錯誤消息指向List匹配誤差比是是問題。此外,如果你看一下the source code你可以看到,List是不是在比賽。

但在源代碼中四處旁邊這也是文檔here under the Java tab非常明確提出

Currently, Spark SQL does not support JavaBeans that contain nested or contain complex types such as Lists or Arrays.

你可能想,因爲它似乎切換到斯卡拉在那裏支持:

Case classes can also be nested or contain complex types such as Sequences or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table.

所以解決方法是,使用Scala的或從你的JavaBean刪除List

作爲最後的手段,你可以看看SQLUserDefinedType來定義List應該如何持續下去,也許有可能一起破解它。

2

我通過更新我的星火版本從1.3.11.4.0解決了這個問題。現在,它工作文件。