2016-02-26 79 views
1

我希望持續保留Spark任務中的RDD,以便所有使用Spark Job Server的後續作業都可以使用它。這是我曾嘗試:在Spark Job Server中持久/共享RDD

工作1:

package spark.jobserver 

import com.typesafe.config.{Config, ConfigFactory} 
import org.apache.spark._ 
import org.apache.spark.SparkContext._ 
import scala.util.Try 

object FirstJob extends SparkJob with NamedRddSupport { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setMaster("local[4]").setAppName("FirstJob") 
    val sc = new SparkContext(conf) 
    val config = ConfigFactory.parseString("") 
    val results = runJob(sc, config) 
    println("Result is " + results) 
    } 

    override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid 

    override def runJob(sc: SparkContext, config: Config): Any = { 

    // the below variable is to be accessed by other jobs: 
    val to_be_persisted : org.apache.spark.rdd.RDD[String] = sc.parallelize(Seq("some text")) 

    this.namedRdds.update("resultsRDD", to_be_persisted) 
    return to_be_persisted 
    } 
} 

工作2:

package spark.jobserver 

import com.typesafe.config.{Config, ConfigFactory} 
import org.apache.spark._ 
import org.apache.spark.SparkContext._ 
import scala.util.Try 


object NextJob extends SparkJob with NamedRddSupport { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setMaster("local[4]").setAppName("NextJob") 
    val sc = new SparkContext(conf) 
    val config = ConfigFactory.parseString("") 
    val results = runJob(sc, config) 
    println("Result is " + results) 
    } 

    override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid 

    override def runJob(sc: SparkContext, config: Config): Any = { 

    val rdd = this.namedRdds.get[(String, String)]("resultsRDD").get 
    rdd 
    } 
} 

我得到的錯誤是:

{ 
    "status": "ERROR", 
    "result": { 
    "message": "None.get", 
    "errorClass": "java.util.NoSuchElementException", 
    "stack": ["scala.None$.get(Option.scala:313)", "scala.None$.get(Option.scala:311)", "spark.jobserver.NextJob$.runJob(NextJob.scala:30)", "spark.jobserver.NextJob$.runJob(NextJob.scala:16)", "spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:278)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)", "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)", "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)", "java.lang.Thread.run(Thread.java:745)"] 
    } 

請修改上面的代碼,以便to_be_persisted是可訪問的。 感謝

編輯:

產生火花背景下,編制和使用的包裝階源後:

curl -d "" 'localhost:8090/contexts/test-context?num-cpu-cores=4&mem-per-node=512m' 

調用FirstJob和NextJob使用:

curl -d "" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.FirstJob&context=test-context&sync=true' 

curl -d "" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.NextJob&context=test-context&sync=true' 
+0

如果您不得不從NamedRDD中受益,則必須在同一上下文中運行所有作業。你在做那個嗎? – noorul

+0

是的,我爲這兩個工作使用了相同的環境。 – vdep

回答

5

似乎有兩個問題在這裏:

  1. 如果您正在使用最新的火花jobserver版本(0.6.2 -SNAPSHOT),還有約NamedObjects工作不正常開放的bug - 似乎適合你的描述:https://github.com/spark-jobserver/spark-jobserver/issues/386

  2. 你也有很小的類型不匹配 - 在FirstJob你堅持一個RDD[String],並在NextJob你想獲取一個RDD[(String, String)] - 在NextJob,應閱讀val rdd = this.namedRdds.get[String]("resultsRDD").get)。

我已經試過你的代碼火花jobserver版本0.6.0,並與上面說小型修復,和它的作品。

+0

謝謝Tzach,它的工作原理。 (它也適用於spark 1.5.2和sjs 0.6.1) – vdep