Answering my own question...
The cause was severely skewed underlying data: one node receives all the records for the hot key and cannot handle them... its memory approaches 100%, and after a few minutes the job fails...
I was able to solve this by introducing an extra random-number-based key (a "salt") into the join condition...
Example:
import scala.util.Random

// Case class wrapping the salt values 0..199
case class skw(skew: Int)
// Dimension row; the last column carries the salt value
case class dimension(key: Int, id: Int, start_date: Int, end_date: Int, skew: Int)

val sequenceNumberTableForSkewJoin = sc.parallelize(0 to 199)
sequenceNumberTableForSkewJoin.map { row => skw(row) }.registerAsTable("skw")

// Replicate every dimension row once per salt value (200 copies each)
sc.textFile("S3Pathtoreadthedimension").cartesian(sequenceNumberTableForSkewJoin).map { case (row, skew) =>
  val parts = row.split("\t")
  dimension(parts(6).toInt, parts(0).toInt, parts(7).toInt, parts(8).toInt, skew)
}.registerAsTable("dimension")
case class fact(dim_key1: Int,
                dim_key2: String,
                skew_resolver: Int,
                measures: Double)

// Tag each fact row with a random salt in [0, 200) so the hot key is
// spread across 200 distinct (key, salt) combinations
sc.textFile("S3PathtoReadFactTable").map { row =>
  val parts = row.split("\t")
  fact(parts(0).toInt,
       parts(1),
       Random.nextInt(200),
       parts(2).toDouble)  // measures column; index assumed here
}.registerAsTable("fact")
sqlContext.sql("select * from dimension,fact where dimension.key=fact.dim_key1 and dimension.skew=fact.skew_resolver").saveAsTextFile("outputpath")
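The salting technique above can be sketched without Spark, using plain Scala collections on hypothetical tiny tables (all names and data here are illustrative, not from the original job): replicate each dimension row once per salt value, tag each fact row with a random salt, then join on the composite (key, salt) so a single hot key maps to 200 distinct join keys instead of one.

```scala
import scala.util.Random

// Hypothetical tiny tables; in the real job these are the S3-backed RDDs.
val dim  = Seq((1, "dim-row-A"), (2, "dim-row-B"))   // (key, payload)
val fct  = Seq.fill(1000)((1, Random.nextDouble()))  // hot key: every row has key 1

val salts = 0 until 200

// Replicate every dimension row once per salt (the cartesian step)
val saltedDim = (for ((k, v) <- dim; s <- salts) yield ((k, s), v)).toMap

// Tag every fact row with a random salt (the Random.nextInt(200) step)
val saltedFact = fct.map { case (k, m) => ((k, Random.nextInt(200)), m) }

// Join on (key, salt): the 1000 hot rows now land in ~200 buckets,
// so no single reducer receives all of them
val joined = saltedFact.flatMap { case (ks, m) => saltedDim.get(ks).map(v => (ks, v, m)) }
```

The cost of this trick is that the dimension table is materialized 200 times, so it only pays off when the dimension side is small relative to the skewed fact side.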
Thanks
How do you execute your job? Using spark-submit? Can you post the command you use? – 2014-09-10 21:45:21
I'm executing it as a step on an EMR cluster – user3279189 2014-09-11 05:28:58