I was told that I can build a Spark application with one version of Spark and, as long as I use sbt assembly to build it, run it with spark-submit on any Spark cluster. How can I run a Spark application assembled with Spark 2.1 on a cluster running Spark 1.6?

So, I built my simple application with Spark 2.1.1. You can see my build.sbt file below. Then I started it on my cluster:

cd spark-1.6.0-bin-hadoop2.6/bin/  
spark-submit --class App --master local[*] /home/oracle/spark_test/db-synchronizer.jar 

So, as you can see, I am executing it with Spark 1.6.0.

And this is the error I get:

17/06/08 06:59:20 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem [sparkDriver] 
java.lang.NoSuchMethodError: org.apache.spark.SparkConf.getTimeAsMs(Ljava/lang/String;Ljava/lang/String;)J 
     at org.apache.spark.streaming.kafka010.KafkaRDD.<init>(KafkaRDD.scala:70) 
     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:219) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) 
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) 
     at scala.Option.orElse(Option.scala:257) 
     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) 
     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) 
     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) 
     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) 
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) 
     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243) 
     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241) 
     at scala.util.Try$.apply(Try.scala:161) 
     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241) 
     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177) 
     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86) 
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
     at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
     at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
17/06/08 06:59:20 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 1 attempts 
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated. 
     at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134) 
     at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194) 
     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427) 
17/06/08 06:59:23 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 2 attempts 
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated. 
     at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134) 
     at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194) 
     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427) 
17/06/08 06:59:26 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 3 attempts 
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated. 
     at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134) 
     at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194) 
     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427) 
17/06/08 06:59:29 WARN Executor: Issue communicating with driver in heartbeater 
org.apache.spark.SparkException: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] 
     at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) 
     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427) 
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated. 
     at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134) 
     at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194) 
     ... 1 more 
17/06/08 06:59:39 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 1 attempts 
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated. 
     at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134) 
     at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194) 
     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427) 
17/06/08 06:59:42 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 2 attempts 
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated. 
     at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134) 
     at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194) 
     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427) 
17/06/08 06:59:45 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 3 attempts 
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated. 
     at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134) 
     at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194) 
     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427) 
17/06/08 06:59:48 WARN Executor: Issue communicating with driver in heartbeater 
org.apache.spark.SparkException: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] 
     at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) 
     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427) 
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated. 
     at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134) 
     at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194) 
     ... 1 more 

Based on some reading, I see that java.lang.NoSuchMethodError is generally connected to mixing different versions of Spark. That is probably true here, since I am using different ones. But shouldn't sbt assembly cover that? The build.sbt and assembly.sbt files are shown further below.
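One sanity check I could add to the application itself (a minimal sketch of my own; VersionGuard and builtAgainst are hypothetical names, and it assumes spark-core is on the classpath so that org.apache.spark.SPARK_VERSION resolves) would turn the cryptic NoSuchMethodError into an explicit message by asserting the runtime Spark version at startup:

import org.apache.spark.SPARK_VERSION

object VersionGuard {
  // The Spark version this jar was assembled against; keep it in sync with
  // sparkVersion in build.sbt. (Hypothetical helper, not part of the app yet.)
  val builtAgainst = "2.1.1"

  def check(): Unit =
    // SPARK_VERSION reports the spark-core classes actually loaded at runtime,
    // which can differ from the version the jar was compiled against.
    require(
      SPARK_VERSION == builtAgainst,
      s"Spark version mismatch: assembled against $builtAgainst but running on $SPARK_VERSION"
    )
}

Calling VersionGuard.check() first thing in App.main would fail fast on a cluster whose Spark 1.6 classes win on the CLASSPATH.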

build.sbt

name := "spark-db-synchronizator" 

//Versions 
version := "1.0.0" 
scalaVersion := "2.10.6" 
val sparkVersion = "2.1.1" 
val sl4jVersion = "1.7.10" 
val log4jVersion = "1.2.17" 
val scalaTestVersion = "2.2.6" 
val scalaLoggingVersion = "3.5.0" 
val sparkTestingBaseVersion = "1.6.1_0.3.3" 
val jodaTimeVersion = "2.9.6" 
val jodaConvertVersion = "1.8.1" 
val jsonAssertVersion = "1.2.3" 

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion, 
    "org.apache.spark" %% "spark-sql" % sparkVersion, 
    "org.apache.spark" %% "spark-hive" % sparkVersion, 
    "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion, 
    "org.apache.spark" %% "spark-streaming" % sparkVersion, 
    "org.slf4j" % "slf4j-api" % sl4jVersion, 
    "org.slf4j" % "slf4j-log4j12" % sl4jVersion exclude("log4j", "log4j"), 
    "log4j" % "log4j" % log4jVersion % "provided", 
    "org.joda" % "joda-convert" % jodaConvertVersion, 
    "joda-time" % "joda-time" % jodaTimeVersion, 
    "org.scalatest" %% "scalatest" % scalaTestVersion % "test", 
    "com.holdenkarau" %% "spark-testing-base" % sparkTestingBaseVersion % "test", 
    "org.skyscreamer" % "jsonassert" % jsonAssertVersion % "test" 
) 

assemblyJarName in assembly := "db-synchronizer.jar" 

run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
runMain in Compile := Defaults.runMainTask(fullClasspath in Compile, runner in (Compile, run))

assemblyMergeStrategy in assembly := { 
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard 
    case x => MergeStrategy.first 
} 

// Spark does not support parallel tests and requires JVM fork 
parallelExecution in Test := false 

fork in Test := true 
javaOptions in Test ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled") 

assembly.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3") 

Thanks @eliasah. Time for some cleanup... and to delete comments :) Sorry... –

Answer

You are right: it is possible to run a Spark application built with Spark 2.1.1 libraries in some Spark 1.6 environments, such as Hadoop YARN (in CDH or HDP).


The trick is used quite frequently in big corporations where the infrastructure team forces the development teams to use some older Spark versions simply because CDH (YARN) or HDP (YARN) does not support the newer ones.


You should use spark-submit from the newer Spark installation (I'd suggest using the latest and greatest 2.1.1 as of this writing) and bundle all the Spark jars as part of your Spark application.

Just sbt assembly your Spark application against Spark 2.1.1 (as you specified in build.sbt) and spark-submit the uberjar with the very same version of Spark 2.1.1 to the older Spark environment.

As a matter of fact, Hadoop YARN does not treat Spark any better than any other application library or framework; if anything, it is rather reluctant toward Spark.

This does, however, require a cluster environment (and I just checked that it won't work with Spark Standalone 1.6 when your Spark application uses Spark 2.1.1).

In your case, when you start your Spark application with the local[*] master URL, it is not supposed to work:

cd spark-1.6.0-bin-hadoop2.6/bin/  
spark-submit --class App --master local[*] /home/oracle/spark_test/db-synchronizer.jar 

There are two reasons for that:

  1. local[*] is quite constrained with respect to the CLASSPATH, and trying to convince Spark 1.6.0 to run Spark 2.1.1 in the same JVM may take you quite a long time (if it is possible at all)

  2. You are using the older version to run the more current 2.1.1; the other way around could work.

Use Hadoop YARN instead, as... well... it pays no attention to Spark, and this has been tested a few times in my projects already.


I was wondering how I can know which version of e.g. spark-core is taken at runtime

Use the web UI and you will see the version in the top-left corner.

[Screenshot: Spark web UI showing the version in the top-left corner]
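If you'd rather check programmatically, a minimal sketch (the object name RuntimeVersionCheck is made up for illustration; it assumes nothing beyond a plain SparkContext) is to log the version reported by the spark-core classes that actually got loaded:

import org.apache.spark.{SparkConf, SparkContext}

object RuntimeVersionCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("version-check"))
    // sc.version returns the version of the Spark classes loaded at runtime,
    // regardless of which Spark version the jar was compiled against.
    println(s"Runtime Spark version: ${sc.version}")
    sc.stop()
  }
}

Submitted through spark-submit, this prints whichever Spark version actually won on the CLASSPATH, which you can compare against the sparkVersion in build.sbt.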

You should also consult the Environment tab of the web UI, where you can find the configuration of the runtime environment. It is the most authoritative source about the environment your Spark application is running in.

[Screenshot: the Environment tab of the Spark web UI]

Near the bottom you should see the Classpath Entries section, which gives you the jars, files, and classes on the CLASSPATH.

[Screenshot: the Classpath Entries section of the Environment tab]

Use it to track down any CLASSPATH-related issues.
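To pin down exactly which jar a given class came from (a sketch of my own using plain JVM APIs, nothing Spark-specific; WhereIsMyClass is a made-up name), ask the class's protection domain for its code source:

import org.apache.spark.SparkConf

object WhereIsMyClass {
  def main(args: Array[String]): Unit = {
    // The code source is the jar (or directory) the JVM actually loaded
    // SparkConf from; it can be null for bootstrap classes, hence the Option.
    val source = Option(classOf[SparkConf].getProtectionDomain.getCodeSource)
    println(s"SparkConf loaded from: ${source.map(_.getLocation).getOrElse("<bootstrap>")}")
  }
}

If several Spark versions compete on the CLASSPATH, the printed location tells you which one the NoSuchMethodError is really coming from.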

Correct me if I'm wrong, but this works only because YARN doesn't use the Spark "binaries", right? – zero323

Do you know of any way to check which binaries are used when executing spark-submit? I just started the same jar with: spark-submit --class net.atos.dbsynchronizator.App --master yarn --deploy-mode client /home/oracle/spark_test/db-synchronizer.jar and got the same error: java.lang.NoSuchMethodError: org.apache.spark.SparkConf.getTimeAsMs(Ljava/lang/String;Ljava/lang/String;) – awenclaw

The name we see comes from this line in build.sbt: assemblyJarName in assembly := "db-synchronizer.jar". The jar is approx. 140MB, so I think it is quite a fat jar already. – awenclaw
