這裏是僞代碼:爲什麼使用廣播增加時間?
case class IpDB(startIp: Long, endIp: Long, company: String)
def ipMap(line: Array[String]):
val Array(startIp, endIp, company) = line
IpDB(startIp.toLong, endIp.toLong, company)
// The ip-db is just 300M in raw format.
// format of ip-data from s3:
// 100000 200000 GOOGLE
val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value
val dataA = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)
val dataB = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)
// will do ip company lookup here
dataA.fullOuterJoin(dataB).map(doIpCompanyLookUp(ipData, _)).collect()
代碼只是從輸入源獲取IP數據,然後找出它的公司都加入進來了。 這裏是我的問題:
這段代碼將在生產中運行2-3分鐘,但是當刪除廣播數據(只需連接兩個數據)時,它只需花費不到1分鐘。而當我查看用戶界面火花,我發現gc時間可能是問題。
這裏是運行這個作業的設置:
spark-submit --master yarn --deploy-mode client --driver-memory 4g --num-executors 10 --executor-memory 8800m --executor-cores 4 --class ... XX.jar
這項工作是在AWS上運行EMR火花集羣
spark version: 1.6.1
10 m3.xlarge.
- 如何才能解決這個問題(減少運行時間)?
- 廣播數據在火花中消耗哪些內存?
- 當我更改執行程序內存時,爲什麼運行時間不會改變?我嘗試使用5 * m3.2xlarge和--executor-memory 16g,對廣播數據的總運行時間沒有重大變化。
更新:
case class IpDB(startIp: Long, endIp: Long, company: String)
def ipMap(line: Array[String]):
val Array(startIp, endIp, company) = line
IpDB(startIp.toLong, endIp.toLong, company)
// The ip-db is just 300M in raw format.
// format of ip-data from s3:
// 100000 200000 GOOGLE
val dataA = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)
val dataB = sc.textFile("s3://...").map((ip, 1)).reduceByKey(_+_)
// will do ip company lookup here
val joinResult = dataA.fullOuterJoin(dataB)
val ipData = sc.broadcast(sc.textFile("s3://....").map(_.split("\t", -1)).map(ipMap).collect().sortWith(_.startIp < _.startIp)).value
joinResult.map(doIpCompanyLookUp(ipData, _)).collect()
只是移動IP公司內部數據的生成和廣播dataA.fullOuterJoin(dataB)
.The運行時間後減少了很多。
update2。由於生產代碼相當複雜,與上面的僞代碼不同,在代碼順序變化很小之後,程序運行得更快,但我不確定問題的關鍵在於初始化廣播數據的位置。
也許這是一個有點天真,但廣播開始發送300M圍繞網絡和堵塞遺囑執行人。你可以嘗試找到DAG嗎? – Vale