我有帶字符串REC的文本文件作爲記錄分隔符和換行符作爲列分隔符,並且每個數據都使用逗號作爲分隔符來附加列名稱,下面是示例數據格式從自定義數據格式創建火花數據框
REC
標識,19048
期限,牛奶
評級,1個
REC
標識,19049
期限,玉米
評級,5
用REC作爲記錄分隔符。現在,我想創建列名稱爲ID,Term和Rank的火花數據框。請幫助我。
我有帶字符串REC的文本文件作爲記錄分隔符和換行符作爲列分隔符,並且每個數據都使用逗號作爲分隔符來附加列名稱,下面是示例數據格式從自定義數據格式創建火花數據框
REC
標識,19048
期限,牛奶
評級,1個
REC
標識,19049
期限,玉米
評級,5
用REC作爲記錄分隔符。現在,我想創建列名稱爲ID,Term和Rank的火花數據框。請幫助我。
這裏是工作的代碼
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
object RecordSeparator extends App {
var conf = new
SparkConf().setAppName("test").setMaster("local[1]")
.setExecutorEnv("executor- cores", "2")
var sc = new SparkContext(conf)
val hconf = new Configuration
hconf.set("textinputformat.record.delimiter", "REC")
val data = sc.newAPIHadoopFile("data.txt",
classOf[TextInputFormat], classOf[LongWritable],
classOf[Text], hconf).map(x => x._2.toString.trim).filter(x => x != "")
.map(x => getRecord(x)).map(x => x.split(","))
.map(x => record(x(0), x(2), x(2)))
val sqlContext = new SQLContext(sc)
val df = data.toDF()
df.printSchema()
df.show(false)
def getRecord(in: String): String = {
val ar = in.split("\n").mkString(",").split(",")
val data = Array(ar(1), ar(3), ar(5))
data.mkString(",")
}
}
case class record(Id: String, Term: String, Rank: String)
輸出:
root
|-- Id: string (nullable = true)
|-- Term: string (nullable = true)
|-- Rank: string (nullable = true)
+-----+----+----+
|Id |Term|Rank|
+-----+----+----+
|19048|1 |1 |
|19049|5 |5 |
+-----+----+----+
假如你對 「正常」 的文件系統(未HDFS)的文件,你必須寫一個文件分析器和然後用sc.parallelize
創建一個RDD
然後一個DataFrame
:
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object Demo extends App {
val conf = new SparkConf().setMaster("local[1]").setAppName("Demo")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
case class Record(
var id:Option[Int] = None,
var term:Option[String] = None,
var rank:Option[Int] = None)
val filename = "data.dat"
val records = readFile(filename)
val df = sc.parallelize(records).toDF
df.printSchema()
df.show()
def readFile(filename:String) : Seq[Record] = {
import scala.io.Source
val records = mutable.ArrayBuffer.empty[Record]
var currentRecord: Record = null
for (line <- Source.fromFile(filename).getLines) {
val tokens = line.split(',')
currentRecord = tokens match {
case Array("REC") => Record()
case Array("Id", id) => {
currentRecord.id = Some(id.toInt); currentRecord
}
case Array("Term", term) => {
currentRecord.term = Some(term); currentRecord
}
case Array("Rank", rank) => {
currentRecord.rank = Some(rank.toInt); records += currentRecord;
null
}
}
}
return records
}
}
這給
root
|-- id: integer (nullable = true)
|-- term: string (nullable = true)
|-- rank: integer (nullable = true)
+-----+----+----+
| id|term|rank|
+-----+----+----+
|19048|milk| 1|
|19049|corn| 5|
+-----+----+----+
** **縮進你的代碼。 – gsamaras