Timeout (akka.pattern.AskTimeoutException) when submitting a fat jar to spark-jobserver

I have built my job jar with sbt assembly so that all of its dependencies end up in a single fat jar. When I try to submit this binary to spark-jobserver, the upload times out with an akka.pattern.AskTimeoutException.

I modified my configuration to allow submitting large jars (I added parsing.max-content-length = 300m to my config), and I also increased several timeout settings, but nothing helped.

Then, when I run:

curl --data-binary @matching-ml-assembly-1.0.jar localhost:8090/jars/matching-ml 

I get:

{ 
    "status": "ERROR", 
    "result": { 
    "message": "Ask timed out on [Actor[akka://JobServer/user/binary-manager#1785133213]] after [3000 ms]. Sender[null] sent message of type \"spark.jobserver.StoreBinary\".", 
    "errorClass": "akka.pattern.AskTimeoutException", 
    "stack": ["akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)", "akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)", "scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)", "scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)", "scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)", "akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:331)", "akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:282)", "akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:286)", "akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:238)", "java.lang.Thread.run(Thread.java:745)"] 
    } 
} 

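The "after [3000 ms]" in the message comes from Akka's ask pattern: the HTTP layer asks the binary-manager actor to store the jar and fails the request if no reply arrives within the timeout. A minimal, self-contained Scala sketch of that mechanism (the actor and message here are illustrative stand-ins, not spark-jobserver's real code):

import akka.actor.{Actor, ActorSystem, Props} 
import akka.pattern.ask 
import akka.util.Timeout 
import scala.concurrent.duration._ 

// Stand-in for a manager actor that takes longer to reply than allowed. 
class SlowStore extends Actor { 
  def receive = { 
    case _ => 
      Thread.sleep(5000) // simulate persisting a large jar 
      sender() ! "stored" 
  } 
} 

object AskTimeoutDemo extends App { 
  val system = ActorSystem("JobServer") 
  val store = system.actorOf(Props[SlowStore], "binary-manager") 

  implicit val timeout: Timeout = Timeout(3.seconds) // same default as in the answer below 

  import system.dispatcher 
  // The ask (?) fails its Future with akka.pattern.AskTimeoutException 
  // after 3000 ms -- exactly the error returned by the HTTP API above. 
  (store ? "StoreBinary").onComplete { result => 
    println(result) 
    system.terminate() 
  } 
} 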
My configuration:

# Template for a Spark Job Server configuration file 
# When deployed these settings are loaded when job server starts 
# 
# Spark Cluster/Job Server configuration 
spark { 
    # spark.master will be passed to each job's JobContext 
    master = "local[4]" 
    # master = "mesos://vm28-hulk-pub:5050" 
    # master = "yarn-client" 

    # Default # of CPUs for jobs to use for Spark standalone cluster 
    job-number-cpus = 4 

    jobserver { 
    port = 8090 

    context-per-jvm = false 
    # Note: JobFileDAO is deprecated from v0.7.0 because of issues in 
    # production and will be removed in future, now defaults to H2 file. 
    jobdao = spark.jobserver.io.JobSqlDAO 

    filedao { 
     rootdir = /tmp/spark-jobserver/filedao/data 
    } 

    datadao { 
     # storage directory for files that are uploaded to the server 
     # via POST/data commands 
     rootdir = /tmp/spark-jobserver/upload 
    } 

    sqldao { 
     # Slick database driver, full classpath 
     slick-driver = slick.driver.H2Driver 

     # JDBC driver, full classpath 
     jdbc-driver = org.h2.Driver 

     # Directory where default H2 driver stores its data. Only needed for H2. 
     rootdir = /tmp/spark-jobserver/sqldao/data 

     # Full JDBC URL/init string, along with username and password. Sorry, needs to match above. 
     # Substitutions may be used to launch job-server, but leave it out here in the default or tests won't pass 
     jdbc { 
     url = "jdbc:h2:file:/tmp/spark-jobserver/sqldao/data/h2-db" 
     user = "" 
     password = "" 
     } 

     # DB connection pool settings 
     dbcp { 
     enabled = false 
     maxactive = 20 
     maxidle = 10 
     initialsize = 10 
     } 
    } 
    # When using chunked transfer encoding with scala Stream job results, this is the size of each chunk 
    result-chunk-size = 1m 
    } 

    # Predefined Spark contexts 
    # contexts { 
    # my-low-latency-context { 
    #  num-cpu-cores = 1   # Number of cores to allocate. Required. 
    #  memory-per-node = 512m   # Executor memory per node, -Xmx style eg 512m, 1G, etc. 
    # } 
    # # define additional contexts here 
    # } 

    # Universal context configuration. These settings can be overridden, see README.md 
    context-settings { 
    num-cpu-cores = 2   # Number of cores to allocate. Required. 
    memory-per-node = 2G   # Executor memory per node, -Xmx style eg 512m, 1G, etc. 

    # In case spark distribution should be accessed from HDFS (as opposed to being installed on every Mesos slave) 
    # spark.executor.uri = "hdfs://namenode:8020/apps/spark/spark.tgz" 

    # URIs of Jars to be loaded into the classpath for this context. 
    # This is a string list, or a single string with comma-separated URIs 
    # dependent-jar-uris = ["file:///some/path/present/in/each/mesos/slave/somepackage.jar"] 

    # Add settings you wish to pass directly to the sparkConf as-is such as Hadoop connection 
    # settings that don't use the "spark." prefix 
    passthrough { 
     #es.nodes = "192.1.1.1" 
    } 
    } 

    # This needs to match SPARK_HOME for cluster SparkContexts to be created successfully 
    # home = "/home/spark/spark" 
} 

# Note that you can use this file to define settings not only for job server, 
# but for your Spark jobs as well. Spark job configuration merges with this configuration file as defaults. 
spray.can.server { 
    # uncomment the next lines for making this an HTTPS example 
    # ssl-encryption = on 
    # path to keystore 
    #keystore = "/some/path/sjs.jks" 
    #keystorePW = "changeit" 

    # see http://docs.oracle.com/javase/7/docs/technotes/guides/security/StandardNames.html#SSLContext for more examples 
    # typical are either SSL or TLS 
    encryptionType = "SSL" 
    keystoreType = "JKS" 
    # key manager factory provider 
    provider = "SunX509" 
    # ssl engine provider protocols 
    enabledProtocols = ["SSLv3", "TLSv1"] 
    idle-timeout = 60 s 
    request-timeout = 20 s 
    connecting-timeout = 5s 
    pipelining-limit = 2 # for maximum performance (prevents StopReading/ResumeReading messages to the IOBridge) 
    # Needed for HTTP/1.0 requests with missing Host headers 
    default-host-header = "spray.io:8765" 

    # Increase this in order to upload bigger job jars 
    parsing.max-content-length = 300m 
} 


akka { 
    remote.netty.tcp { 
    # This controls the maximum message size, including job results, that can be sent 
    # maximum-frame-size = 10 MiB 
    } 
} 

Answers


I ran into a similar problem. Solving it is a bit tricky. First, you need to add spark.jobserver.short-timeout to your configuration. Just modify your config as follows:

jobserver { 
    port = 8090 

    context-per-jvm = false 
    short-timeout = 60s 
    ... 
} 
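Note that configuration changes only take effect after restarting the job server. With a from-source deployment that is typically something like (script names as in the spark-jobserver repo; adjust to your setup):

./bin/server_stop.sh 
./bin/server_start.sh 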

The second (tricky) part is that you cannot change this one without modifying the code of the spark-jobserver application itself. The timeout that causes the error is this property in the BinaryManager class:

implicit val daoAskTimeout = Timeout(3 seconds) 

The default is set to 3 seconds, which is apparently not enough for large jars. You can increase it to, for example, 60 seconds, which solved the problem for me:

implicit val daoAskTimeout = Timeout(60 seconds) 
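Since this is a change to spark-jobserver's own source, the server has to be rebuilt and redeployed before it takes effect. Assuming a standard from-source build (exact task and script names may vary by version, and <environment> is a placeholder for your own deploy config):

sbt clean assembly 
./bin/server_deploy.sh <environment> 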

I tried it and it works, thanks –


Actually, you can easily reduce the size of the jar. Instead of assembling everything into one fat jar, the dependency jars can be passed via dependent-jar-uris.
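A minimal sketch of that approach (all paths and file names here are illustrative): build a thin jar containing only your own classes, list the library jars in the context configuration, and upload just the thin jar:

# job server config: make the library jars available to the context 
context-settings { 
    dependent-jar-uris = ["file:///opt/jars/dep-one.jar", 
         "file:///opt/jars/dep-two.jar"] 
} 

Then:

sbt package   # thin jar with your classes only, instead of sbt assembly 
curl --data-binary @matching-ml_2.11-1.0.jar localhost:8090/jars/matching-ml 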