2014-10-28 72 views
1

我有使用sparkCassandraConnector從Cassandra表中檢索數據的問題。我在卡桑德拉創建了名爲「ks」的名稱空間和表「學生」。該表如下:使用java中的Apache Spark連接器從Cassandra中檢索數據的錯誤

id |名稱

---- + -----------

10 |凱瑟琳

我開始在本地運行星火start-all.sh

然後我創造了這個類「SparkCassandraConnector」,它具有用於連接火花和Cassandra.What我試圖做一個命令是從獲取數據學生表並將其打印在屏幕上。

我得到的錯誤是「拋出java.lang.ClassNotFoundException:SparkCassandraConnector $學生 java.net.URLClassLoader的$ 1.run(URLClassLoader.java:372) java.net.URLClassLoader的$ 1.run(URLClassLoader.java:361 ) java.security.AccessController.doPrivileged(本機方法) java.net.URLClassLoader.findClass(URLClassLoader.java:360) java.lang.ClassLoader.loadClass(ClassLoader.java:424) java.lang.ClassLoader中。 loadClass(ClassLoader.java:357) java.lang.Class.forName0(Native Method) java.lang.Class.forName(Class.java:340)

這是我的計劃:

import org.apache.commons.lang.StringUtils; 

import org.apache.spark.SparkConf; 

import org.apache.spark.api.java.JavaRDD; 

import org.apache.spark.api.java.JavaSparkContext; 

import java.io.Serializable; 

import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions; 

public class SparkCassandraConnector implements Serializable { 
public static void main(String[] args) { 

    SparkConf conf = new SparkConf().setAppName("Simple Application"); 

    conf.setMaster("spark://127.0.0.1:7077"); 
    conf.set("spark.cassandra.connection.host", "127.0.0.1"); 
    String[] jars = new String[10]; 
    jars[0] = "~/.m2/repository/com/datastax/spark/spark-cassandra-connector-java_2.10/1.1.0-alpha4/spark-cassandra-connector-java_2.10-1.1.0-alpha4.jar"; 
    jars[1] = "~/.m2/repository/com/datastax/cassandra/cassandra-driver-core/2.1.0/cassandra-driver-core-2.1.0.jar"; 
    jars[3] = "~/.m2/repository/com/datastax/spark/spark-cassandra-connector_2.10/1.1.0-alpha4/spark-cassandra-connector_2.10-1.1.0-alpha4.jar"; 
    jars[4] = "~/.m2/repository/com/datastax/cassandra/cassandra-driver-core/2.1.0/cassandra-driver-core-2.1.0.jar"; 
    jars[5] = "~/.m2/repository/org/apache/cassandra/cassandra-thrift/2.1.0/cassandra-thrift-2.1.0.jar"; 
    jars[6] = "~/.m2/repository/org/apache/cassandra/cassandra-clientutil/2.1.0/cassandra-clientutil-2.1.0.jar"; 
    conf = conf.setJars(jars); 
    JavaSparkContext sc = new JavaSparkContext(conf); 

    JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("ks", "student", Student.class) 
      .map(new org.apache.spark.api.java.function.Function<Student, String>() { 
       @Override 
       public String call(Student person) throws Exception { 
        return person.toString(); 
       } 
      }); 
    System.out.println("Data as Person beans: \n" + StringUtils.join(rdd.collect(), "\n")); 
} 
public static class Student implements Serializable{ 

    private Integer id; 
    private String name; 

    public Student(){ 

    } 
    public Student(Integer id, String name) { 
     this.id = id; 
     this.name = name; 
    } 

    public Integer getId() { 
     return id; 
    } 

    public void setId(Integer id) { 
     this.id = id; 
    } 

    public String getName() { 
     return name; 
    } 

    public void setName(String name) { 
     this.name = name; 
    } 
} 

}

這是我的POM文件:

<dependencies> 


    <!--Spark--> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.10</artifactId> 
     <version>1.1.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>1.1.0</version> 
    </dependency> 
    <dependency> 
     <groupId>com.datastax.cassandra</groupId> 
     <artifactId>cassandra-driver-core</artifactId> 
     <version>2.1.0</version> 
    </dependency> 
    <dependency> 
     <groupId>com.datastax.spark</groupId> 
     <artifactId>spark-cassandra-connector_2.10</artifactId> 
     <version>1.1.0-alpha4</version> 
    </dependency> 

    <dependency> 
     <groupId>com.datastax.spark</groupId> 
     <artifactId>spark-cassandra-connector-java_2.10</artifactId> 
     <version>1.1.0-alpha4</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-catalyst_2.10</artifactId> 
     <version>1.0.0</version> 
    </dependency> 
</dependencies> 
+0

我建議你繼續使用程序集插件,你可以在這裏閱讀它的動機和示例:http://eugenezhulenev.com/blog/2014/10/18/run-tests-in-standalone-spark -cluster/ – 2014-10-28 19:34:58

+0

謝謝你的鏈接。但是你是否同意我提供了必要的jar文件,我不應該得到那個錯誤。 – sia 2014-10-28 19:48:43

回答

3

在提供的罐子,含有活的罐子,因此Student.class丟失。快速修復它以添加項目文件夾中的jar。

另一種方法是將工作和所有依賴關係打包在「超級jar」中,並將該超級jar用作唯一聲明的jar。看看maven shade plugin.

也可以從命令行使用spark-submit --jars選項提供罐。

+0

我試圖添加主類的jar(第一個建議的解決方案),但它不起作用。然後我嘗試使用spark-submit --jars,但是出現無法識別的-jar選項錯誤。我試着創建了Uber jar,並且這個解決方案有效。感謝@maasg – sia 2014-10-29 18:11:46

相關問題