2016-08-04 42 views
0

我有一個固定長度的文件(示例如下所示),我想使用SCALA(不是python或java)在Spark中使用DataFrames API讀取此文件。 。 使用DataFrame API可以讀取textFile,json文件等,但不知道是否有讀取固定長度文件的方法。我在網上尋找這個,發現github link,但我爲此目的下載spark-fixedwidth-assembly-1.0.jar,但我無法找到任何地方的jar。我完全迷失在這裏,需要你的建議和幫助。在Stackoverflow中有幾篇文章,但它們與Scala和DataFrame API無關。如何在Spark中使用DataFrame API和SCALA讀取固定長度的文件

這裏是文件

56 apple  TRUE 0.56 
45 pear  FALSE1.34 
34 raspberry TRUE 2.43 
34 plum  TRUE 1.31 
53 cherry TRUE 1.4 
23 orange FALSE2.34 
56 persimmon FALSE23.2 

每列的固定寬度是3,10,5,4

請提出你的意見。

回答

2

那麼......使用子字符串來斷行。然後修剪以刪除wheitespaces。然後做你想做的。

case class DataUnit(s1: Int, s2: String, s3:Boolean, s4:Double) 

sc.textFile('your_file_path') 
    .map(l => (l.substring(0, 3).trim(), l.substring(3, 13).trim(), l.substring(13,18).trim(), l.substring(18,22).trim())) 
    .map({ case (e1, e2, e3, e4) => DataUnit(e1.toInt, e2, e3.toBoolean, e4.toDouble) }) 
    .toDF 
+0

我試圖在REPL但我得到的錯誤。你可以提一下在REPL中鍛鍊嗎? –

+0

':32:錯誤:錯誤的參數數量;預期= 1 val mapRDD = file.map(l =>(l.substring(0,4).trim(),l.substring(4,14).trim(),l.substring(14,19)。 trim(),l.substring(19,23).trim()))。map((e1,e2,e3,e4)=> DataUnit(e1.toInt,e2,e3.toBoolean,e4.toDouble))。 toDF ^ ' –

+0

現在應該修復。嘗試在REPL中逐步運行每個映射。 –

1

固定長度格式很舊,我找不到這個格式的好的Scala庫...所以我創建了我自己的。

您可以點擊此處查看:https://github.com/atais/Fixed-Length

星火用法很簡單,你會得到你的對象DataSet

首先,您需要創建對象的說明,FE:

case class Employee(name: String, number: Option[Int], manager: Boolean) 

object Employee { 

    import com.github.atais.util.Read._ 
    import cats.implicits._ 
    import com.github.atais.util.Write._ 
    import Codec._ 

    implicit val employeeCodec: Codec[Employee] = { 
     fixed[String](0, 10) <<: 
     fixed[Option[Int]](10, 13, Alignment.Right) <<: 
     fixed[Boolean](13, 18) 
    }.as[Employee] 
} 

後來只使用解析器:

val input = sql.sparkContext.textFile(file) 
       .filter(_.trim.nonEmpty) 
       .map(Parser.decode[Employee]) 
       .flatMap { 
        case Right(x) => Some(x) 
        case Left(e) => 
         System.err.println(s"Failed to process file $file, error: $e") 
         None 
       } 
sql.createDataset(input) 
相關問題