
I am testing a Spark application in a Spark notebook running in Docker, and it fails with: Caused by: java.lang.ClassCastException: Person cannot be cast to Person. The Scala code is:

val p = spark.sparkContext.textFile("../Data/person.txt")
val pmap = p.map(_.split(","))
pmap.collect()

The output is: Array(Array(Barack, Obama, 53), Array(George, Bush, 68), Array(Bill, Clinton, 68))

case class Person(first_name: String, last_name: String, age: Int)
val personRDD = pmap.map(p => Person(p(0), p(1), p(2).toInt))
val personDF = personRDD.toDF
personDF.collect()

The error message is:

Name: org.apache.spark.SparkException 
Message: Job aborted due to stage failure: Task 1 in stage 12.0 failed 1 times, most recent failure: Lost task 1.0 in stage 12.0 (TID 17, localhost, executor driver): java.lang.ClassCastException: $line145.$read$$iw$$iw$Person cannot be cast to $line145.$read$$iw$$iw$Person 
    ................ 
Caused by: java.lang.ClassCastException: Person cannot be cast to Person 

In fact, I tried running this code in spark-shell, and there it runs correctly. I suspect that the error message above is related to the Docker environment rather than to the code itself. In addition, I tried to display personRDD with:

personRDD.collect 

and I got this error message:

org.apache.spark.SparkDriverExecutionException: Execution error 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1186) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1711) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
    at org.apache.spark.rdd.RDD.take(RDD.scala:1327) 
    ... 37 elided 
Caused by: java.lang.ArrayStoreException: [LPerson; 
    at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:90) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:2043) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:2043) 
    at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:59) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1182) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1711) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

I cannot figure out what causes this problem. Can anyone give me some clues? Thanks.


How about applying the case class to the txt file directly with SparkSQL? –


Your code runs perfectly for me :) Maybe you are doing something nasty somewhere else in your code. Did you define the case class outside of the executing code? –


I used this old code to test the Docker environment; it runs correctly in spark-shell, but not here. –

Answer


As cricket_007 suggested in his comment to use sqlContext, you should be using sparkSQL.

Given an input data file with a header as

first_name,last_name,age 
Barack,Obama,53 
George,Bush,68 
Bill,Clinton,68 

you can do the following

val df = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", true) 
    .load("../Data/person.txt") 

to get the dataframe as

+----------+---------+---+ 
|first_name|last_name|age| 
+----------+---------+---+ 
|Barack    |Obama    |53 | 
|George    |Bush     |68 | 
|Bill      |Clinton  |68 | 
+----------+---------+---+ 

with the schema as

root 
|-- first_name: string (nullable = true) 
|-- last_name: string (nullable = true) 
|-- age: string (nullable = true) 

You can define a schema and apply it as

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
val schema = StructType(Array(StructField("first_name", StringType, true), StructField("last_name", StringType, true), StructField("age", IntegerType, true))) 

val df = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", true) 
    .option("inferSchema", "true") 
    .schema(schema) 
    .load("/home/anahcolus/IdeaProjects/scalaTest/src/test/resources/t1.csv") 

and you should have the schema as

root 
|-- first_name: string (nullable = true) 
|-- last_name: string (nullable = true) 
|-- age: integer (nullable = true) 
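
Since the question builds the RDD from spark.sparkContext, i.e. from a Spark 2.x SparkSession, here is a minimal sketch of the same read with the built-in CSV source plus a typed conversion back to the question's Person case class; it assumes Spark 2.0+, a file laid out with a header as above, the schema defined above, and that Person is defined outside the code that runs on the executors.

// Sketch, assuming Spark 2.x with a SparkSession named `spark`
import spark.implicits._

val df2 = spark.read
    .option("header", "true")
    .schema(schema)
    .csv("../Data/person.txt")

// Typed view of the same rows as Person objects
val people = df2.as[Person]
people.show()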

If you don't have a header in your file, then you can remove the header option, as in the sketch below.
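
For example, a sketch of the same read without a header line, assuming the file "../Data/person.txt" from the question (which contains only data rows) and the schema defined above, so that the column names come from the schema rather than from the file:

// Sketch: no header option, explicit schema supplies the column names
val dfNoHeader = sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .load("../Data/person.txt")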
