2016-01-23 61 views
4

我試圖在Java中創建Dataset,所以我寫了下面的代碼:如何從自定義類Person創建數據集?

public Dataset createDataset(){ 
    List<Person> list = new ArrayList<>(); 
    list.add(new Person("name", 10, 10.0)); 
    Dataset<Person> dateset = sqlContext.createDataset(list, Encoders.bean(Person.class)); 
    return dataset; 
} 

Person類是一個內部類。然而

星火拋出以下異常:

org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `....` without access to the scope that this class was defined in. Try moving this class out of its parent class.; 

at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$2.applyOrElse(ExpressionEncoder.scala:264) 
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$2.applyOrElse(ExpressionEncoder.scala:260) 
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243) 
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243) 
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53) 
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:242) 

如何做正確?

回答

10

TL;博士(僅在火花殼)定義您的情況下的類第一,一旦它們定義,使用它們。在Spark/Scala應用程序中使用案例類應該可行。

2.0.1在Spark shell中,您應該首先定義案例類,然後才能訪問它們以創建Dataset

$ ./bin/spark-shell --version 
Welcome to 
     ____    __ 
    /__/__ ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/ '_/ 
    /___/ .__/\_,_/_/ /_/\_\ version 2.1.0-SNAPSHOT 
     /_/ 

Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_102 
Branch master 
Compiled by user jacek on 2016-10-25T04:20:04Z 
Revision 483c37c581fedc64b218e294ecde1a7bb4b2af9c 
Url https://github.com/apache/spark.git 
Type --help for more information. 

$ ./bin/spark-shell 
scala> :pa 
// Entering paste mode (ctrl-D to finish) 

case class Person(id: Long) 

Seq(Person(0)).toDS // <-- this won't work 

// Exiting paste mode, now interpreting. 

<console>:15: error: value toDS is not a member of Seq[Person] 
     Seq(Person(0)).toDS // <-- it won't work 
        ^
scala> case class Person(id: Long) 
defined class Person 

scala> // the following implicit conversion *will* work 

scala> Seq(Person(0)).toDS 
res1: org.apache.spark.sql.Dataset[Person] = [id: bigint] 

43ebf7a9cbd70d6af75e140a6fc91bf0ffc2b877承諾(在3月21日星火2.0.0-SNAPSHOT)溶液中加入來解決這個問題。

在斯卡拉REPL我不得不添加OuterScopes.addOuterScope(this):paste完整的片段如下:

scala> :pa 
// Entering paste mode (ctrl-D to finish) 

import sqlContext.implicits._ 
case class Token(name: String, productId: Int, score: Double) 
val data = Token("aaa", 100, 0.12) :: 
    Token("aaa", 200, 0.29) :: 
    Token("bbb", 200, 0.53) :: 
    Token("bbb", 300, 0.42) :: Nil 
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) 
val ds = data.toDS 
+0

使用[火花筆記本](http://spark-notebook.io)與scala 0.11的確,在case類定義之後並在dataframe命令中使用它之前,添加'org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)'解決了這個問題。 –

+0

我在問addOuterScope方法,如果你知道爲什麼必須添加編碼器才能正常工作 – eliasah

+0

非常感謝您的更新。我曾問過你,因爲我在http://stackoverflow.com/a/40232936/3415409之前正在研究這個問題 – eliasah

4

的解決方案是在該方法的開始添加這段代碼:

org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this); 
0

對於一階類似的問題,我的解決辦法是做完全一樣的AnalysisException建議。將案例類移出其父類。 例如我在Streaming_Base.scala類似下面:

abstract class Streaming_Base { 
    case class EventBean(id:String, command:String, recordType:String) 
    ... 
} 

我改變了以下:

case class EventBean(id:String, command:String, recordType:String) 
abstract class Streaming_Base {   
    ... 
} 
相關問題