2015-11-06

I want to add Cassandra to Spark Streaming (Spark Streaming + Cassandra). My dependencies:

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-streaming" % sparkVersion, 
    "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion, 
    "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.8", 
    "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M2") 
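One comment below suggests the Guava version is the problem: `cassandra-driver-core` 2.1.8 needs a newer Guava (16.0.1+, which introduced `TypeToken.isPrimitive`) than the one Spark/Hadoop pull in transitively. A minimal sketch of pinning it explicitly in sbt, assuming 16.0.1 satisfies the driver:

```scala
// Hypothetical fix: force a Guava new enough for the Cassandra driver,
// so the older transitive Guava from Spark/Hadoop does not win resolution.
dependencyOverrides += "com.google.guava" % "guava" % "16.0.1"
```

This only controls which single Guava sbt resolves into the assembly; it does not help if an older Guava is also present on the runtime classpath (e.g. provided by the Spark distribution itself).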

Building the assembly jar:

assemblyMergeStrategy in assembly := { 
    case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last 
    case PathList("com", "google", xs @ _*) => MergeStrategy.first 
    case PathList("org", "apache", xs @ _*) => MergeStrategy.last 
    case PathList("io", "netty", xs @ _*) => MergeStrategy.last 
    case PathList("com", "codahale", xs @ _*) => MergeStrategy.last 
    case PathList("io.netty", "netty-all", xs @ _*) => MergeStrategy.last 
    case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.first 

    case x => 
     val oldStrategy = (assemblyMergeStrategy in assembly).value 
     oldStrategy(x) 
} 
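Note that `MergeStrategy.first`/`MergeStrategy.last` only pick one copy of each duplicate file; they cannot prevent the wrong Guava from being the one picked. An alternative sketch, assuming sbt-assembly 0.14+ is available, is to shade Guava inside the fat jar so the driver's copy cannot clash with Spark's at all:

```scala
// Hypothetical alternative: relocate Guava classes inside the assembly jar.
// "shadedguava" is an arbitrary package name chosen for this example.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "shadedguava.@1").inAll
)
```

With shading, the Cassandra driver inside the jar references the relocated classes, while Spark keeps using its own unmodified Guava.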

The code:

val sparkConf = new SparkConf(true) 
  .setMaster("local[2]") 
  .setAppName(getClass.getSimpleName) 
  .set("spark.executor.memory", "1g") 
  .set("spark.cores.max", "1") 
  .set("spark.cassandra.connection.host", "127.0.0.1") 

val ssc = new StreamingContext(sparkConf, Seconds(2)) 

/** Creates the keyspace and table in Cassandra. */ 
CassandraConnector(sparkConf).withSessionDo { session => 
  session.execute(s"DROP KEYSPACE IF EXISTS kafka_streaming") 
  session.execute(s"CREATE KEYSPACE IF NOT EXISTS kafka_streaming WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }") 
  session.execute(s"CREATE TABLE IF NOT EXISTS kafka_streaming.wordcount (word TEXT PRIMARY KEY, count COUNTER)") 
  session.execute(s"TRUNCATE kafka_streaming.wordcount") 
} 
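For completeness, once the keyspace and table exist, a stream can be written to that table with the connector's streaming API. A minimal sketch, where `wordCounts` is a hypothetical `DStream` of (word, count) pairs (e.g. produced by a Kafka word count):

```scala
import org.apache.spark.streaming.dstream.DStream
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._

// Hypothetical stream of word counts; its construction is omitted here.
val wordCounts: DStream[(String, Long)] = ???

// Writes each micro-batch to kafka_streaming.wordcount; since `count` is a
// COUNTER column, Cassandra increments it rather than overwriting.
wordCounts.saveToCassandra("kafka_streaming", "wordcount",
  SomeColumns("word", "count"))

ssc.start()
ssc.awaitTermination()
```

This is only a sketch of the connector's `saveToCassandra` API shape in the 1.5.x line, not a drop-in snippet.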

It throws this exception:

Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.reflect.TypeToken.isPrimitive()Z 
     at com.datastax.driver.core.TypeCodec.<init>(TypeCodec.java:142) 
     at com.datastax.driver.core.TypeCodec.<init>(TypeCodec.java:136) 
     at com.datastax.driver.core.TypeCodec$BlobCodec.<init>(TypeCodec.java:609) 
     at com.datastax.driver.core.TypeCodec$BlobCodec.<clinit>(TypeCodec.java:606) 
     at com.datastax.driver.core.CodecRegistry.<clinit>(CodecRegistry.java:147) 
     at com.datastax.driver.core.Configuration$Builder.build(Configuration.java:259) 
     at com.datastax.driver.core.Cluster$Builder.getConfiguration(Cluster.java:1135) 
     at com.datastax.driver.core.Cluster.<init>(Cluster.java:111) 
     at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:178) 
     at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1152) 
     at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:85) 
     at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:155) 
     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150) 
     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150) 
     at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31) 
     at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56) 
     at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81) 
     at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109) 
     at com.lowcosttravelgroup.PostCostsStreamingApp$.cassandraTest(PostCostsStreamingApp.scala:71) 
     at com.lowcosttravelgroup.PostCostsStreamingApp$.main(PostCostsStreamingApp.scala:46) 
     at com.lowcosttravelgroup.PostCostsStreamingApp.main(PostCostsStreamingApp.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:497) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/api,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/threadDump/json,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/threadDump,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/json,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment/json,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd/json,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/json,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool/json,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/json,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/json,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/job/json,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/job,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/json,null} 
15/11/06 17:56:31 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs,null} 

If I exclude the Cassandra operations, Spark starts without any exception.

Any ideas? Is there any working example of Spark Streaming + Cassandra on GitHub?

BR!

The Scala version is 2.11 – scalaz

The problem is the version of Google Guava being used. Try adding it explicitly to your sbt dependencies. – maasg

Tried that. Which version works with Spark 1.5.1? – scalaz

Answers

A NoSuchMethodError usually indicates a Scala version mismatch. For example, if your Spark was built against Scala 2.10, replace your connector dependency with:

"com.datastax.spark" % "spark-cassandra-connector-java_2.10" % "1.5.0-M2" 

You can find the other versions here.
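Since the asker is on Scala 2.11, the same idea with `%%` would resolve to the `_2.11` artifact automatically. A sketch of what the sbt settings could look like (2.11.7 is just an example patch version):

```scala
// With %% sbt appends the Scala binary version suffix for you:
scalaVersion := "2.11.7"
libraryDependencies +=
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M2"
// ...which is equivalent to the explicit coordinate:
// "com.datastax.spark" % "spark-cassandra-connector_2.11" % "1.5.0-M2"
```

The important point is that the connector's suffix must match the Scala version Spark itself was built with, or linkage errors like the one above appear at runtime.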

The Scala version is 2.11 – scalaz

Use the 2.11 version of the connector –

Do you mean spark-cassandra-connector-java_2.11? – scalaz

You should pull in the spark-cassandra-connector by passing the packages argument to your spark-submit or spark-shell:

--packages datastax:spark-cassandra-connector:1.5.0-M2-s_2.10

Pass the correct connector version so your code works, and mark the dependency as provided in your sbt file:

"com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M2" % "provided"

Hope this helps.
