2016-07-29 77 views

Searching for terms in a DataFrame

I'm very new to Apache Spark, and this is mostly an exercise for myself. I have two JSON files.

File 1) companies.json:
[ 
{"symbol":...,"name":...,"description":...} 
. 
. 
] 

File 2) emails.json: 
[ 
{"from":...,"to":...,"subject":...,"body":...} 
] 

So far I have read each of the two files into a DataFrame:

val companies = spark.read.json("hdfs://symbols.json")
val emails = spark.read.json("hdfs://emails-out.json")

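What I want to do is take all of the email rows, process them in parallel, and keep only the emails that contain a search term taken from the (symbol, name) values in companies.json. I match each (symbol, name) against the from, to, subject, and body fields of the emails.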

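What is the best approach to this problem? Should I just convert the emails to an RDD, parallelize the rows, then fetch each individual search term and match it against the emails? As soon as an email contains any of the terms from companies.json, I would add it to the returned list.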

I have been at this all day, since I'm very new to this kind of development.

Thanks

Answers

1

Building on Narendra's answer, using a broadcast variable.

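import org.apache.spark.sql.functions.{col, udf}

// read.json replaces jsonFile, which was deprecated in Spark 1.4
val email = sqlContext.read.json("/Users/raviramadoss/emails.json")
val companies = sqlContext.read.json("/Users/raviramadoss/companies.json")

// Inferred columns are sorted by name (description, name, symbol),
// so indices 1 and 2 are name and symbol.
val companyMap = companies.rdd.flatMap(x => List(x.getString(1), x.getString(2))).collect()
val bcCompany = sc.broadcast(companyMap)

// True if the argument contains any of the broadcast terms; a null field never matches.
val func: (String => Boolean) = (arg: String) => arg != null && bcCompany.value.exists(term => arg.contains(term))

val sqlfunc = udf(func)

email.show(false)
companies.show(false)
email.filter(sqlfunc(col("from"))).show()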

Output:

+----------------------------------------------------+-----------------------+-------------+ 
|body            |from     |to   | 
+----------------------------------------------------+-----------------------+-------------+ 
|First email from a company in the known company list|[email protected]   |[email protected]| 
|This email should be filtered out     |[email protected]|[email protected]| 
+----------------------------------------------------+-----------------------+-------------+ 
+-----------+----------+------+
|description|name      |symbol|
+-----------+----------+------+
|Citigroup  |Citi      |citi  |
|Capital One|capitalone|capone|
+-----------+----------+------+
+--------------------+--------------+-------------+ 
|    body|   from|   to| 
+--------------------+--------------+-------------+ 
|First email from ...|[email protected]|[email protected]| 
+--------------------+--------------+-------------+ 
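
The filter in this answer checks only the from column, while the question asks about from, to, subject, and body. One way to cover all four is to apply the same UDF to each column and OR the results together. The following is a minimal sketch, assuming Spark 2.x and a spark-shell style session; the sample rows, the example.com-style addresses, and the names bcTerms, containsTerm, and anyFieldMatches are made up for illustration, and the search terms are typed in by hand rather than read from companies.json.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

// Assumption: a local session for experimenting; in spark-shell this returns the existing one.
val spark = SparkSession.builder().appName("udf-all-fields-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical sample rows standing in for emails.json.
val emails = Seq(
  ("alice@mail.example", "bob@mail.example", "Question about capone", "Hello"),
  ("carol@mail.example", "dave@mail.example", "Lunch", "See you at noon")
).toDF("from", "to", "subject", "body")

// Search terms; in the answer above they are collected from the companies DataFrame.
val bcTerms = spark.sparkContext.broadcast(Array("citi", "Citi", "capone", "capitalone"))

// Null-safe, case-sensitive substring check against every broadcast term.
val containsTerm = udf((s: String) => s != null && bcTerms.value.exists(t => s.contains(t)))

// OR the per-column conditions so that a hit in any field keeps the row.
val anyFieldMatches = Seq("from", "to", "subject", "body")
  .map(c => containsTerm(col(c)))
  .reduce(_ || _)

val matched = emails.filter(anyFieldMatches)
matched.show(false)
```

Only the first sample row should survive here, because its subject contains one of the terms. Matching is case-sensitive, which is why both "citi" and "Citi" appear in the term list.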
2

If your data volume is small, either approach will work. If the companies dataset is small, collect it into an array or map on the driver, then filter the emails RDD:

email.filter(eachline => companies.foldLeft(false)(_ || eachline(7).contains(_)))
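
A slightly fuller sketch of this driver-side approach, assuming Spark 2.x and a spark-shell style session. The sample rows and the names terms, fields, and matched are hypothetical, and fields are looked up by name rather than by position so the sketch does not depend on column order.

```scala
import org.apache.spark.sql.{Row, SparkSession}

// Assumption: a local session for experimenting; in spark-shell this returns the existing one.
val spark = SparkSession.builder().appName("rdd-filter-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical sample data standing in for companies.json and emails.json.
val companies = Seq(("citi", "Citi"), ("capone", "capitalone")).toDF("symbol", "name")
val emails = Seq(
  ("alice@citi.example", "bob@mail.example", "Hello", "First email"),
  ("carol@other.example", "bob@mail.example", "Hi", "Nothing relevant here")
).toDF("from", "to", "subject", "body")

// The companies data is small, so collect every symbol and name to the driver.
val terms: Array[String] = companies.rdd
  .flatMap(r => Seq(r.getAs[String]("symbol"), r.getAs[String]("name")))
  .collect()

val fields = Seq("from", "to", "subject", "body")

// Keep a row if any of the four fields contains any term (case-sensitive, null-safe).
val matched: Array[Row] = emails.rdd.filter { row =>
  fields.exists { f =>
    val v = row.getAs[String](f)
    v != null && terms.exists(t => v.contains(t))
  }
}.collect()

matched.foreach(println)
```

The terms array is shipped inside the closure with every task, which is fine for a small list; for a larger one, the broadcast variable from the first answer avoids re-sending it.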