2017-08-22 125 views
0

我是Spark新手,使用Scala 2.10和Spark 1.6。 試圖Input_file_001.txt格式如下,使用Spark格式化文本文件

Input_file_001.txt:

Dept 0100 Batch Load Errors for 8/16/2016 4:45:56 AM 

Case 1111111111 
Rectype: ABCD 
Key:UMUM_REF_ID=A12345678,UMSV_SEQ_NO=1 
UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID 

Case 2222222222 
Rectype: ABCD 
Key:UMUM_REF_ID=B87654321,UMSV_SEQ_NO=2 
UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID 
NTNB ERROR :Invalid Value       NTNB_MCTR_SUBJ=AMOD 

Case 3333333333 
Rectype: WXYZ 
Key:UMUM_REF_ID=U19817250,UMSV_SEQ_NO=2 
UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID 

輸出文件

Input_file_001.txt

case~Rectype~key,Error 
1111111111~ABCD~UMUM_REF_ID=A12345678,UMSV_SEQ_NO=1~UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID 
2222222222~ABCD~UMUM_REF_ID=B87654321,UMSV_SEQ_NO=2~UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID,NTNB ERROR :Invalid Value       NTNB_MCTR_SUBJ=AMOD 
3333333333~WXYZ~UMUM_REF_ID=U19817250,UMSV_SEQ_NO=2~UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID 

我試圖去實現它像下面,

val source = sc.textFile("Input_file_001.txt") 
val fileread = source.filter(x => ! x.startsWith("Dept"))).filter(_.nonEmpty).map(z => z.trim) 

上面的代碼給我陣列[字符串],不能把它轉發。 任何幫助表示讚賞。

回答

0

您可以利用wholeTextFiles api讀取輸入文件,該文件將讀取輸入文件==>(filename, whole text as one line)。然後你可以操作整個文本行並將其轉換爲你想要的輸出。最後,您可以添加header,並將其保存到一個文件

val rdd = sc.wholeTextFiles("path to Input_file_001.txt") 
val finalRdd = rdd.flatMap(tuple => tuple._2.split("\nCase ") 
    .map(record => record.replace("\nRectype: ", "~").trim 
    .replace("\nKey:", "~").trim 
    .replace("\nUMSV ERROR :", "~UMSV ERROR :").trim 
    .replace("\nNTNB ERROR :", ",NTNB ERROR :").trim) 
).filter(record => !record.startsWith("Dept")) 
val header: RDD[String] = sc.parallelize(Array("case~Rectype~key,Error")) 
header.union(finalRdd).saveAsTextFile("path to ouput file") 

你應該有以下輸出

case~Rectype~key,Error 
1111111111~ABCD ~UMUM_REF_ID=A12345678,UMSV_SEQ_NO=1~UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID 
2222222222~ABCD ~UMUM_REF_ID=B87654321,UMSV_SEQ_NO=2~UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID ,NTNB ERROR :Invalid Value       NTNB_MCTR_SUBJ=AMOD 
3333333333~WXYZ ~UMUM_REF_ID=U19817250,UMSV_SEQ_NO=2~UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID 

我希望答案是有幫助的

+0

謝謝拉梅什您的輸入。我嘗試了你的解決方案,它給了我與源文件相同的格式,但是在每一行中用〜代替。我試圖實現輸出文件格式,因爲每個字段由〜和行分隔,並以新行結束,因此我可以在其上創建外部配置單元表。也可以在輸出文件中添加源文件名作爲字段? – vin

+0

是不是你的源文件中的問題?如果不是那麼解決方案將無法正常工作, –

+0

哦,我明白了。我使用部分代碼在IntelliJ本地運行它。我剛剛嘗試在羣集上運行.jar。現在正在給予預期的結果。你太棒了。謝謝。 – vin