2016-01-25 98 views
12

使用scala運行spark工作,正如所料,所有工作都按時完成,但某些INFO日誌在工作停止前打印20-25分鐘。Spark工作完成,但應用程序需要時間關閉

發佈幾個UI截圖,可以幫助解決問題。

  1. 以下是採取4個階段時間:

Time taken by 4 stages

  • 以下是連續的作業ID time between consecutive job ids
  • 我之間的時間不明白爲什麼在這兩個工作ID之間花費了很多時間。

    以下是我的代碼片段:

    val sc = new SparkContext(conf) 
    for (x <- 0 to 10) { 
        val zz = getFilesList(lin); 
        val links = zz._1 
        val path = zz._2 
        lin = zz._3 
        val z = sc.textFile(links.mkString(",")).map(t => t.split('\t')).filter(t => t(4) == "xx" && t(6) == "x").map(t => titan2(t)).filter(t => t.length > 35).map(t => ((t(34)), (t(35), t(5), t(32), t(33)))) 
        val way_nodes = sc.textFile(way_source).map(t => t.split(";")).map(t => (t(0), t(1))); 
        val t = z.join(way_nodes).map(t => (t._2._1._2, Array(Array(t._2._1._2, t._2._1._3, t._2._1._4, t._2._1._1, t._2._2)))).reduceByKey((t, y) => t ++ y).map(t => process(t)).flatMap(t => t).combineByKey(createTimeCombiner, timeCombiner, timeMerger).map(averagingFunction).map(t => t._1 + "," + t._2) 
        t.saveAsTextFile(path) 
    } 
    sc.stop() 
    

    一些更隨訪:spark-1.4.1 saveAsTextFile to S3 is very slow on emr-4.0.0

    +0

    我想一般一個更新的代碼建議使用Databricks中的spark-csv包而不是saveAsTextFile,但除此之外,您運行的是哪個Spark版本? –

    +0

    saveAsTextFile的優點是我可以直接在s3上保存所有內容,不知道spark-csv包數據框如何工作。感謝一些方向,無論如何將調查它。 spark - 1.4.1 scala - 2.10.6 – Harshit

    回答

    2

    我最終升級了我的spark版本,問題已解決。

    17

    正如我加入了註釋,我建議使用火花CSV包,而不是sc.saveAsTextFile並且沒有問題使用該包直接寫入s3 :)

    我不知道您是否使用s3或s3n,但也許嘗試切換。我在使用Spark 1.5.2(EMR-4.2)上的s3a時遇到了問題,其中寫入超時並切換回s3解決了問題,所以值得一試。

    一對夫婦的應該加快其他的東西寫入S3 IS使用DirectOutputCommiter

    conf.set("spark.hadoop.mapred.output.committer.class","com.appsflyer.spark.DirectOutputCommitter") 
    

    和禁用代_SUCCESS文件:

    sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") 
    

    請注意,禁用_SUCCESS文件必須是設置爲SparkContext的hadoop配置,而不是SparkConf

    我希望這會有所幫助。

    1

    將文件寫入S3時遇到同樣的問題。我用的是星火2.0版本,只是給你的驗證答案

    在星火2.0可以使用,

    val spark = SparkSession.builder().master("local[*]").appName("App_name").getOrCreate() 
    
    spark.conf.set("spark.hadoop.mapred.output.committer.class","com.appsflyer.spark.DirectOutputCommitter") 
    spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") 
    

    這解決了我的工作的問題得到擊中

    相關問題