Create Spark DataFrame from a custom data format

I have a text file that uses the string REC as the record separator and a newline as the column separator, and each value is prefixed with its column name, using a comma as the delimiter. Below is the sample data format:

REC
Id,19048
Term,milk
Rank,1
REC
Id,19049
Term,corn
Rank,5

with REC as the record separator. Now I want to create a Spark DataFrame with the column names Id, Term, and Rank. Please help me.

Answers


Here is the working code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object RecordSeparator extends App {
    val conf = new SparkConf()
      .setAppName("test")
      .setMaster("local[1]")
      .setExecutorEnv("executor-cores", "2")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val hconf = new Configuration
    // split the input on "REC" instead of the default newline
    hconf.set("textinputformat.record.delimiter", "REC")

    val data = sc.newAPIHadoopFile("data.txt",
        classOf[TextInputFormat], classOf[LongWritable],
        classOf[Text], hconf)
      .map(x => x._2.toString.trim)
      .filter(x => x != "")
      .map(x => getRecord(x))
      .map(x => x.split(","))
      .map(x => Record(x(0), x(1), x(2)))

    val df = data.toDF()
    df.printSchema()
    df.show(false)

    // "Id,19048\nTerm,milk\nRank,1" -> "19048,milk,1"
    def getRecord(in: String): String = {
      val ar = in.split("\n").mkString(",").split(",")
      val data = Array(ar(1), ar(3), ar(5))
      data.mkString(",")
    }
}

case class Record(Id: String, Term: String, Rank: String)

Output:

root
 |-- Id: string (nullable = true)
 |-- Term: string (nullable = true)
 |-- Rank: string (nullable = true)

+-----+----+----+
|Id   |Term|Rank|
+-----+----+----+
|19048|milk|1   |
|19049|corn|5   |
+-----+----+----+
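
For reference, the same record-delimiter trick carries over unchanged to Spark 2.x, where SparkSession replaces SparkContext/SQLContext. A minimal sketch, assuming Spark 2.x and the same local data.txt:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.SparkSession

object RecordSeparator2 extends App {
    val spark = SparkSession.builder()
      .appName("test")
      .master("local[1]")
      .getOrCreate()
    import spark.implicits._

    val hconf = new Configuration
    // same Hadoop setting as above: records end at "REC", not at newlines
    hconf.set("textinputformat.record.delimiter", "REC")

    val df = spark.sparkContext
      .newAPIHadoopFile("data.txt", classOf[TextInputFormat],
        classOf[LongWritable], classOf[Text], hconf)
      .map(_._2.toString.trim)
      .filter(_.nonEmpty)
      // "Id,19048\nTerm,milk\nRank,1" -> ("19048", "milk", "1")
      .map(_.split("\n").map(_.split(",")(1)))
      .map(a => (a(0), a(1), a(2)))
      .toDF("Id", "Term", "Rank")

    df.show(false)
}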

Indent your code. – gsamaras


Assuming you have the file on a "normal" file system (not HDFS), you have to write a file parser and then use sc.parallelize to create an RDD and then a DataFrame:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable

object Demo extends App {
    val conf = new SparkConf().setMaster("local[1]").setAppName("Demo")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    case class Record(
        var id: Option[Int] = None,
        var term: Option[String] = None,
        var rank: Option[Int] = None)

    val filename = "data.dat"

    val records = readFile(filename)
    val df = sc.parallelize(records).toDF
    df.printSchema()
    df.show()

    def readFile(filename: String): Seq[Record] = {
      import scala.io.Source

      val records = mutable.ArrayBuffer.empty[Record]
      var currentRecord: Record = null

      for (line <- Source.fromFile(filename).getLines) {
        val tokens = line.split(',')

        currentRecord = tokens match {
          // a "REC" line starts a fresh record
          case Array("REC") => Record()
          case Array("Id", id) =>
            currentRecord.id = Some(id.toInt); currentRecord
          case Array("Term", term) =>
            currentRecord.term = Some(term); currentRecord
          // "Rank" is the last column, so the record is complete
          case Array("Rank", rank) =>
            currentRecord.rank = Some(rank.toInt)
            records += currentRecord
            null
        }
      }
      records
    }
}

This gives:

root
 |-- id: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- rank: integer (nullable = true)

+-----+----+----+ 
| id|term|rank| 
+-----+----+----+ 
|19048|milk| 1| 
|19049|corn| 5| 
+-----+----+----+
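
As a side note, the same file can be parsed without the mutable currentRecord state by splitting the whole input on the REC marker and turning each block's key/value lines into a map. A minimal sketch, assuming the Record case class from above, that the file fits in memory, and a hypothetical helper name readRecords:

import scala.io.Source

// hypothetical drop-in replacement for readFile above
def readRecords(filename: String): Seq[Record] = {
    val source = Source.fromFile(filename)
    try {
      source.mkString
        .split("REC")                 // one chunk per record
        .map(_.trim)
        .filter(_.nonEmpty)
        .map { chunk =>
          // each line is "key,value"; collect the pairs into a map
          val kv = chunk.split("\n").map { line =>
            val Array(k, v) = line.split(",", 2)
            k -> v
          }.toMap
          Record(kv.get("Id").map(_.toInt),
            kv.get("Term"),
            kv.get("Rank").map(_.toInt))
        }
        .toSeq
    } finally source.close()
}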