
Spark + Kafka streaming NoClassDefFoundError kafka/serializer/StringDecoder

I am trying to send messages from my Kafka producer and consume them with Spark Streaming, but when I run my application with spark-submit I get the following error.

Error

Exception in thread "main" java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder 
     at com.spark_stream.Main.main(Main.java:37) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder 
     at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
     at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
     at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
     ... 10 more 

The application code is as follows:

Main.java

package com.spark_stream; 

import java.util.Collections; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Set; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 

import kafka.serializer.StringDecoder; 

public class Main { 

    public static void main(String[] args) { 
     // TODO Auto-generated method stub 

     System.out.println("spark started!"); 

      SparkConf conf = new SparkConf() 
        .setAppName("kafka-sandbox") 
        .setMaster("local[*]"); 
      JavaSparkContext sc = new JavaSparkContext(conf); 
      JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); 


      Map<String, String> kafkaParams = new HashMap<String, String>(); 
      kafkaParams.put("metadata.broker.list", "localhost:9092"); 
      Set<String> topics = Collections.singleton("speed"); 

      JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, 
        String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); 

      directKafkaStream.foreachRDD(rdd -> { 
       System.out.println("--- New RDD with " + rdd.partitions().size() 
         + " partitions and " + rdd.count() + " records"); 
       rdd.foreach(record -> System.out.println(record._2)); 
      }); 

      System.out.println("connection completed"); 


      ssc.start(); 

      ssc.awaitTermination(); 

      System.out.println("spark ended!"); 

    } 

} 

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>com.spark_stream</groupId> 
    <artifactId>com.spark_stream</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 


    <dependencies> 

    <dependency> <!-- Spark dependency --> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.10</artifactId> 
     <version>1.6.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>1.6.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka_2.10</artifactId> 
     <version>1.6.0</version> 
    </dependency> 


</dependencies> 

    <properties> 
     <maven.compiler.source>1.8</maven.compiler.source> 
     <maven.compiler.target>1.8</maven.compiler.target> 
    </properties> 
</project> 

I couldn't find a solution for this error. Any help would be appreciated.

Answers


Take a look at the docs: http://spark.apache.org/docs/latest/submitting-applications.html#launching-applications-with-spark-submit

More specifically, this part:

Path to a bundled jar including your application and all dependencies.
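For reference, the general invocation form described on that page looks roughly like this (a sketch; the <application-jar> argument is the bundled jar the quote refers to):

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  <application-jar> \
  [application-arguments]

For this project that would be something like spark-submit --class com.spark_stream.Main --master local[*] target/<your-built-jar>.jar, where the jar name depends on how your build is configured.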

Given your pom.xml, the jar you are building clearly does not include its dependencies. That is why spark-submit cannot find the class kafka.serializer.StringDecoder.

What you probably want, to solve a problem like this, is a plugin that bundles your dependencies inside your jar; the Maven Assembly Plugin can help you do this.
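For illustration only (a minimal sketch based on the question's pom.xml; the mainClass value is taken from the Main.java above, the plugin version is an assumption, and the rest is standard maven-assembly-plugin usage), the plugin could be added in a <build> section like this:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <!-- assumed version; any recent release should work -->
            <version>2.6</version>
            <configuration>
                <descriptorRefs>
                    <!-- bundle all runtime dependencies into a single "fat" jar -->
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <!-- main class as defined in the question's Main.java -->
                        <mainClass>com.spark_stream.Main</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <!-- build the assembly during mvn package -->
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

After mvn package, a *-jar-with-dependencies.jar should appear under target/; pass that jar to spark-submit instead of the plain artifact jar.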


Thanks, adding the Maven Assembly Plugin did the job.


It looks like the compiler cannot find the Kafka jar, since you haven't included it in your pom file. Try adding the following dependency to your pom file, and check which Kafka version you are actually using.

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10 --> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.10</artifactId> 
    <version>0.8.0</version> 
</dependency> 

I get the same error after adding this dependency.
