2016-12-22 50 views

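Spark: processing a multi-line input blob

I am new to Hadoop/Spark and am trying to convert a multi-line input blob into CSV or tab-separated format for further processing.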

Example input

------------------------------------------------------------------------ 
AAA=someValueAAA1 
BBB=someValueBBB1 
CCC=someValueCCC1 
DDD=someValueDDD1 
EEE=someValueEEE1 
FFF=someValueFFF1 
ENDOFRECORD 
------------------------------------------------------------------------ 
AAA=someValueAAA2 
BBB=someValueBBB2 
CCC=someValueCCC2 
DDD=someValueDDD2 
EEE=someValueEEE2 
FFF=someValueFFF2 
ENDOFRECORD 
------------------------------------------------------------------------ 
AAA=someValueAAA3 
BBB=someValueBBB3 
CCC=someValueCCC3 
DDD=someValueDDD3 
EEE=someValueEEE3 
FFF=someValueFFF3 
GGG=someValueGGG3 
HHH=someValueHHH3 
ENDOFRECORD 
------------------------------------------------------------------------ 

Desired output

someValueAAA1, someValueBBB1, someValueCCC1, someValueDDD1, someValueEEE1, someValueFFF1 
someValueAAA2, someValueBBB2, someValueCCC2, someValueDDD2, someValueEEE2, someValueFFF2 
someValueAAA3, someValueBBB3, someValueCCC3, someValueDDD3, someValueEEE3, someValueFFF3 

Code I've tried so far:

// inputRDD
val inputRDD = sc.textFile("/somePath/someFile.gz") 

// transform
val singleRDD = inputRDD.map(x=>x.split("ENDOFRECORD")).filter(x=>x.trim.startsWith("AAA")) 


val logData = singleRDD.map(x=>{ 
    val rowData = x.split("\n") 

    var AAA = "" 
    var BBB = "" 
    var CCC = "" 
    var DDD = "" 
    var EEE = "" 
    var FFF = "" 

    for (data <- rowData){ 
    if(data.trim().startsWith("AAA")){ 
     AAA = data.split("AAA=")(1) 
    }else if(data.trim().startsWith("BBB")){ 
     BBB = data.split("BBB=")(1) 
    }else if(data.trim().startsWith("CCC=")){ 
     CCC = data.split("CCC=")(1) 
    }else if(data.trim().startsWith("DDD=")){ 
     DDD = data.split("DDD=")(1) 
    }else if(data.trim().startsWith("EEE=")){ 
     EEE = data.split("EEE=")(1) 
    }else if(data.trim().startsWith("FFF=")){ 
     FFF = data.split("FFF=")(1) 
    } 
    } 
    (AAA,BBB,CCC,DDD,EEE,FFF) 
}) 

logData.take(10).foreach(println) 

This doesn't seem to work; I get output like:

AAA,,,,,, 
,BBB,,,,, 
,,CCC,,,, 
,,,DDD,,, 

I can't figure out what is wrong here. Do I have to write a custom input format to solve this?

Answers


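sc.textFile produces one element per input line, so each map call sees a single line and never a whole record. To process the data as you require:

  1. Load the dataset with wholeTextFiles, which returns it as (key, value) pairs: the file path and the file's entire content.
  2. Apply flatMap to the pairs, splitting each value on ENDOFRECORD, to get one block of text per record. For example:

     AAA=someValueAAA1 BBB=someValueBBB1 CCC=someValueCCC1 DDD=someValueDDD1 EEE=someValueEEE1 FFF=someValueFFF1 ENDOFRECORD

  3. Split each block on \n to get its individual KEY=value lines.

Try the code below: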

    // Load each file whole as (path, content) so records are not broken up per line
    val data = sc.wholeTextFiles("file:///path/to/file")

    // One element per record; drop the trailing chunk that holds no fields
    val records = data.flatMap(x => x._2.split("ENDOFRECORD")).filter(_.contains("AAA="))

    val logData = records.map(record => {
        val rowData = record.split("\n")

        var AAA = ""
        var BBB = ""
        var CCC = ""
        var DDD = ""
        var EEE = ""
        var FFF = ""

        // Trim each line so trailing whitespace does not end up in the values
        for (line <- rowData.map(_.trim)) {
            if (line.startsWith("AAA=")) {
                AAA = line.stripPrefix("AAA=")
            } else if (line.startsWith("BBB=")) {
                BBB = line.stripPrefix("BBB=")
            } else if (line.startsWith("CCC=")) {
                CCC = line.stripPrefix("CCC=")
            } else if (line.startsWith("DDD=")) {
                DDD = line.stripPrefix("DDD=")
            } else if (line.startsWith("EEE=")) {
                EEE = line.stripPrefix("EEE=")
            } else if (line.startsWith("FFF=")) {
                FFF = line.stripPrefix("FFF=")
            }
        }
        (AAA,BBB,CCC,DDD,EEE,FFF)
    })

    // collect() brings the rows to the driver so the output is printed locally
    logData.collect().foreach(println)
    

    OUTPUT:

    (someValueAAA1,someValueBBB1,someValueCCC1,someValueDDD1,someValueEEE1,someValueFFF1) 
    (someValueAAA2,someValueBBB2,someValueCCC2,someValueDDD2,someValueEEE2,someValueFFF2) 
    (someValueAAA3,someValueBBB3,someValueCCC3,someValueDDD3,someValueEEE3,someValueFFF3)
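The output above is a tuple per record, while the question asks for comma-separated lines. As a rough sketch of one way to close that gap, the per-record parsing can be written as two plain Scala functions that need no Spark to run. The names `RecordParser`, `parseRecord` and `toCsvRow` are hypothetical, not part of Spark or of the code above, and the sketch assumes every field line has the form KEY=value.

```scala
object RecordParser {
  // Hypothetical helper: turn one record's text into a key -> value map.
  // Lines without '=' (blank lines, the dashed separator) are skipped.
  def parseRecord(blob: String): Map[String, String] =
    blob.split("\n").iterator
      .map(_.trim)
      .filter(_.contains("="))
      .map { line =>
        // Split on the first '=' only, so values may themselves contain '='
        val Array(key, value) = line.split("=", 2)
        key -> value
      }
      .toMap

  // Hypothetical helper: emit the chosen columns, in order, as one CSV row.
  // Keys missing from the record become empty fields.
  def toCsvRow(fields: Map[String, String], columns: Seq[String]): String =
    columns.map(c => fields.getOrElse(c, "")).mkString(", ")
}
```

Mapping the RDD of record texts produced by the flatMap step through `RecordParser.parseRecord` and then `RecordParser.toCsvRow(_, Seq("AAA", "BBB", "CCC", "DDD", "EEE", "FFF"))` should give one line per record in the requested layout. Extra keys such as GGG and HHH in the third record are simply ignored unless they are added to the column list.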