Spark: how to manage a big aggregateByKey on a single machine

I'm using Scala and Spark to manage a large number of records, and each of these records has the following form:
single record => (String, Row)
Each Row is composed of 45 values of different types (String, Integer, Long).
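For illustration, a single record could look like this (the key and the values below are made up; the real Row carries 45 fields):

import org.apache.spark.sql.Row

// hypothetical example record: a string key plus a Row of mixed String/Integer/Long fields
val record: (String, Row) = ("SOMEKEY", Row("some string", 42, 123456789L))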
To aggregate them I'm using:
myRecords.aggregateByKey(List[Any]())(
  (aggr, value) => aggr ::: (value :: Nil),
  (aggr1, aggr2) => aggr1 ::: aggr2
)
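For reference, I suppose the same grouping could also be written with prepends (constant time per element) instead of appends, although I haven't verified whether that changes the memory behaviour:

// sketch only: prepend instead of append; the order inside each group is not preserved
val grouped = myRecords.aggregateByKey(List[Any]())(
  (aggr, value) => value :: aggr,
  (aggr1, aggr2) => aggr1 ::: aggr2
)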
The problem is that I constantly get messages like:
15/11/21 17:54:14 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 147767 ms exceeds timeout 120000 ms
15/11/21 17:54:14 ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 147767 ms
[Stage 3:====> (875 + 24)/3252]
15/11/21 17:57:10 WARN BlockManager: Putting block rdd_14_876 failed
...and finally...
15/11/21 18:00:27 ERROR Executor: Exception in task 876.0 in stage 3.0 (TID 5465)
java.lang.OutOfMemoryError: GC overhead limit exceeded
My guess is that the aggregated lists become so big that matching a new record to its key takes more and more time, until some task times out because it can't find the right place to add the record's value.
I've played with different parameters, such as the following (a rough configuration sketch is given after the list):
spark.default.parallelism => to reduce the size of tasks augmenting this value
spark.executor.memory => usually I put much less than the driver memory
spark.driver.memory => the whole driver memory (single machine tho)
--master local[number of cores]
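For example, a rough sketch of the non-memory settings (the core count and parallelism value are just placeholders, not the exact numbers I used):

import org.apache.spark.{SparkConf, SparkContext}

// placeholder values; spark.driver.memory has to be passed to spark-submit
// (--driver-memory), since the driver JVM is already running when this code executes
val conf = new SparkConf()
  .setMaster("local[8]")
  .set("spark.default.parallelism", "200")
val sc = new SparkContext(conf)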
Any idea how to get to the end of the process without running into memory problems / timeouts?
UPDATE
I want to merge two CSV files based on the following steps:
1) join them on a CSV column
2) merge the joined rows, merging their column values
3) aggregate/group the joined & merged data by a key built from 3 of the columns
4) do some work on each single aggregated group from 3)
This is the code:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.sql.{Column, DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object MyRecords {

  def createKey(k1: String, k2: String, k3: String): String = {
    Seq(k1, k2, k3).iterator.map(r => if (r == null) "" else r.trim.toUpperCase).mkString("")
  }
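  // e.g. (illustrative values): createKey(" abc ", null, "x1") == "ABCX1"
  // nulls become "", the parts are trimmed, upper-cased and concatenated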
  def main(args: Array[String]): Unit = {
    val df1FilePath = args(0)
    val df2FilePath = args(1)
    val sc = new SparkContext(new SparkConf())
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val df1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter", "\t").load(df1FilePath).as("one")
    df1.registerTempTable("df1")
    val df2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter", "\t").load(df2FilePath)
    val df2Renamed = df2.select(
      col("v0").as("y_v0"),
      col("v1").as("y_v1"),
      col("v2").as("y_v2"),
      col("v3").as("y_v3"),
      col("v4").as("y_v4"),
      col("v5").as("y_v5"),
      col("v6").as("y_v6"),
      col("v7").as("y_v7"),
      col("v8").as("y_v8"),
      col("v9").as("y_v9"),
      col("v10").as("y_v10"),
      col("v11").as("y_v11"),
      col("v12").as("y_v12"),
      col("v13").as("y_v13"),
      col("v14").as("y_v14"),
      col("v15").as("y_15"),
      col("v16").as("y_16"),
      col("v17").as("y_17"),
      col("v18").as("y_18"),
      col("v19").as("y_19"),
      col("v20").as("y_20"),
      col("v21").as("y_21"),
      col("v22").as("y_22"),
      col("v23").as("y_23"),
      col("v24").as("y_24"),
      col("v25").as("y_25"),
      col("v26").as("y_26"),
      col("v27").as("y_27"),
      col("v28").as("y_28"),
      col("v29").as("y_29"),
      col("v30").as("y_30"),
      col("v31").as("y_31"),
      col("v32").as("y_32")
    ).as("two")
    df2Renamed.registerTempTable("df2")
    // df1 is aliased "one" above, so it is the left side of the full outer join
    val dfJoined = df1.join(df2Renamed, $"one.v0" === $"two.y_v0", "fullouter").as("j")
    dfJoined.registerTempTable("joined")
    val dfMerged = sqlContext.sql("SELECT * FROM joined").map { r =>
      if (r.getAs[String]("y_v1") != null) {
        (createKey(r.getAs[String]("y_v2"), r.getAs[String]("y_v3"), r.getAs[String]("y_v4")), r)
      } else {
        (createKey(r.getAs[String]("v2"), r.getAs[String]("v3"), r.getAs[String]("v4")), r)
      }
    }
    dfMerged.groupByKey().collect().foreach(println)
    sc.stop()
  }
}
Could you show a bit more of the Row? And of myRecords? I think your aggregation could be made more efficient –