2016-11-29 85 views
3

我有一種情況,我想在Spark中的每個工人上執行一個系統進程。我希望這個過程在每臺機器上運行一次。具體來說,這個過程啓動一個守護進程,這個守護進程在執行其餘的程序之前需要運行。理想情況下,這應該在我讀取任何數據之前執行。是否可以對Apache Spark中的所有worker執行命令?

我在Spark 2.0.2上並使用動態分配。

+0

重複的:http://stackoverflow.com/questions/37343437/how-to-run-a-function-on-all-spark-workers-before-processing-data-in-pyspark –

回答

5

您可以使用lazy val和Spark廣播的組合來實現此目的。它將如下所示。 (下面的代碼還沒有編譯,您可能需要改變一些東西)

object ProcessManager { 
    lazy val start = // start your process here. 
} 

你可以做你任何轉換之前,在你的應用程序開始播出該對象。

val pm = sc.broadcast(ProcessManager) 

現在,您可以像轉換任何其他廣播變量一樣訪問此對象,並調用lazy val。

rdd.mapPartition(itr => { 
    pm.value.start 
    // Other stuff here. 
} 
+0

這不會爲每個分區執行一次該過程,而且每個工作者不會執行一次? – Jon

+0

你是對的,那只是一個例子。但由於它是一個惰性val,ProcessManager是一個「對象」,它只能在執行器中運行一次。 – Jegan

+0

廣播該對象有點奇怪。你應該廣播數據,而不是代碼。只需擁有對象並訪問啓動變量就足夠了。這樣您就不需要ProcessManager對象可序列化。 – Atreys

2

帶有靜態初始化的object,它調用你的系統進程應該有所斬斷。

object SparkStandIn extends App { 
    object invokeSystemProcess { 
    import sys.process._ 
    val errorCode = "echo Whatever you put in this object should be executed once per jvm".! 

    def doIt(): Unit = { 
     // this object will construct once per jvm, but objects are lazy in 
     // another way to make sure instantiation happens is to check that the errorCode does not represent an error 
    } 
    } 
    invokeSystemProcess.doIt() 
    invokeSystemProcess.doIt() // even if doIt is invoked multiple times, the static initialization happens once 
} 
+0

但你如何確保在每次轉換時不重複調用而實際初始化? – 2016-11-29 20:40:33

相關問題