2017-08-16 22 views
0

我有很多文本文件充滿了不同種類的日誌消息,但每個文件只能顯示一種類型的消息。將具有不同格式的文本文件映射到數據集

File1中:(I); 2017年1月12日; 16:54:45;隨機文本;其他文本
文件2: (I); 2017年1月13日15:34:56;同樣文本;再一次//即空間 日期和時間

我已經得到了這個工作,但我想問這是否是「正確」的方式來做到這一點。另外我的方法只適用於分號和空格之間的變化總是出現在同一位置。
關於這個問題的任何意見,讚賞,因爲我是新來的斯卡拉/火花。

//read file 
val df = spark.read.textFile(file.path).filter(f => f.nonEmpty && f.length > 1 && f.startsWith("(")) 
//create empty dataset of type OutputMessage 
var df3 = Seq.empty[OutputMessage].toDS() 
//get number of semicolons within first line of the dataset to determine type 
val message_type = df.take(1).mkString(",").count(_ == ';') 

if(message_type == 5){ 
    //split by semicolon and create dataset of type InputMessage 
    var df2 = df.map(x => x.split(";")).map(x => InputMessage(x(0), x(1), x(2), x(3), x(4), x(5))) 
    //map to dataset of type output message 
    df3 = df2.map(
     x => 
     OutputMessage(x.status, 
      x.messages_datestring, 
      x.messages_timestring, 
      x.device, 
      x.device_fullmessage, 
      x.device_message, 
      fileName, 
      getWeekday(x.messages_datestring), 
      (x.messages_datestring + "T" + x.messages_timestring), 
      data_company, 
      data_location, 
      data_systemname) 
    ) 
    } 
    else if (message_type == 4){ 
    var df2 = df.map(x => x.split(";")).map(x => InputMessage1(x(0), x(1), x(2), x(3), x(4))) 
    df3 = df2.map(
     x=> 
     OutputMessage(x.status, 
      x.messages_datetimestring.split(" ").take(1).mkString(","), 
      x.messages_datetimestring.split(" ").takeRight(1).mkString(","), 
      x.device, 
      x.device_fullmessage, 
      x.device_message, 
      fileName, 
      getWeekday(x.messages_datetimestring.split(" ").take(1).mkString(",")), 
      x.messages_datetimestring.replace(' ', 'T'), 
      data_company, 
      data_location, 
      data_systemname) 
    ) 
    } 
//convert to rdd 
val dsToRDD = df3_filtered.rdd 
//laod to elasticsearch 
dsToRDD.saveToEs("abdata/log") 

編輯:我剛纔看到有些文件之間有不一致的行。這意味着我的解決方案不再適用於

編輯:將其更改爲基於行的執行。除了行內的隨機分隔符之外,大部分工作都很有效。我得到這個案件的輸出,但不是通緝犯。

object MapRawData{ 
    def mapRawLine (line: String): Option[RawMessage] ={ 
    var msgtype = 0; 
    val fields = line.split(";") 
    if (fields(0).length == 3 && fields(1).length == 10) msgtype = 1 
    if (fields(0).length == 3 && fields(1).length > 10) msgtype = 3 
    if (fields(0).length > 16) msgtype = 2 
    try { 
     fields.map(_.trim) 
     Some(
     RawMessage(
      status = fields(0).take(3), 
      messages_datestring = if(msgtype == 1) fields(1) else if(msgtype == 2) fields(0).drop(4).take(10) else fields(1).take(10), 
      messages_timestring = if(msgtype == 1) fields(2).take(8) else if (msgtype == 2) fields(0).drop(15).take(8) else (fields(1).drop(11).take(8)), 
      device = if(msgtype == 1) fields(3) else if (msgtype == 2) fields(1) else fields(2), 
      device_fullmessage = if(msgtype == 1) fields(4) else if (msgtype == 2) fields(2) else fields(3), 
      device_message = if(msgtype == 1) fields(5) else if (msgtype == 2) fields(3) else fields(4) 
     ) 
    ) 
    } 
    catch { 
     case e: Exception => 
     println(s"Unable to parse line: $line") 
     None 
    } 
    } 
} 

這種變化方式比第一種方式更耗費時間/資源嗎?

回答

0

將其更改爲基於行的執行。除了行內的隨機分隔符之外,大部分工作都很有效。我得到這個案件的輸出,但不是通緝犯。

object MapRawData{ 
    def mapRawLine (line: String): Option[RawMessage] ={ 
    var msgtype = 0; 
    val fields = line.split(";") 
    if (fields(0).length == 3 && fields(1).length == 10) msgtype = 1 
    if (fields(0).length == 3 && fields(1).length > 10) msgtype = 3 
    if (fields(0).length > 16) msgtype = 2 
    try { 
     fields.map(_.trim) 
     Some(
     RawMessage(
      status = fields(0).take(3), 
      messages_datestring = if(msgtype == 1) fields(1) else if(msgtype == 2) fields(0).drop(4).take(10) else fields(1).take(10), 
      messages_timestring = if(msgtype == 1) fields(2).take(8) else if (msgtype == 2) fields(0).drop(15).take(8) else (fields(1).drop(11).take(8)), 
      device = if(msgtype == 1) fields(3) else if (msgtype == 2) fields(1) else fields(2), 
      device_fullmessage = if(msgtype == 1) fields(4) else if (msgtype == 2) fields(2) else fields(3), 
      device_message = if(msgtype == 1) fields(5) else if (msgtype == 2) fields(3) else fields(4) 
     ) 
    ) 
    } 
    catch { 
     case e: Exception => 
     println(s"Unable to parse line: $line") 
     None 
    } 
    } 
} 
相關問題