2016-12-17 93 views
0

我是新來的spark.I在轉換RDD的特定領域有疑問。 我有一個文件,如下圖所示:轉變RDD的特定領域

2016-11-10T07:01:37|AAA|S16.12|MN-MN/AAA-329044|288364|2|3 
2016-11-10T07:01:37|BBB|S16.12|MN-MN/AAA-329044/BBB-1|304660|0|0 
2016-11-10T07:01:37|TSB|S16.12|MN-MN/AAA-329044/BBB-1/TSB-1|332164|NA|NA 
2016-11-10T07:01:37|RX|S16.12|MN-MN/AAA-329044/BBB-1/TSB-1/RX-1|357181|0|1 

而且我想輸出中象下面這樣:在第三場我要刪除所有字符,由分隔的整數|。

2016-11-10T07:01:37|AAA|16.12|329044|288364|2|3 
2016-11-10T07:01:37|BBB|16.12|329044|1|304660|0|0 
2016-11-10T07:01:37|TSB|16.12|329044|1|1|332164|NA|NA 
2016-11-10T07:01:37|RX|16.12|329044|1|1|1|357181|0|1 

我該怎麼做。 我試過下面的代碼。

val inputRdd =sc.textFile("file:///home/arun/Desktop/inputcsv.txt"); 
val result =inputRdd.flatMap(line=>line.split("\\|")).collect; 
def ghi(arr:Array[String]):Array[String]= 
{ 
var outlist=scala.collection.mutable.Buffer[String](); 
for(i <-0 to arr.length-1){ 
if(arr(i).matches("(.*)-(.*)")){ 
var io=arr(i); var arru=scala.collection.mutable.Buffer[String](); 
if(io.contains("/")) 
{ 
var ki=io.split("/"); 
for(st <-0 to ki.length-1) 
{ 
var ion =ki(st).split("-"); 
arru+=ion(1); 
} 
var strui=""; 
for(in <-0 to arru.length-1) 
{ 
strui=strui+arru(in)+"|"; 
} 
outlist+=strui; 
} 
else 
{   
var ion =arr(i).split("-"); 
outlist+=ion(1)+"|"; 
} 
} 
else 
{ 
outlist+=arr(i); 
} 
} 
return outlist.toArray; 
} 
var output=ghi(result); 
val finalrdd=sc.parallelize(out, 1); 
finalrdd.collect().foreach(println); 

請幫幫我。

+2

什麼是你的疑問?有什麼不工作?你試過什麼了? – maasg

+0

如果你把第三個字段,例如(MN-MN/AAA-329044/BBB-1),我想把這個字段轉換爲329044 | 1.我想刪除所有字符。 – Khumar

+0

添加您嘗試過的代碼。 – mrsrinivas

回答

0

我們需要做的是從該字段中提取數字,並將它們作爲新條目添加到正在處理的Array中。

像這樣的東西應該做的:

// use data provided as sample 
val dataSample ="""2016-11-10T07:01:37|AAA|S16.12|MN-MN/AAA-329044|288364|2|3 
2016-11-10T07:01:37|BBB|S16.12|MN-MN/AAA-329044/BBB-1|304660|0|0 
2016-11-10T07:01:37|TSB|S16.12|MN-MN/AAA-329044/BBB-1/TSB-1|332164|NA|NA 
2016-11-10T07:01:37|RX|S16.12|MN-MN/AAA-329044/BBB-1/TSB-1/RX-1|357181|0|1""".split('\n') 

val data = sparkContext.parallelize(dataSample) 

val records= data.map(line=> line.split("\\|")) 

// this regex can find and extract the contiguous digits in a mixed string. 
val numberExtractor = "\\d+".r.unanchored 

// we replace field#3 with the results of the regex 
val field3Exploded = records.map{arr => arr.take(3) ++ numberExtractor.findAllIn(arr.drop(3).head) ++ arr.drop(4)} 

// Let's visualize the result 
field3Exploded.collect.foreach(arr=> println(arr.mkString(","))) 

2016-11-10T07:01:37,AAA,S16.12,329044,288364,2,3 
2016-11-10T07:01:37,BBB,S16.12,329044,1,304660,0,0 
2016-11-10T07:01:37,TSB,S16.12,329044,1,1,332164,NA,NA 
2016-11-10T07:01:37,RX,S16.12,329044,1,1,1,357181,0,1 
+0

也可以將其視爲Spark筆記本:https://gist.github.com/maasg/28109dc3ce14b894bde3dab40a42ab4b – maasg

+0

如何將此結果存儲到RDD中,以便我可以將該RDD用於其他轉換。 – Khumar

+0

@khumar你可以對這個中間結果進行進一步的轉換,或者使用其中一個另存爲...功能。查看文檔:http://spark.apache.org/documentation.html – maasg