2015-01-12 139 views
1

我正在搭配 Java 使用 Spark,並且想使用 Spark Job-Server。爲此,我遵循了此鏈接中的所有步驟: https://github.com/spark-jobserver/spark-jobserver (Spark Job Server with Java)

這是我項目中的 Scala 類:

import _root_.spark.jobserver.SparkJob 
import _root_.spark.jobserver.SparkJobValid 
import _root_.spark.jobserver.SparkJobValidation 
import com.typesafe.config._ 

import org.apache.spark._ 
import org.apache.spark.api.java.JavaSparkContext 
import spark.jobserver.{SparkJob, SparkJobValid, SparkJobValidation} 

/**
 * Job-server job that delegates the actual computation to the Java class
 * `JavaCount`.
 *
 * The object can be submitted to the job server (which calls `validate` and
 * `runJob`) or started directly through `main` for a local run.
 */
object JavaWord extends SparkJob {

  /**
   * Local entry point: builds a `SparkContext` on the `local[4]` master, runs
   * the job once with an empty configuration and discards the result.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val ctx = new SparkContext("local[4]", "JavaWordCount")
    try {
      val config = ConfigFactory.parseString("")
      runJob(ctx, config)
    } finally {
      // Always release the context, even when the job throws, so a local run
      // does not leave Spark resources behind.
      ctx.stop()
    }
  }

  /**
   * Accepts every request: this job reads nothing from `config`, so there is
   * nothing to reject.
   *
   * @param sc     context the job would run on (unused)
   * @param config job configuration (unused)
   * @return always `SparkJobValid`
   */
  override def validate(sc: SparkContext, config: Config): SparkJobValidation =
    SparkJobValid

  /**
   * Wraps the Scala context in a `JavaSparkContext` and hands it to
   * `JavaCount.Mafonction`.
   *
   * @param sc     context supplied by the caller
   * @param config job configuration (unused)
   * @return whatever `JavaCount.Mafonction` returns
   */
  override def runJob(sc: SparkContext, config: Config): Any =
    new JavaCount().Mafonction(new JavaSparkContext(sc))
}

以及 Java 類「word count」:

import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFunction; 
import scala.Tuple2; 
import java.io.Serializable; 
import java.util.Arrays; 
import java.util.regex.Pattern; 



/**
 * Counts the occurrences of each word of a fixed sentence using Spark's Java API.
 */
public final class JavaCount implements Serializable {

    /**
     * Placeholder entry point; performs no work.
     *
     * @param args ignored
     * @return always {@code null}
     */
    public static Object main(String[] args) throws Exception {
        return null;
    }

    /**
     * Splits a hard-coded sentence on single spaces, pairs every token with 1,
     * sums the values per token and collects the outcome.
     *
     * @param sc context used to distribute the tokens
     * @return the result of {@code collect()} on the reduced pair RDD
     */
    public Object Mafonction(JavaSparkContext sc) {
        final String sentence = "a a a a b b c a";
        final String[] tokens = sentence.split(" ");

        // Maps a token to the pair (token, 1).
        final PairFunction<String, String, Integer> toUnitPair =
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String token) {
                        return new Tuple2<String, Integer>(token, 1);
                    }
                };

        // Adds two partial counts that belong to the same token.
        final Function2<Integer, Integer, Integer> sum =
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer left, Integer right) {
                        return left + right;
                    }
                };

        final JavaPairRDD<String, Integer> ones =
                sc.parallelize(Arrays.asList(tokens)).mapToPair(toUnitPair);
        final JavaPairRDD<String, Integer> counts = ones.reduceByKey(sum);
        return counts.collect();
    }
}

但是,當我執行它時,curl 返回 `curl: (52) Empty reply from server`,並且 Spark Job Server 中出現以下錯誤:

> job-server[ERROR] Uncaught error from thread [JobServer-akka.actor.default-dispatcher-13]  shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[JobServer] 
job-server[ERROR] java.lang.IncompatibleClassChangeError: Implementing class 
job-server[ERROR] at java.lang.ClassLoader.defineClass1(Native Method) 
job-server[ERROR] at java.lang.ClassLoader.defineClass(ClassLoader.java:800) 
job-server[ERROR] at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
job-server[ERROR] at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) 
job-server[ERROR] at java.net.URLClassLoader.access$100(URLClassLoader.java:71) 
job-server[ERROR] at java.net.URLClassLoader$1.run(URLClassLoader.java:361) 
job-server[ERROR] at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
job-server[ERROR] at java.security.AccessController.doPrivileged(Native Method) 
job-server[ERROR] at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
job-server[ERROR] at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
job-server[ERROR] at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 

job-server[ERROR] at  spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:222) 
job-server[ERROR] at  scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
job-server[ERROR] at  scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
job-server[ERROR] at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42) 
job-server[ERROR] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
job-server[ERROR] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
job-server[ERROR] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
job-server[ERROR] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
job-server[ERROR] at  scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
job-server ... finished with exit code 255 
+0

可以在重新啓動後重建項目嗎?看起來你部署了一個與舊版本不兼容的新版本。 –

+0

是的,我試圖重新啓動它,但我有同樣的錯誤。 –

+0

嗨,我解決了這個問題。我剛剛刪除了 /tmp/Spark-JobServe 下的所有內容,並重新編譯了 JobServer,現在可以正常工作了 ^^ 非常感謝您的幫助 –

回答

2

我解決了這個問題。我剛剛刪除了 /tmp/Spark-JobServe 下的所有內容,並重新編譯了 JobServer,現在可以正常工作了 ^^ 非常感謝您的幫助

+0

您是如何打包的? JavaCount是.scala還是.java? – chaosguru