2017-07-07 41 views
0

我想加載一個文件到spark。 如果我加載正常TEXTFILE到火花象下面這樣:如何將模式添加到Spark中的數據集?

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile") 

的結果是:

partFile: org.apache.spark.sql.Dataset[String] = [value: string] 

我可以看到在輸出的數據集。但是,如果我加載一個JSON文件:

val pfile = spark.read.json("hdfs://quickstart:8020/user/cloudera/pjson") 

結果是一個現成的模式的數據幀:

pfile: org.apache.spark.sql.DataFrame = [address: struct<city: string, state: string>, age: bigint ... 1 more field] 

將JSON /拼花/獸人文件具有架構。所以我可以理解這是Spark版本中的一個特性:2x,這使得事情變得更加簡單,因爲在這種情況下我們直接獲取DataFrame,而對於普通textFile,您可以在沒有任何合理模式的情況下獲得數據集。 我想知道的是,如何將模式添加到作爲將文本文件加載到spark中的結果的數據集中。對於RDD,有case/StructType選項來添加模式並將其轉換爲DataFrame。 任何人都可以讓我知道我該怎麼做?

回答

4

當您使用textFile時,每行的文件將是數據集中的字符串行。要轉換到數據幀有一個模式,你可以使用toDF

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile") 

import sqlContext.implicits._ 
val df = partFile.toDF("string_column") 

在這種情況下,數據幀將有類型StringType的一列的架構。

如果你的文件包含了更復雜的模式,您可以使用CSV閱讀器(如果該文件是在一個結構化的csv格式):

val partFile = spark.read.option("header", "true").option("delimiter", ";").csv("hdfs://quickstart:8020/user/cloudera/partfile") 

或者您可以使用地圖,然後利用處理您的數據集toDF轉換爲DataFrame。例如,假設你想一列是該行的第一個字符(作爲int)和另一列是第四個字符(也作爲int):

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile") 

val processedDataset: Dataset[(Int, Int)] = partFile.map { 
    line: String => (line(0).toInt, line(3).toInt) 
} 

import sqlContext.implicits._ 
val df = processedDataset.toDF("value0", "value3") 

此外,可以定義一個案例類,這將代表最終的架構您的數據框:

case class MyRow(value0: Int, value3: Int) 

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile") 

val processedDataset: Dataset[MyRow] = partFile.map { 
    line: String => MyRow(line(0).toInt, line(3).toInt) 
} 

import sqlContext.implicits._ 
val df = processedDataset.toDF 

在上述兩種情況下,調用df.printSchema會顯示:根據您的回答

root 
|-- value0: integer (nullable = true) 
|-- value3: integer (nullable = true) 
+0

,我不得不調整它一下。根據分隔符分割數據集:val partdata = partFile.map(p => p.split(「,」)) 我也必須更改此語句:val prdt = partdata.map {line => rows(line 0).toInt,line(1).toString,line(2).toInt,line(3).toString,line(4).toString)} 因爲非數字數據是'char'格式,而且我有將它們轉換爲'String'。它正在工作。 – Sidhartha

+1

@Sidhartha,很高興知道它的工作。如果它是一個逗號分隔的文件,你可以考慮我使用'spark.read.csv'的第一個建議,它可能會更簡單。 –

0

使用case class創建dataset/dataframe是很容易

可以說你有一個包含數據nameage文本文件作爲

x1,32 
x2,32 
x3,32 

你必須定義case class主要執行類外的

case class Info(name: String, 
       age: Int) 

然後r eading使用sparkContext.textFile和上述情況類的文件中,我們應該有一個dataframe

val data = sc.textFile("path to text file") 

    import sqlContext.implicits._ 
    data.map(line => line.split(",")).map(array => Info(array(0), array(1).toInt)).toDF.show(false) 

+----+---+ 
|name|age| 
+----+---+ 
|x1 |32 | 
|x2 |32 | 
|x3 |32 | 
+----+---+ 

使用schema是如下,在這裏你需要創建rdd[Row]schema和使用sqlContext作爲

val data = sc.textFile("path to text file") 
    .map(line=> line.split(",")).map(array => Row(array(0), array(1).toInt)) 

val schema = StructType(
    Array(
    StructField("name", StringType, true), 
    StructField("age", IntegerType, true) 
) 
) 
sqlContext.createDataFrame(data, schema).show(false) 

輸出是一樣的以上

+----+---+ 
|name|age| 
+----+---+ 
|x1 |32 | 
|x2 |32 | 
|x3 |32 | 
+----+---+ 
相關問題