2017-06-17 38 views
1

我如何看到SparkContext是否有內容執行,什麼時候一切都完成了我停止它?因爲目前我等待30秒才能調用SparkContext.stop,否則我的應用程序會拋出錯誤。如何等待SparkContext完成所有過程?

import org.apache.log4j.Level 
import org.apache.log4j.Logger 
import org.apache.spark.SparkContext 

object RatingsCounter extends App { 

    // set the log level to print only errors 
    Logger.getLogger("org").setLevel(Level.ERROR) 

    // create a SparkContext using every core of the local machine, named RatingsCounter 
    val sc = new SparkContext("local[*]", "RatingsCounter") 

    // load up each line of the ratings data into an RDD (Resilient Distributed Dataset) 
    val lines = sc.textFile("src/main/resource/u.data", 0) 

    // convert each line to s string, split it out by tabs and extract the third field. 
    // The file format is userID, movieID, rating, timestamp 
    val ratings = lines.map(x => x.toString().split("\t")(2)) 

    // count up how many times each value occurs 
    val results = ratings.countByValue() 

    // sort the resulting map of (rating, count) tuples 
    val sortedResults = results.toSeq.sortBy(_._1) 

    // print each result on its own line. 
    sortedResults.foreach { case (key, value) => println("movie ID: " + key + " - rating times: " + value) } 

    Thread.sleep(30000) 

    sc.stop() 
} 
+0

斯卡拉= 2.11.8和火花= 1.6.1 –

+0

你可以分享你把你的主函數中的對象? – eliasah

+1

你可以嘗試,而不是擴展應用程序def主和第二個參數1 sc.textfile –

回答

5

火花應用程序應該限定,而不是延伸scala.App一個main()方法。 scala.App的子類可能無法正常工作。

而且,由於您正在擴展應用程序,所以您會收到意想不到的行爲。

您可以在有關Self Contained Applications的官方文檔中閱讀有關它的更多信息。

App使用DelayedInit並可能導致初始化問題。用主要方法你知道發生了什麼。 Excerpt from reddit.

object HelloWorld extends App { 
    var a = 1 
    a + 1 
    override def main(args: Array[String]) { 
    println(a) // guess what's the value of a ? 
    } 
} 
+0

我知道如何定義Scala上的主要方法。我的問題,如何寫在這個問題上,是如何從SparkContext定義一個回調方法。 –

+0

你說如果你不加入睡眠,你遇到了一個錯誤。我回答了你爲什麼對你的實際代碼有問題。現在回撥你需要使用類似spark-jobserver的東西,因爲spark不能像任何其他重型Scala應用那樣給予回撥。 – eliasah

+0

我相信這是您的錯誤「因爲目前我等待30秒才能調用SparkContext.stop,否則我的應用程序會引發錯誤」。這不是因爲回電... – eliasah