2016-02-27 55 views
0

我想用不同的Fair Scheduler pools執行不同的Spark Streaming作業。我正在玩Spark GitHub存儲庫中的NetworkWordCount示例。我將其修改爲運行3個池,其權重分別爲1,5,10。如何使用Spark Streaming配置Fair Scheduler池?

我知道使用Spark Streaming需要使用workaround將作業分配給特定池,因此我以這種方式修改了代碼:

wordCounts.foreachRDD(rdd => 
     rdd.sparkContext.setLocalProperty("spark.scheduler.pool", poolName) 
    ) 
    wordCounts.foreachRDD(rdd => 
     println(rdd.sparkContext.getLocalProperty("spark.scheduler.pool")) 
    ) 
    wordCounts.print() 

從第二foreachRDD打印我可以看到,所有的池工作,但他們不尊重分配給它們的優先級。每個池都有自己的數據源,並且它們全部以相同的字數/秒運行。由於工作量相同,我認爲問題在其他地方,但我無法看到它在哪裏。

回答

0

認爲你需要mapprint之前分配池(使用本地屬性),即

wordCounts.map(RDD => rdd.sparkContext.setLocalProperty(「spark.scheduler.pool 「,poolName))。print()

您也可以使用ssc.sparkContext來設置池。

相關問題