2016-08-30 48 views
0

這裏是僞代碼:爲什麼使用廣播增加時間?

case class IpDB(startIp: Long, endIp: Long, company: String) 
def ipMap(line: Array[String]): 
    val Array(startIp, endIp, company) = line 
    IpDB(startIp.toLong, endIp.toLong, company) 
// The ip-db is just 300M in raw format. 
// format of ip-data from s3: 
// 100000 200000 GOOGLE 
val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value 

val dataA = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_) 
val dataB = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_) 

// will do ip company lookup here 
dataA.fullOuterJoin(dataB).map(doIpCompanyLookUp(ipData, _)).collect() 

代碼只是從輸入源獲取IP數據,然後找出它的公司都加入進來了。 這裏是我的問題:

這段代碼將在生產中運行2-3分鐘,但是當刪除廣播數據(只需連接兩個數據)時,它只需花費不到1分鐘。而當我查看用戶界面火花,我發現gc時間可能是問題。

這裏是運行這個作業的設置:

spark-submit --master yarn --deploy-mode client --driver-memory 4g --num-executors 10 --executor-memory 8800m --executor-cores 4 --class ... XX.jar 

這項工作是在AWS上運行EMR火花集羣

spark version: 1.6.1 
10 m3.xlarge. 
  1. 如何才能解決這個問題(減少運行時間)?
  2. 廣播數據在火花中消耗哪些內存?
  3. 當我更改執行程序內存時,爲什麼運行時間不會改變?我嘗試使用5 * m3.2xlarge和--executor-memory 16g,對廣播數據的總運行時間沒有重大變化。

更新:

case class IpDB(startIp: Long, endIp: Long, company: String) 
def ipMap(line: Array[String]): 
    val Array(startIp, endIp, company) = line 
    IpDB(startIp.toLong, endIp.toLong, company) 
// The ip-db is just 300M in raw format. 
// format of ip-data from s3: 
// 100000 200000 GOOGLE 
val dataA = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_) 
val dataB = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_) 

// will do ip company lookup here 
val joinResult = dataA.fullOuterJoin(dataB) 
val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value 
joinResult.map(doIpCompanyLookUp(ipData, _)).collect() 

只是移動IP公司內部數據的生成和廣播dataA.fullOuterJoin(dataB) .The運行時間後減少了很多。

update2。由於生產代碼相當複雜,與上面的僞代碼不同,在代碼順序變化很小之後,程序運行得更快,但我不確定問題的關鍵在於初始化廣播數據的位置。

+0

也許這是一個有點天真,但廣播開始發送300M圍繞網絡和堵塞遺囑執行人。你可以嘗試找到DAG嗎? – Vale

回答

0

沒有關於你的代碼行太多的思考:

val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value 

我擔心很多。

您使用sc.textFile構建分佈式RDD只能使其本地(給司機)由collect()隨後使其通過sc.broadcast一次可分配給執行人(!)正如你可以看到有很多的來回發送數據。

你會更好cache荷蘭國際集團的IP數據,使其保持在內存中,但這樣做:

sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).cache 
+0

當您用'cache'替換整個ip-company數據的'broadcast'時會發生什麼?這如何影響時間? –

+0

因爲我需要做ip-company查找,所以RDD [IpDB]的數據類型無法提供幫助。這就是爲什麼我需要廣播所有的數據,以確保所有的執行人員都有一份ip-company數據。只要確保我瞭解'broadcast'和'cache'之間的區別。所以我不能在程序中使用'cache'來使它工作。 – lee

相關問題