2016-09-01 77 views
2

我使用spark spark'org.apache.spark:spark-streaming_2.10:1.6.1'和'org.apache.spark:spark-streaming-kafka_2.10 :1.6.1'連接到一個kafka經紀人版本0.10.0.1。當我嘗試此代碼:Spark streaming kafka找不到集合的領導者偏移

def messages = KafkaUtils.createDirectStream(jssc, 
      String.class, 
      String.class, 
      StringDecoder.class, 
      StringDecoder.class, 
      kafkaParams, 
      topicsSet) 

我已經收到此異常:

INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException 
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException 
org.apache.spark.SparkException: Couldn't find leader offsets for Set([stream,0]) 
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) 
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) 
    at scala.util.Either.fold(Either.scala:97) 
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607) 
    at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) 
    at org.apache.spark.streaming.kafka.KafkaUtils$createDirectStream.call(Unknown Source) 
    at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:45) 
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:108) 
    at com.privowny.classification.jobs.StreamingClassification.main(StreamingClassification.groovy:48) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

我嘗試搜索這個網站的一些答案,但似乎沒有答案,你可以給我一些建議做什麼?主題stream不是空的。

+0

這通常是ZooKeeper問題的一個信號。重置ZooKeeper並重試。 –

+0

可能是什麼問題?我剛剛啓動了服務器,就像在快速入門文檔中一樣! – innovatism

+1

我遇到了Kafka和ZooKeeper之間存在同步問題的問題。重置他們都解決了它。 –

回答

1

我從經驗中知道,如果Spark驅動程序無法使用經紀人公佈的主機名(advertised.host.nameserver.properties)到達kafka經紀人,那麼可能導致此錯誤消息的一件事是。即使spark配置識別使用不同地址的kafka代理,情況也是如此。所有代理公佈的主機名都必須可以從Spark驅動程序中獲得。

這發生在我身上,因爲集羣運行在單獨的AWS賬戶中,經紀人使用內部DNS記錄識別自己,這些記錄必須複製到其他AWS賬戶。在這之前,我得到了這個錯誤信息,因爲即使我們在spark配置中使用經紀人的私有IP地址,Spark驅動程序也無法聯繫到經紀人詢問他們最新的抵消額。

希望能幫助別人。

2

我也遇到過這個問題。所以你必須改變卡夫卡的一些配置。

轉到您的Kafka配置並配置listeners;

在格式套接字服務器設置部分:

listeners=PLAINTEXT://[hostname or IP]:[port] 

例如:

listeners=PLAINTEXT://192.168.1.24:9092 
0

我跑卡夫卡從HDP,所以默認端口是6667,而不是9092,當我將bootstrap.servers的端口切換爲<hostname>:6667,問題得到解決。

相關問題