
Spark streaming does not insert data into Cassandra

I have spark-streaming code that works in client mode: it reads data from Kafka, performs some processing, and uses spark-cassandra-connector to insert the data into Cassandra.

When I use "--deploy-mode cluster", the data is not inserted, and I get the following error:

Exception in thread "streaming-job-executor-53" java.lang.NoClassDefFoundError: com/datastax/spark/connector/ColumnSelector
    at com.enerbyte.spark.jobs.wattiopipeline.WattiopipelineStreamingJob$$anonfun$main$2.apply(WattiopipelineStreamingJob.scala:94)
    at com.enerbyte.spark.jobs.wattiopipeline.WattiopipelineStreamingJob$$anonfun$main$2.apply(WattiopipelineStreamingJob.scala:88)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.ColumnSelector
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

I added the dependency for the connector like this:

"com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0" % "provided"

This is my application code:

val measurements = KafkaUtils.createDirectStream[
    Array[Byte],
    Array[Byte],
    DefaultDecoder,
    DefaultDecoder](ssc, kafkaConfig, Set("wattio"))
  .map {
    case (k, v) =>
      val decoder = new AvroDecoder[WattioMeasure](null, WattioMeasure.SCHEMA$)
      decoder.fromBytes(v)
  }

// inserting into WattioRaw
WattioFunctions.run(WattioFunctions.processWattioRaw(measurements))(
  (rdd: RDD[WattioTenantRaw], t: Time) => {
    rdd.cache()
    // get all the different tenants
    val differentTenants = rdd.map(a => a.tenant).distinct().collect()
    // for each tenant, create the keyspace value and flush to Cassandra
    differentTenants.foreach(tenant => {
      val keyspace = tenant + "_readings"
      rdd.filter(a => a.tenant == tenant).map(s => s.wattioRaw).saveToCassandra(keyspace, "wattio_raw")
    })
    rdd.unpersist(true)
  }
)

ssc.checkpoint("/tmp") 
ssc.start() 
ssc.awaitTermination() 

How do you specify the connector dependency at runtime? What is your full launch command? – RussS

Answers


You need to make sure your JAR is available to the workers. Once execution of the application begins, the Spark master will run a file server to distribute it.

You need to specify the path to your uber jar, either by using SparkContext.setJars or via the --jars flag passed to spark-submit.
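As a quick sanity check (a sketch not from the original answer), you can probe from inside the job whether the connector class is actually visible at runtime; if the jar was never shipped to the workers, the probe returns false where the original code throws NoClassDefFoundError:

```scala
// Minimal classpath probe: returns true if the named class can be loaded
// by the current classloader, false otherwise.
def onClasspath(fqcn: String): Boolean =
  try { Class.forName(fqcn); true }
  catch { case _: ClassNotFoundException => false }

// The class from the stack trace above; run this on the driver and inside
// an executor task to see where the connector jar is missing.
println(onClasspath("com.datastax.spark.connector.ColumnSelector"))
```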

From the documentation

When using spark-submit, the application jar along with any jars included with the --jars option will be automatically transferred to the cluster. Spark uses the following URL scheme to allow different strategies for disseminating jars


I actually solved it by removing "provided" from the dependency list, so that sbt packaged spark-cassandra-connector into my assembly jar.
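For reference, a build.sbt sketch of that fix (the spark-streaming line and the exact versions are assumptions; the point is that only the connector loses the "provided" qualifier, so sbt-assembly bundles it):

```scala
// build.sbt sketch (assumes the sbt-assembly plugin is enabled)
libraryDependencies ++= Seq(
  // supplied by the cluster at runtime, kept out of the assembly jar
  "org.apache.spark" %% "spark-streaming" % "1.5.0" % "provided",
  // no "provided" here, so it is bundled into the assembly jar
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0"
)
```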

Interestingly, in my launch script, even when I tried using

spark-submit --repositories "location of my artifactory repository" --packages "spark-cassandra-connector"

spark-submit --jars "spark-cassandra-connector.jar"

both of them failed!
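One plausible reason the --packages attempt failed (an assumption, not confirmed in the thread) is that --packages expects full Maven coordinates in group:artifact:version form, including the Scala version suffix, e.g.:

```
spark-submit \
  --repositories "location of my artifactory repository" \
  --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0 \
  ...
```

(The _2.10 suffix is an assumption; it must match the Scala version the application was built against.)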


Scope "provided" means you expect the JDK or a container to supply the dependency at runtime; that dependency's jar will therefore not be part of the final application war/jar you create, hence the error.
