2016-07-09

I am trying to stream data from Kafka into Spark and I get `kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker`.

I am using Spark 1.6.2 with Kafka 0.9.0.1 and Scala 2.11.8. When I use the receiver-based approach (KafkaUtils.createStream()) everything works fine, but when I try the receiver-less direct approach like this:

val kafkaStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, 
    Map("group.id" -> "blah", 
    "auto.offset.reset" -> "smallest", 
    "metadata.broker.list" -> "127.0.0.1:9092", 
    "bootstrap.servers"-> "127.0.0.1:9092"), 
    Set("tweets") 
) 
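For comparison, the receiver-based call that works looks roughly like this (a sketch, not my exact code; it assumes ZooKeeper on the default 127.0.0.1:2181 and reuses the group id and topic from above):

```scala
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-based approach: consumes through ZooKeeper, unlike the
// direct approach above, which talks to the brokers themselves.
val receiverStream = KafkaUtils.createStream(
  ssc,                // the same StreamingContext as above
  "127.0.0.1:2181",   // ZooKeeper quorum (assumed default port), not the broker list
  "blah",             // same consumer group id
  Map("tweets" -> 1)  // topic -> number of receiver threads
)
```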

I get this error with the direct approach:

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90) 
at scala.Option.map(Option.scala:146) 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90) 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) 
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87) 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
at scala.collection.immutable.Set$Set1.foreach(Set.scala:94) 
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86) 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85) 
at scala.util.Either$RightProjection.flatMap(Either.scala:522) 
at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85) 
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179) 
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161) 
at org.apache.spark.streaming.kafka.KafkaCluster.getEarliestLeaderOffsets(KafkaCluster.scala:155) 
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:213) 
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211) 
at scala.util.Either$RightProjection.flatMap(Either.scala:522) 
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211) 
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) 
at SparkStreaming$.delayedEndpoint$SparkStreaming$1(SparkStreaming.scala:32) 
at SparkStreaming$delayedInit$body.apply(SparkStreaming.scala:24) 
at scala.Function0$class.apply$mcV$sp(Function0.scala:34) 
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) 
at scala.App$$anonfun$main$1.apply(App.scala:76) 
at scala.App$$anonfun$main$1.apply(App.scala:76) 
at scala.collection.immutable.List.foreach(List.scala:381) 
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) 
at scala.App$class.main(App.scala:76) 
at SparkStreaming$.main(SparkStreaming.scala:24) 
at SparkStreaming.main(SparkStreaming.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) 

And these are my dependencies:

"org.apache.spark" %% "spark-streaming-kafka" % "1.6.2", 
"org.apache.spark" %% "spark-core" % "1.6.2", 
"org.apache.spark" % "spark-streaming_2.11" % "1.6.2", 
"org.apache.kafka" %% "kafka" % "0.9.0.1" 

I cannot see where the problem is. Can anyone help me?

Answer


According to the Spark Streaming documentation here, Spark Streaming 1.6.2 is compatible with Kafka 0.8.2.1:

Kafka: Spark Streaming 1.6.2 is compatible with Kafka 0.8.2.1

So to fix your problem, use the Kafka library at version 0.8.2.1 instead of 0.9.0.1.
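In sbt terms the change is just the Kafka version. A sketch of the corrected dependency list (spark-streaming-kafka 1.6.2 itself depends on the 0.8.2.1 Kafka client, so the explicit kafka entry mainly needs to match it):

```scala
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.2",
  "org.apache.spark" %% "spark-core"            % "1.6.2",
  "org.apache.spark" %% "spark-streaming"       % "1.6.2",
  // Pin Kafka to the version Spark Streaming 1.6.2 was built against
  "org.apache.kafka" %% "kafka"                 % "0.8.2.1"
)
```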

Hope this helps!

Thank you very much for your answer. I was just wondering: would using the 0.8.2.1 library against a 0.9.0.1 broker cause any compatibility issues? –

The Kafka 0.8.2.1 library is compatible with Kafka 0.9.0.1, so there will not be any problems. – avr

OK, thanks again. Sorry for not upvoting your answer; I do not have enough reputation yet :D –
