2017-05-24 55 views
0

我正在嘗試使用用於Kafka-Spark集成的SSL。我已經測試了卡夫卡啓用了SSL,它與樣本消費者和生產者完全合作。用Kafka-Spark集成找不到集合([topic,0])的領導者

而且,我已經試過星火整合 - 卡夫卡沒有火花工作 SSL完成時也沒有工作的問題。

現在,當我使SSL火花工作我得到一個異常和集成不起作用。

ONLY變化我沒有讓SSL火花工作是包括在我的工作,下面的代碼行:

sparkConf.set("security.protocol", "SSL"); 
    sparkConf.set("ssl.truststore.location", "PATH/truststore.jks"); 
    sparkConf.set("ssl.truststore.password", "passwrd"); 
    sparkConf.set("ssl.keystore.location", "PATH/keystore.jks"); 
    sparkConf.set("ssl.keystore.password", "kstore"); 
    sparkConf.set("ssl.key.password", "keypass"); 

和創建時,這個sparkConf傳遞流媒體上下文。

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000)); 

當我運行工作我得到的錯誤如下:

17/05/24 18:16:39 WARN ConsumerFetcherManager$LeaderFinderThread: [test-consumer-group_bmj-cluster-1495664195784-5f49cbd0-leader-finder-thread], Failed to find leader for Set([bell,0]) 
java.lang.NullPointerException 
    at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312) 
    at kafka.cluster.Broker.connectionString(Broker.scala:62) 
    at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89) 
    at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89) 
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) 
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) 

卡夫卡版本 - 2.11-0.10.2.0
星火版本 - 2.1.0
斯卡拉版 - 2.11.8

流圖書館

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 --> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.11</artifactId> 
     <version>2.1.0</version> 
    </dependency> 

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 --> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> 
     <version>2.1.0</version> 
    </dependency> 

與問候克服這個問題的任何幫助嗎?

回答

0

隨着一些挖掘我能夠找出我有問題。

首先,爲了使SSL的SSL相關,卡夫卡PARAMS需要傳遞到KafkaUtils.createDirectStream()方法和JavaStreamingContextsparkConf

然後,給出SSL參數

"security.protocol", "SSL" 
"ssl.truststore.location", "PATH/truststore.jks" 
"ssl.truststore.password", "passwrd" 
"ssl.keystore.location", "PATH/keystore.jks" 
"ssl.keystore.password", "kstore" 
"ssl.key.password", "keypass" 

不被火花卡夫卡流版 「0- _2.11」,支持我正在使用,因此我必須將其更改爲版本「0- _2.11」。

作爲回報,該方法有一個完整的API更改:KafkaUtils.createDirectStream()用於連接到Kafka。

在有關如何使用它的文檔中給出瞭解釋here

所以我最後的代碼段連接到卡夫卡是這樣的:

final JavaInputDStream<ConsumerRecord<String, String>> stream = 
      KafkaUtils.createDirectStream(
        javaStreamingContext, 
        LocationStrategies.PreferConsistent(), 
        ConsumerStrategies.<String, String>Subscribe(topicsCollection, kafkaParams) 
      ); 

卡夫卡PARAMS是一個地圖包含所有的SSL參數。

謝謝
Shabir

相關問題