2016-03-02 67 views
0

我有一個spark.rdd.RDD[String] MapPartition,我用過濾器創建。Spark - Rdd字符串清理/操作

val myMapPartition = myTextFile.filter(_.split("\t")(2) == "\"red\"") 

該過濾器由製表符分割我的文本文件線和檢查所得到的陣列的第二元件是否等於「紅色」

myMapPartition.collect()返回String類型的Array。這裏是一個例子:

24344 "someString" "red" 
23421 "someOtherString" "red" 

我想對字符串執行一些編輯。最終,我在查看一些字符串替換邏輯,但我試圖先串聯一個字符串。所以我要尋找的是這樣的:

24344 "someString hello" "red" 
23421 "someOtherString hello" "red" 

我試圖做到這一點使用map

val myCleanRdd = myMapPartition.map(_1 => (_1.concat(" hello"))) 

不過,我結束了:

24344 "someString" "red" hello 
23421 "someOtherString" "red" hello 

我的問題是我如何操縱rdd行的某些元素?我認爲問題在於我的排被認爲是一個String。我不知道如何正確映射這個,讓我專注於個別領域。

免責聲明:斯卡拉/星火小白

回答

2

首先,您需要在您的原始RDD的每個元素映射split,所以你最終與RDD[Array[String]]而不是RDD[String],例如

myTextFile.map(_.split("\t")).filter(_(2) == "\"red\"") 

你目前正在使用split過濾字符串你的輸入RDD,但是這僅僅是創建輸出字符串的RDD,扔掉你對split他們的工作。

然後,如果您的RDD的每個元素是一個已知長度的Array[String],然後就可以map使用圖案匹配(使用case關鍵字)來提取和修改單個元件,例如:

rdd.map { case Array(x, y, z) => Array(x, y + " hello", z) } 

(請注意,使用此方法時,必須使用大括號{}而不是括號map函數的括號())。類似的模式匹配可以對列表,元組,元素等行進行匹配...

更新:如果您想用處理過的版本替換其中一個元素,這是類似的模式。

rdd.map { case Array(x, y, z) => Array(x, y.replace("s","x"), z) } 

要打印出RDD[Array[String]]的所有元素,你可以做一個嵌套foreach,例如

rdd.foreach(_.foreach(println)) 

打印出的每一行作爲一個數組是棘手不是因爲重載方法預期(一個通常會使用Arrays.toString但這seems to cause type problems在斯卡拉),但可以做如下:

rdd.foreach(row => println(row.mkString("[",",","]"))) 
+0

正是我正在尋找!有兩件事情:你可以討論如何將這個模式應用於像stringReplace功能的東西嗎?另外,一旦我得到'Array [String]'的原始紅色,我該如何打印?現在我只看到內存地址?謝謝! –

+0

請參閱我的更新回答。 – DNA