2015-11-21 35 views
1

我使用Scala和星火管理的記錄的大數量及每個這些記錄在管理一個大aggregateByKey有以下形式:星火:如何在一臺機器

single record => (String, Row) 

每由組成45種不同類型的值(String,Integer,Long)。

要聚合他們我使用:

myRecords.aggregateByKey (List [Any]()) (
     (aggr, value) => aggr ::: (value :: Nil), 
     (aggr1, aggr2) => aggr1 ::: aggr2 
) 

的問題是,我得到constanly消息:

15/11/21 17:54:14 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 147767 ms exceeds timeout 120000 ms 

15/11/21 17:54:14 ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 147767 ms 

[Stage 3:====>    (875 + 24)/3252] 

15/11/21 17:57:10 WARN BlockManager: Putting block rdd_14_876 failed 

...and finally... 

15/11/21 18:00:27 ERROR Executor: Exception in task 876.0 in stage 3.0 (TID 5465) 
java.lang.OutOfMemoryError: GC overhead limit exceeded 

我可以猜測的是,聚集這麼大,匹配新記錄的關鍵是需要越來越多的時間,直到某個任務由於找不到添加記錄值的正確位置而超時。

我打了不同的參數,從​​,如:

spark.default.parallelism => to reduce the size of tasks augmenting this value 

spark.executor.memory => usually I put much less then driver memory 

spark.driver.memory => the whole driver memory (single machine tho) 

--master local[number of cores] 

任何想法如何在過程結束時得到不亂內存/超時?

UPDATE

我想基於合併兩個CSV文件:

1)基於基於3列CSV列 2)合併加入行,加入他們的行列值 3 )集料/基團此接合&合併的文件與在2做在所述單個集合數據從3一些東西鍵) 4))

這是代碼:

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types._ 
import org.apache.spark.storage.StorageLevel._ 
import org.apache.spark.sql.{Column, DataFrame, Row, SQLContext} 
import org.apache.spark.{SparkConf, SparkContext} 

object MyRecords { 

    def createKey(k1: String, k2: String, k3: String):String = { 
    Seq(k1, k2, k3).iterator.map (r => if (r == null) "" else r.trim.toUpperCase).mkString ("") 
    } 

    def main(args: Array[String]): Unit = { 

    val df1FilePath = args (0) 
    val df2FilePath = args (1) 

    val sc = new SparkContext (new SparkConf ()) 
    val sqlContext = new SQLContext (sc) 
    import sqlContext.implicits._ 

    val df1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter", "\t").load(df1FilePath).as("one") 

    df1.registerTempTable("df1") 

    val df2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter", "\t").load(df2FilePath) 

    val df2Renamed = df2.select(
     col ("v0").as ("y_v0"), 
     col ("v1").as ("y_v1"), 
     col ("v2").as ("y_v2"), 
     col ("v3").as ("y_v3"), 
     col ("v4").as ("y_v4"), 
     col ("v5").as ("y_v5"), 
     col ("v6").as ("y_v6"), 
     col ("v7").as ("y_v7"), 
     col ("v8").as ("y_v8"), 
     col ("v9").as ("y_v9"), 
     col ("v10").as ("y_v10"), 
     col ("v11").as ("y_v11"), 
     col ("v12").as ("y_v12"), 
     col ("v13").as ("y_v13"), 
     col ("v14").as ("y_v14"), 
     col ("v15").as ("y_15"), 
     col ("v16").as ("y_16"), 
     col ("v17").as ("y_17"), 
     col ("v18").as ("y_18"), 
     col ("v19").as ("y_19"), 
     col ("v20").as ("y_20"), 
     col ("v21").as ("y_21"), 
     col ("v22").as ("y_22"), 
     col ("v23").as ("y_23"), 
     col ("v24").as ("y_24"), 
     col ("v25").as ("y_25"), 
     col ("v26").as ("y_26"), 
     col ("v27").as ("y_27"), 
     col ("v28").as ("y_28"), 
     col ("v29").as ("y_29"), 
     col ("v30").as ("y_30"), 
     col ("v31").as ("y_31"), 
     col ("v32").as ("y_32") 
    ).as("two") 

    df2Renamed.registerTempTable("df2") 

    val dfJoined = dfArchive.join(df2Renamed, $"one.v0" === $"two.y_v0", "fullouter").as("j") 

    dfJoined.registerTempTable("joined") 

    val dfMerged = sqlContext.sql("SELECT * FROM joined").map(r => 
     if (r.getAs("y_v1") != null) { 
     (createKey (r.getAs("y_v2"), r.getAs("y_v3"), r.getAs("y_v4")), r) 
     } else { 
     (createKey (r.getAs("v2"), r.getAs("v3"), r.getAs("v4")), r) 
     }) 

    dfMerged.groupByKey().collect().foreach(println) 

    sc.stop() 
    } 
} 
+0

你可以多顯示一下''Row''嗎?還有''myRecords''?我認爲你的聚合可以變得更有效率 –

回答

2

因爲所有你做的,是通過關鍵組最好是使用groupByKey代替aggregateByKey,尤其是一,其創建的臨時對象的像value :: Nil數量龐大(爲什麼不直接value :: aggr?)。

由於它不執行地圖端聚合,它應該減少垃圾收集器的壓力(請參閱SPARK-772)。

參見:Is groupByKey ever preferred over reduceByKey

編輯

關於您在更新它並沒有真正意義提供的代碼。如果您想使用DataFrames,則沒有理由首先使用RDDs對數據進行分組。通過保留Strings和轉碼值可增加內存使用量並強調GC,也可以複製數據。它看起來像你所需要的大致是這樣的(用spark-csv一個小的幫助):

// Load data, optionally add .option("inferSchema", "true") 
val df1 = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", "true") 
    .option("delimiter", "\t") 
    .load(file1Path) 

val df2 = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", "true") 
    .option("delimiter", "\t") 
    .load(file2Path) 

// Join and cache 
val df = df1.join(
    df2, 
    // Join condition 
    df1("foo") === df2("foo") && 
    df1("bar") === df2("bar") && 
    df1("baz") === df2("baz"), 
    "fullouter") 
df.registerTempTable("df") 
sqlContext.cacheTable("df") 

// Perform all the required casting using safe cast methods 
// and replace existing columns 
df.withColumn("some_column", $"some_column".cast(IntegerType)) 

您可以根據需要執行您可以將數據幀without physically grouping the data上執行任何聚合。如果你想子集簡單地使用wherefilter

+3

我已經期待在這裏看到你;)你的答案是高質量的。 – javadba

+0

同樣的問題:'java.lang.OutOfMemoryError:Java堆空間java.util.Arrays.copyOf(Arrays.java:2271)' – Randomize

+0

'java.lang.OutOfMemoryError:超出GC開銷限制'和'java.lang.OutOfMemoryError :Java中的堆空間不是同一個問題。實際上,爲什麼你首先將這些數據轉換爲行和組?或爲此收集。 – zero323