我有一種情況,我想在Spark中的每個工人上執行一個系統進程。我希望這個過程在每臺機器上運行一次。具體來說,這個過程啓動一個守護進程,這個守護進程在執行其餘的程序之前需要運行。理想情況下,這應該在我讀取任何數據之前執行。是否可以對Apache Spark中的所有worker執行命令?
我在Spark 2.0.2上並使用動態分配。
我有一種情況,我想在Spark中的每個工人上執行一個系統進程。我希望這個過程在每臺機器上運行一次。具體來說,這個過程啓動一個守護進程,這個守護進程在執行其餘的程序之前需要運行。理想情況下,這應該在我讀取任何數據之前執行。是否可以對Apache Spark中的所有worker執行命令?
我在Spark 2.0.2上並使用動態分配。
您可以使用lazy val和Spark廣播的組合來實現此目的。它將如下所示。 (下面的代碼還沒有編譯,您可能需要改變一些東西)
object ProcessManager {
lazy val start = // start your process here.
}
你可以做你任何轉換之前,在你的應用程序開始播出該對象。
val pm = sc.broadcast(ProcessManager)
現在,您可以像轉換任何其他廣播變量一樣訪問此對象,並調用lazy val。
rdd.mapPartition(itr => {
pm.value.start
// Other stuff here.
}
帶有靜態初始化的object
,它調用你的系統進程應該有所斬斷。
object SparkStandIn extends App {
object invokeSystemProcess {
import sys.process._
val errorCode = "echo Whatever you put in this object should be executed once per jvm".!
def doIt(): Unit = {
// this object will construct once per jvm, but objects are lazy in
// another way to make sure instantiation happens is to check that the errorCode does not represent an error
}
}
invokeSystemProcess.doIt()
invokeSystemProcess.doIt() // even if doIt is invoked multiple times, the static initialization happens once
}
但你如何確保在每次轉換時不重複調用而實際初始化? – 2016-11-29 20:40:33
重複的:http://stackoverflow.com/questions/37343437/how-to-run-a-function-on-all-spark-workers-before-processing-data-in-pyspark –