2014-09-10

My application is stuck with this message.. the Spark job hangs with a connection manager issue:

14/09/10 18:11:45 INFO ConnectionManager: Accepted connection from [ip-xx-xx-xxx-xx.ec2.internal/10.33.139.85]
14/09/10 18:11:46 INFO SendingConnection: Initiating connection to [ip-1xx-xx-xxx-xx.ec2.internal/10.33.139.85:44309]
14/09/10 18:11:46 INFO SendingConnection: Connected to [ip-xx-xx-xxx-xx.ec2.internal/10.33.139.85:44309], 1 messages pending

I am executing this as a step on an EMR cluster.

Spark version: 1.0.1 [Hadoop 2.2]

Would appreciate some advice....


How are you executing your job? Using spark-submit? Can you post the command you used? – 2014-09-10 21:45:21


I am executing it as a step on the EMR cluster – user3279189 2014-09-11 05:28:58

Answer


Answering my own question...

It was because the underlying data was badly skewed: one node was getting all the data for a single join key and could not handle it... its memory usage approached 100%, and after a few more minutes the job would fail...
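A quick way to confirm this kind of skew is to count rows per join key and look for one key that dominates. The sketch below is not from the original post; it uses plain Scala collections (no Spark) so it runs standalone, with toy data where one hypothetical key (42) carries most of the rows:

```scala
object SkewCheck {
  // Return the most frequent key and how many rows carry it
  def hottest(keys: Seq[Int]): (Int, Int) =
    keys.groupBy(identity).map { case (k, vs) => (k, vs.size) }.maxBy(_._2)

  def main(args: Array[String]): Unit = {
    // Toy data: key 42 dominates, mimicking a skewed join key
    val rows = Seq.fill(1000)(42) ++ (0 until 100)
    val (hotKey, hotCount) = hottest(rows)
    println(s"hottest key: $hotKey with $hotCount of ${rows.size} rows")
  }
}
```

On a real RDD the same check would be something like `rdd.keyBy(extractJoinKey).countByKey()`, which reveals the hot key before the join is even attempted.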

I was able to fix this by introducing an additional random-number-based key into the join condition...

Example:

import scala.util.Random

// Needed in Spark 1.0.x to convert an RDD of case classes into a SchemaRDD
import sqlContext.createSchemaRDD

case class skw(seq: Int)

case class dimension(key: Int, id: Int, start_date: Int, end_date: Int, skew_mp: Int)

// 200 sequence numbers; each dimension row will be replicated once per value
val sequenceNumberTableForSkewJoin = sc.parallelize(0 to 199)

sequenceNumberTableForSkewJoin.map { row => skw(row) }.registerAsTable("skw")

// The cartesian product replicates every dimension row once for each of the 200 skew keys
sc.textFile("S3Pathtoreadthedimension").cartesian(sequenceNumberTableForSkewJoin).map { case (row, skew) =>
    val parts = row.split("\t")
    dimension(parts(6).toInt, parts(0).toInt, parts(7).toInt, parts(8).toInt, skew)
}.registerAsTable("dimension")

case class fact(dim_key1: Int,
    dim_key2: String,
    skew_resolver: Int,
    measures: Double)

// Each fact row gets a random skew key in [0, 200), so rows sharing the hot
// join key are spread across 200 distinct (key, skew) combinations
sc.textFile("S3PathtoReadFactTable").map { row =>
    val parts = row.split("\t")
    fact(parts(0).toInt,
        parts(1),
        Random.nextInt(200),
        parts(2).toDouble) // measure column index assumed; adjust for your layout
}.registerAsTable("fact")

sqlContext.sql("select * from dimension, fact where dimension.key = fact.dim_key1 and dimension.skew_mp = fact.skew_resolver").saveAsTextFile("outputpath")
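The mechanics of this salted join can be illustrated without a cluster. The sketch below is not from the original post; it mimics the same two steps on local collections (replicate the dimension across every salt value, tag each fact row with a random salt) and shows that joining on (key, salt) matches exactly the same rows as joining on the key alone, while the hot key's rows are spread across many distinct salt values:

```scala
import scala.util.Random

object SaltedJoinSketch {
  // Join fact rows to dimension rows on (key, salt), after replicating the
  // dimension across all salt values. Returns the number of joined rows.
  def saltedJoinCount(dim: Seq[(Int, String)],
                      fact: Seq[(Int, Double)],
                      salts: Int): Int = {
    // Replicate every dimension row once per salt value (the cartesian step)
    val saltedDim = (for { (k, v) <- dim; s <- 0 until salts } yield ((k, s), v)).toMap
    // Tag every fact row with a random salt (the Random.nextInt step)
    val saltedFact = fact.map { case (k, m) => ((k, Random.nextInt(salts)), m) }
    // Every (key, salt) pair on the fact side has a dimension match,
    // so the count equals a plain join on the key alone
    saltedFact.count { case (ks, _) => saltedDim.contains(ks) }
  }

  def main(args: Array[String]): Unit = {
    val dim  = Seq((1, "dim-a"), (2, "dim-b"))
    val fact = Seq.fill(8)((1, 10.0)) :+ ((2, 20.0)) // key 1 is the hot key
    println(saltedJoinCount(dim, fact, salts = 4))   // 9: salting changes no match counts
  }
}
```

The post uses 200 salt values; the sketch uses 4 for readability. The trade-off is that the dimension side grows by a factor of the salt count, which is why this trick suits small dimension tables joined to large, skewed fact tables.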

Thanks