2017-09-15 82 views
2

由於數據是這樣的:如何根據數據類型過濾數據?

val my_data = sc.parallelize(Array(
    "Key1, foobar, 10, twenty, 20", 
    "Key2, impt, 11, sixty, 6", 
    "Key3, helloworld, 110, seventy, 9")) 

我想過濾並創建一個key,value RDD象下面這樣:

key1, foobar 
key1, twenty 
key2, impt 
key2, sixty 
key3, helloworld 
key3, seventy 

我已經試過

我想,我可以只需將數據放在一個表中並讓數據類型被推斷即可。

//is there a way to avoid writing to file??? 
my_data.coalesce(1).saveAsTextFile("/tmp/mydata.csv") 
val df_mydata = sqlContext.read 
.format("com.databricks.spark.csv") 
.option("inferSchema", "true") 
.load("/tmp/mydata.csv") 

上面的工作,使我有一個正確的數據類型的表。但是,我不知道如何過濾數據類型,然後從中創建鍵/值對。

我還可以使用Character.isDigit,而不是創建一個模式,但還需要知道如何篩選鍵/值對解決這將是

回答

0

的一種方式,正如你所說,用一個split一起使用Character.isDigitflatMap。使用您my_data爲例:

val spark = SparkSession.builder.getOrCreate() 
import spark.implicits._ 

val df = my_data.map(_.split(",").map(_.trim).toList.filterNot(s => s.forall(_.isDigit))) 
    .flatMap{case ((key: String)::tail) => tail.map(t => (key, t))}.toDF("Key", "Value") 
df.show() 

,這將給你這樣的事情:

+----+----------+ 
| Key|  Value| 
+----+----------+ 
|Key1| foobar| 
|Key1| twenty| 
|Key2|  impt| 
|Key2|  sixty| 
|Key3|helloworld| 
|Key3| seventy| 
+----+----------+ 

在這裏,我也把它轉換成數據幀,但是如果你想要一個RDD就可以直接跳過這一步。爲了使它工作,每行必須包含一個鍵,並且該鍵應該位於字符串中的第一個位置。

希望它有幫助!


編輯:

中使用的命令的擊穿。

的第一張地圖經過每串在你的RDD,每串應用(按順序)如下:

.split(",") 
.map(_.trim) 
.toList 
.filterNot(s => s.forall(_.isDigit)) 

讓我們用你的第一排爲例:"Key1, foobar, 10, twenty, 20"。首先,行被「,」分割,這將給你一串字符串Array("Key1", " foobar", " 10", " twenty", " 20")

接下來是map(_.trim)它將修剪(刪除單詞之前和之後的空格)數組中的每個元素,該數組也將轉換爲列表(以後在flatMap中匹配的情況):List("Key1", "foobar", "10", "twenty", "20")

filterNot將刪除所有字符都是數字的所有字符串。這裏的forall會檢查每個角色是否滿足這個條件。這將刪除列表中的一些元素:List("Key1", "foobar", "twenty")

現在,關鍵的遺體後,只進行了分組過濾:

flatMap{case ((key: String)::tail) => tail.map(t => (key, t))} 

這裏key成爲第一個元素的每一行的名單,以下從它變成「KEY1」前行的例子。 tail只是列表中的其餘部分。然後,對於不是key值的每個元素,我們用元組(key, value)替換它。換句話說,每個元素(除了第一個元素,即key)都會變成包含key及其本身的元組。這裏使用的是flatMap,否則你會得到一個元組列表,而不是所需的元組列表。

最後一個將其轉換爲使用toDF("Key", "Value")的命名列的數據框,請注意,這需要在開始時使用導入(import spark.implicits._)。

+0

我是新來的斯卡拉和一般的火花。如果可能的話,你能否分解這個鏈式命令來解釋這些步驟的作用?我已確認您的解決方案正常運行,並感謝您的支持! –

+0

@ spark-health-learn當然,我添加了對命令的解釋以及它們如何一起工作來接收結果。希望它能幫助你學習:)如果它對你有幫助,請點擊複選標記/ upvote接受答案。 – Shaido

+0

這真的很有幫助。 'tail.map(t =>(key,t)''的最後一個命令讓我暫時不瞭解。 –