2016-12-30 124 views
2

我是Spark和Hive的新手,我的目標是將分隔符(讓我們說csv)加載到Hive表。經過一番閱讀後,我發現將數據加載到Hive的路徑是csv->dataframe->Hive(如果我錯了,請糾正我)。RDD [數組[String]]到Dataframe

CSV: 
1,Alex,70000,Columbus 
2,Ryan,80000,New York 
3,Johny,90000,Banglore 
4,Cook, 65000,Glasgow 
5,Starc, 70000,Aus 

我讀的CSV文件中使用以下命令:

val csv =sc.textFile("employee_data.txt").map(line => line.split(",").map(elem => elem.trim)) 
csv: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[29] at map at <console>:39 

現在,我想這RDD到數據幀轉換和使用下面的代碼:

scala> val df = csv.map { case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) }.toDF() 
df: org.apache.spark.sql.DataFrame = [eid: string, name: string, salary: string, destination: string] 

員工的情況下,類,我用它作爲模式定義。

case class employee(eid: String, name: String, salary: String, destination: String) 

然而,當我做df.show我得到以下錯誤:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 4 times, most recent failure: Lost task 0.3 in stage 10.0 (TID 22, user.hostname): scala.MatchError: [Ljava.lang.String;@88ba3cb (of class [Ljava.lang.String;)

我期待一個數據幀作爲輸出。我知道爲什麼我可能會收到此錯誤,因爲RDD中的值存儲在Ljava.lang.String;@88ba3cb格式中,我需要使用mkString來獲取實際值,但我無法找到如何執行此操作。我感謝你的時間。

+1

也許您可以與我們分享您的員工是如何定義的? – Psidom

+0

opps對不起,員工是案例類'case class employee(eid:String,name:String,salary:String,destination:String)' – Explorer

+2

您將所有內容解析爲'String',因此您的'eid:Int'不會工作 –

回答

1

如果你解決你的情況類,那麼它應該工作:

scala> case class employee(eid: String, name: String, salary: String, destination: String) 
defined class employee 

scala> val txtRDD = sc.textFile("data.txt").map(line => line.split(",").map(_.trim)) 
txtRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[30] at map at <console>:24 

scala> txtRDD.map{case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3)}.toDF.show 
+---+-----+------+-----------+ 
|eid| name|salary|destination| 
+---+-----+------+-----------+ 
| 1| Alex| 70000| Columbus| 
| 2| Ryan| 80000| New York| 
| 3|Johny| 90000| Banglore| 
| 4| Cook| 65000| Glasgow| 
| 5|Starc| 70000|  Aus| 
+---+-----+------+-----------+ 

否則,你可以轉換StringInt

scala> case class employee(eid: Int, name: String, salary: String, destination: String) 
defined class employee 

scala> val df = txtRDD.map{case Array(s0, s1, s2, s3) => employee(s0.toInt, s1, s2, s3)}.toDF 
df: org.apache.spark.sql.DataFrame = [eid: int, name: string ... 2 more fields] 

scala> df.show 
+---+-----+------+-----------+ 
|eid| name|salary|destination| 
+---+-----+------+-----------+ 
| 1| Alex| 70000| Columbus| 
| 2| Ryan| 80000| New York| 
| 3|Johny| 90000| Banglore| 
| 4| Cook| 65000| Glasgow| 
| 5|Starc| 70000|  Aus| 
+---+-----+------+-----------+ 

然而,最好的解決辦法是使用spark-csv(其同樣將工資視爲Int)。

另請注意,當您運行df.show時會引發錯誤,因爲在此之前,所有內容都會被延遲評估。 df.show是一個會導致所有排隊轉換執行的動作(有關更多信息,請參閱this article)。對數組元素,而不是數組

+0

謝謝@ evan058的回覆。我試過你的解決方案,但我仍然得到相同的錯誤。 'scala> case class employee(eid:String,name:String,salary:String,destination:String) defined class employee' 'scala> val csv = sc.textFile(「employee_data.txt」)。map line => line.split(「,」)。map(elem => elem.trim)) csv:org.apache.spark.rdd.RDD [Array [String]] = MapPartitionsRDD [19] at : 31' 'scala> val df = csv.map {case Array(s0,s1,s2,s3)=>員工(s0,s1,s2,s3)}。toDF() df:.. spark.sql .DataFrame = [eid:string,name:string,salary:string,destination:string' – Explorer

+0

'scala> df.show' '16/12/30 15:37:25 WARN scheduler.TaskSetManager:Lost task 0.0 in階段5.0(TID 11,主機名):scala.MatchError:[Ljava.l ang.String; @ 3297e00f(類[Ljava.lang.String;) at $ line46。$ read $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ anonfun $ 1.apply(:35) at $ line46。$ read $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ anonfun $ 1.apply(:35) at scala.collection.Iterator $$ anon $ 11.next(Iterator.scala:328) at scala.collection.Iterator $$ anon $ 11.next(Iterator.scala :328)' – Explorer

+0

@Novice嘗試運行'csv.foreach {println}'。這是什麼樣子? –

1

正如您在評論中所說的,您的案例班級員工(應該命名爲Employee)收到Int作爲其構造函數的第一個參數,但您傳遞的是String。因此,在實例化或修改您的案例之前,您應將其轉換爲Int,然後將其定義爲String

2

使用地圖:

val csv = sc.textFile("employee_data.txt") 
    .map(line => line 
        .split(",") 
        .map(e => e.map(_.trim)) 
    ) 
val df = csv.map { case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) }.toDF() 

但是,爲什麼你正在閱讀CSV,然後轉換到RDD DF? Spark 1.5已經可以通過spark-csv包裝讀取CSV:

val df = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", "true") 
    .option("inferSchema", "true") 
    .option("delimiter", ";") 
    .load("employee_data.txt") 
+0

感謝Gaweda給你回覆。我可以使用,但我有一個要求,我可以用逗號分隔文件。所以我不能使用'spark-csv' – Explorer

+2

@Novice有參數'delimiter' - 你可以將它設置爲';'或其他 –

+0

哦,我不知道這一點。感謝讓我搜索更多。 – Explorer

相關問題