2016-11-14

I wrote code to fetch data from the "topicTest1" Kafka queue. I am unable to print the consumer's data. The error mentioned below occurs, and the Kafka stream does not work in the Spark job.

Below is my code to consume the data:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public static void main(String[] args) throws Exception {

    // StreamingExamples.setStreamingLogLevels();
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[*]");

    // Create the context with a 100 ms batch interval
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(100));

    int numThreads = Integer.parseInt("3");
    Map<String, Integer> topicMap = new HashMap<>();
    String[] topics = "topicTest1".split(",");
    for (String topic : topics) {
        topicMap.put(topic, numThreads);
    }

    // createStream takes the ZooKeeper quorum, a consumer group id and the topic map
    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, "9.98.171.226:9092", "1", topicMap);

    messages.print();
    jssc.start();
    jssc.awaitTermination();
}

I am using the following dependencies:

<dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.10</artifactId> 
      <version>1.6.1</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-kafka_2.10</artifactId> 
      <version>1.6.1</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming_2.11</artifactId> 
      <version>1.6.1</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-twitter_2.11</artifactId> 
      <version>1.6.1</version> 
     </dependency> 

Below is the error I get:

Exception in thread "dispatcher-event-loop-0" java.lang.NoSuchMethodError: scala/Predef$.$conforms()Lscala/Predef$$less$colon$less; (loaded from file:/C:/Users/Administrator/.m2/repository/org/scala-lang/scala-library/2.10.5/scala-library-2.10.5.jar by [email protected]) called from class org.apache.spark.streaming.scheduler.ReceiverSchedulingPolicy (loaded from file:/C:/Users/Administrator/.m2/repository/org/apache/spark/spark-streaming_2.11/1.6.2/spark-streaming_2.11-1.6.2.jar by [email protected]). 
     at org.apache.spark.streaming.scheduler.ReceiverSchedulingPolicy.scheduleReceivers(ReceiverSchedulingPolicy.scala:138) 
     at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receive$1.applyOrElse(ReceiverTracker.scala:450) 
     at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
16/11/14 13:38:00 INFO ForEachDStream: metadataCleanupDelay = -1
     at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204) 
     at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) 
     at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
     at java.lang.Thread.run(Thread.java:785) 

Another error:

Exception in thread "JobGenerator" java.lang.NoSuchMethodError: scala/Predef$.$conforms()Lscala/Predef$$less$colon$less; (loaded from file:/C:/Users/Administrator/.m2/repository/org/scala-lang/scala-library/2.10.5/scala-library-2.10.5.jar by [email protected]) called from class org.apache.spark.streaming.scheduler.ReceivedBlockTracker (loaded from file:/C:/Users/Administrator/.m2/repository/org/apache/spark/spark-streaming_2.11/1.6.2/spark-streaming_2.11-1.6.2.jar by [email protected]). 
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:114) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:203) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246) 
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
These kinds of errors often show up when you use mismatched versions. Check which Spark version you have installed and which Spark library versions you are using – lidox

I have added the dependencies and versions in the question –

@lidox you are right.. the versions "spark-streaming-kafka_2.10" and "spark-streaming_2.11" did not match.. I set them both to 2.10.. it works fine for me now.. –

Answer

Make sure you use the correct versions. Let's say you use the following Maven dependency:

<dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka_2.10</artifactId> 
     <version>1.6.1</version> 
    </dependency> 

So the artifact is: spark-streaming-kafka_2.10

Now check whether you are using the matching Kafka version:

cd /KAFKA_HOME/libs 

Now find kafka_YOUR-VERSION-sources.jar.

If you have kafka_2.10-0xxxx-sources.jar you are fine! :) If you have a different version, just change the Maven dependencies or download the correct Kafka version.
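The Scala suffix can be read straight off the jar file name. A minimal sketch, assuming a hypothetical jar name; on your machine, substitute whatever `ls /KAFKA_HOME/libs` shows:

```shell
# Hypothetical jar name; substitute the one found under /KAFKA_HOME/libs
jar="kafka_2.10-0.8.2.1-sources.jar"

# Strip the "kafka_" prefix, then cut at the first "-" to keep
# only the Scala version the Kafka libraries were built against
scala_ver="${jar#kafka_}"
scala_ver="${scala_ver%%-*}"
echo "$scala_ver"
```

This printed version must match the `_2.xx` suffix of every Spark artifact in your pom.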

After that, check your Spark version. Make sure you use matching versions here as well:

groupId: org.apache.spark artifactId: spark-core_2.xx version: xxx
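In this question's case, the fix the asker confirmed in the comments is to align every Spark artifact on one Scala suffix. A sketch of a consistent dependency set, assuming the Scala 2.10 builds of Spark 1.6.1:

```xml
<!-- All artifacts share the _2.10 Scala suffix and the 1.6.1 Spark version -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
```

Mixing `_2.10` and `_2.11` artifacts, as in the question's pom, is exactly what produces the `NoSuchMethodError` on `scala.Predef$.$conforms()` shown above.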

Still the messages are not printed in the log. Is there any way to see them? –

Can you tell us more about this problem? Which message is not shown in the log? Do you mean your consumer does not receive any messages? – lidox

I have some code examples using Kafka and Apache Flink: https://github.com/lidox/big-data-fun/tree/2eb800725c894521a322ba7f1382491f69074e38/kafka-flink-101/src/main/java/com/artursworld. It should be the same principle – lidox