
I want to create a Jupyter/IPython extension to monitor Apache Spark jobs. How can I add a SparkListener from pySpark in Python?

Spark provides a REST API.
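For context, polling that API from Python might look like the sketch below. This is only an illustration: it assumes the requests library and a driver UI reachable at localhost:4040, which depends on your deployment.

# Hedged sketch: poll the Spark monitoring REST API instead of using callbacks.
# Assumes the driver UI is reachable at localhost:4040; adjust as needed.
import requests

base = "http://localhost:4040/api/v1"
for app in requests.get("{}/applications".format(base)).json():
    for job in requests.get("{}/applications/{}/jobs".format(base, app["id"])).json():
        print(job["jobId"], job["status"], job["numCompletedTasks"])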

However, instead of polling the server, I would like the event updates to be sent through callbacks.

I am trying to register a SparkListener with SparkContext.addSparkListener(). This feature is not available on the PySpark SparkContext object in Python. So how can I register a Python listener with the Scala/Java version of the context? Is it possible to do this through py4j? I want a Python function to be called when an event fires in the listener.

Answer


While it is somewhat involved, it is possible. We can use the Py4j callback mechanism to pass messages from a SparkListener. First let's create a Scala package with all the required classes. Directory structure:

.
├── build.sbt
└── src
    └── main
        └── scala
            └── net
                └── zero323
                    └── spark
                        └── examples
                            └── listener
                                ├── Listener.scala
                                ├── Manager.scala
                                └── TaskListener.scala

build.sbt

name := "listener" 

organization := "net.zero323" 

scalaVersion := "2.11.7" 

val sparkVersion = "2.1.0" 

libraryDependencies ++= List(
    "org.apache.spark" %% "spark-core" % sparkVersion, 
    "net.sf.py4j" % "py4j" % "0.10.4" // Just for the record 
) 

Listener.scala defines a Python interface we are going to implement later:

package net.zero323.spark.examples.listener 

/* You can add arbitrary methods here,
 * as long as these match corresponding Python interface
 */
trait Listener {
  /* This will be implemented by a Python class.
   * You can of course use more specific types,
   * for example here String => Unit */
  def notify(x: Any): Any
}

Manager.scala will be used to forward messages to the Python listeners:

package net.zero323.spark.examples.listener 

object Manager {
  var listeners: Map[String, Listener] = Map()

  def register(listener: Listener): String = {
    this.synchronized {
      val uuid = java.util.UUID.randomUUID().toString
      listeners = listeners + (uuid -> listener)
      uuid
    }
  }

  def unregister(uuid: String) = {
    this.synchronized {
      listeners = listeners - uuid
    }
  }

  def notifyAll(message: String): Unit = {
    for { (_, listener) <- listeners } listener.notify(message)
  }
}

and finally a simple SparkListener:

package net.zero323.spark.examples.listener 

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} 
import org.json4s._ 
import org.json4s.JsonDSL._ 
import org.json4s.jackson.JsonMethods._ 

/* A simple listener which captures SparkListenerTaskEnd,
 * extracts the number of records written by the task
 * and converts it to JSON. You can of course add handlers
 * for other events as well.
 */
class PythonNotifyListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val recordsWritten = taskEnd.taskMetrics.outputMetrics.recordsWritten
    val message = compact(render(
      ("recordsWritten" -> recordsWritten)
    ))
    Manager.notifyAll(message)
  }
}

Let's package our extension:

sbt package 

and start a PySpark session, adding the generated jar to the class path and registering the listener:

$SPARK_HOME/bin/pyspark \ 
    --driver-class-path target/scala-2.11/listener_2.11-0.1-SNAPSHOT.jar \ 
    --conf spark.extraListeners=net.zero323.spark.examples.listener.PythonNotifyListener 

Next we have to define a Python object which implements the interface:

from pyspark import SparkContext


class PythonListener(object):
    package = "net.zero323.spark.examples.listener"

    @staticmethod
    def get_manager():
        jvm = SparkContext.getOrCreate()._jvm
        manager = getattr(jvm, "{}.{}".format(PythonListener.package, "Manager"))
        return manager

    def __init__(self):
        self.uuid = None

    def notify(self, obj):
        """This method is required by the Scala Listener interface
        we defined above.
        """
        print(obj)

    def register(self):
        manager = PythonListener.get_manager()
        self.uuid = manager.register(self)
        return self.uuid

    def unregister(self):
        manager = PythonListener.get_manager()
        manager.unregister(self.uuid)
        self.uuid = None

    class Java:
        implements = ["net.zero323.spark.examples.listener.Listener"]

start the callback server:

sc._gateway.start_callback_server() 

create the listener:

listener = PythonListener() 

register it:

listener.register()

and test:

>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test") 
{"recordsWritten":33} 
{"recordsWritten":34} 
{"recordsWritten":33} 

On exit you should shut down the callback server:

sc._gateway.shutdown_callback_server() 
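Putting it all together, a complete session might look like the sketch below. It assumes the PySpark shell launched above, so sc exists and PythonListener is defined as shown; the try/finally is just one way to make sure the listener and callback server are cleaned up.

# End-to-end lifecycle sketch, run inside the PySpark shell started earlier.
sc._gateway.start_callback_server()
listener = PythonListener()
listener.register()
try:
    sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test")
finally:
    listener.unregister()
    sc._gateway.shutdown_callback_server()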

Note

This should be used with caution when working with Spark Streaming, which uses the callback server internally.

Edit

If this is too much hassle you could just define org.apache.spark.scheduler.SparkListenerInterface:

class SparkListener(object):
    def onApplicationEnd(self, applicationEnd):
        pass
    def onApplicationStart(self, applicationStart):
        pass
    def onBlockManagerRemoved(self, blockManagerRemoved):
        pass
    def onBlockUpdated(self, blockUpdated):
        pass
    def onEnvironmentUpdate(self, environmentUpdate):
        pass
    def onExecutorAdded(self, executorAdded):
        pass
    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        pass
    def onExecutorRemoved(self, executorRemoved):
        pass
    def onJobEnd(self, jobEnd):
        pass
    def onJobStart(self, jobStart):
        pass
    def onOtherEvent(self, event):
        pass
    def onStageCompleted(self, stageCompleted):
        pass
    def onStageSubmitted(self, stageSubmitted):
        pass
    def onTaskEnd(self, taskEnd):
        pass
    def onTaskGettingResult(self, taskGettingResult):
        pass
    def onTaskStart(self, taskStart):
        pass
    def onUnpersistRDD(self, unpersistRDD):
        pass

    class Java:
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]

extend it:

class TaskEndListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        print(taskEnd.toString())

and use it directly:

>>> sc._gateway.start_callback_server() 
True 
>>> listener = TaskEndListener() 
>>> sc._jsc.sc().addSparkListener(listener) 
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test_simple") 
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@...,org.apache.spark.executor.TaskMetrics@...)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@...,org.apache.spark.executor.TaskMetrics@...)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@...,org.apache.spark.executor.TaskMetrics@...)

While simpler, this method is not selective (there is more traffic between the JVM and Python) and requires handling Java objects inside the Python session.
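For example, to extract a single metric instead of printing the whole event you have to navigate the Java object through Py4J accessors. A hedged sketch: RecordsWrittenListener is a made-up name, and the accessor chain mirrors the Scala API, so it may differ between Spark versions.

class RecordsWrittenListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        # taskEnd is a Java SparkListenerTaskEnd object; its Scala fields
        # are exposed as no-argument methods through Py4J.
        metrics = taskEnd.taskMetrics()
        if metrics is not None:
            print(metrics.outputMetrics().recordsWritten())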