Spark-Cassandra Maven project, Java source making scala-lib call

2015-05-06

1

I'm new to both Spark and Cassandra and need some guidance. I'm setting up a Maven project using Spark v1.3.1 and Cassandra v2.0.14, and I'm trying to do the following:

1) Establish a connection to an Oracle DB for data input, using the DataFrames API added in Spark 1.3.0: http://www.sparkexpert.com/2015/03/28/loading-database-data-into-spark-using-data-sources-api/

2) Use the spark-cassandra-connector (found on GitHub) to establish the connection between Spark and Cassandra.

3) Once I have the DB data in a DataFrame, I should be able to convert it to a JavaRDD and push it to a Cassandra keyspace, as shown here (see the sketch after this list): http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java

4) In short: [Oracle DB] <--- [Spark] --- [Spark-Cassandra Connector] ---> [Cassandra]
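To make the intended flow concrete, here is a minimal sketch of steps 1 and 3 in Java. The class, bean, keyspace, and table names (JdbcToCassandraSketch, SampleBean, sample_ks, sample_table) and the JDBC URL are hypothetical, and the saveToCassandra call follows the connector 1.0-era Java API used in the DataStax post; newer connector releases expose a writerBuilder-based API instead:

    package com.dev.cassandra;

    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;

    import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions;

    public class JdbcToCassandraSketch {

        // Hypothetical JavaBean whose fields mirror the target Cassandra table.
        public static class SampleBean implements Serializable {
            private Long id;
            private String name;
            public SampleBean() {}
            public SampleBean(Long id, String name) { this.id = id; this.name = name; }
            public Long getId() { return id; }
            public void setId(Long id) { this.id = id; }
            public String getName() { return name; }
            public void setName(String name) { this.name = name; }
        }

        public static void copy(SQLContext sqlContext) {
            // Step 1: load an Oracle query result into a DataFrame via the
            // JDBC data source added in Spark 1.3.
            Map<String, String> options = new HashMap<>();
            options.put("driver", "oracle.jdbc.driver.OracleDriver");
            options.put("url", "jdbc:oracle:thin:user/pwd@//host:1521/service"); // hypothetical URL
            options.put("dbtable", "(SELECT ID, NAME FROM SAMPLE_TABLE)");
            DataFrame jdbcDF = sqlContext.load("jdbc", options);

            // Step 3: convert the DataFrame to a JavaRDD of beans...
            JavaRDD<SampleBean> beans = jdbcDF.javaRDD().map(
                new Function<Row, SampleBean>() {
                    public SampleBean call(Row row) {
                        return new SampleBean(row.getLong(0), row.getString(1));
                    }
                });

            // ...and push it to a Cassandra keyspace/table (connector 1.0-era
            // Java API as in the DataStax post; later versions use writerBuilder).
            javaFunctions(beans, SampleBean.class).saveToCassandra("sample_ks", "sample_table");
        }
    }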

The problem arises during the scala-lib call made from my Java code (step 1 above); more specifically, during the load call: DataFrame jdbcDF = sqlContext.load("jdbc", options);

Runtime error: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class

The error above occurs despite my having tried several different suggested 2.10.x versions of Scala in my pom.xml. From my research so far, I believe this may be a Spark/Scala compatibility issue. I've also read that I need to include scala-library.jar on my classpath, but I'm not sure how to do that with Maven. Any ideas? I've included my pom.xml and Java code below:

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.dev</groupId>
    <artifactId>spark-cassandra</artifactId>
    <version>0.0.1-SPARK-CASSANDRA</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.35</version>
        </dependency>
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc6</artifactId>
            <version>11.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.10</artifactId>
            <version>1.0.0-rc4</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector-java_2.10</artifactId>
            <version>1.0.0-rc4</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>2.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.dev.cassandra</groupId>
            <artifactId>spark-cassandra</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.3</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.10.3</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.10.0-M1</version>
        </dependency>
        -->
        <!--
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-swing</artifactId>
            <version>2.10.0-M1</version>
        </dependency>
        -->
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.1.5</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.3</version>
                    <configuration>
                        <source>1.7</source>
                        <target>1.7</target>
                        <mainClass>com.dev.cassandra.Main</mainClass>
                        <cleanupDaemonThreads>false</cleanupDaemonThreads>
                        <compilerArgument>-Xlint:all</compilerArgument>
                        <showWarnings>true</showWarnings>
                        <showDeprecation>true</showDeprecation>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>

        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- Plugin to create a single jar that includes all dependencies
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.dev.cassandra.Main</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            -->
        </plugins>
    </build>
</project>

Java code:

package com.dev.cassandra;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import java.sql.*;

import org.apache.spark.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import oracle.jdbc.*;

import com.datastax.spark.connector.cql.CassandraConnector;
import static com.datastax.spark.connector.CassandraJavaUtil.*;

public class Main implements Serializable {

    private static final org.apache.log4j.Logger LOGGER =
        org.apache.log4j.Logger.getLogger(Main.class);

    private static final String JDBC_DRIVER = "oracle.jdbc.driver.OracleDriver";
    private static final String JDBC_USERNAME = "XXXXXO01";
    private static final String JDBC_PWD = "XXXXXO01";
    private static final String JDBC_CONNECTION_URL =
        "jdbc:oracle:thin:" + JDBC_USERNAME + "/" + JDBC_PWD + "@CONNECTION VALUES";

    private transient SparkConf conf;

    private Main(SparkConf conf) {
        this.conf = conf;
    }

    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        generateData(sc);
        compute(sc);
        showResults(sc);
        sc.stop();
    }

    private void generateData(JavaSparkContext sc) {

        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
        System.out.println("AFTER SQL CONTEXT");

        // Data source options
        Map<String, String> options = new HashMap<>();
        options.put("driver", JDBC_DRIVER);
        options.put("url", JDBC_CONNECTION_URL);
        options.put("dbtable", "(SELECT * FROM XXX_SAMPLE_TABLE WHERE ROWNUM <=5)");

        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        try {
            Class.forName(JDBC_DRIVER);

            System.out.println("BEFORE jdbcDF");

            // Load JDBC query result as a DataFrame
            DataFrame jdbcDF = sqlContext.load("jdbc", options);
            System.out.println("AFTER jdbcDF");

            List<Row> tableRows = jdbcDF.collectAsList();

            System.out.println("AFTER tableRows");

            for (Row tableRow : tableRows) {
                System.out.println();
                LOGGER.info(tableRow);
                System.out.println();
            }

        } catch (Exception e) {
            // Handle errors for Class.forName
            e.printStackTrace();
        }
    }

    private void compute(JavaSparkContext sc) {
    }

    private void showResults(JavaSparkContext sc) {
    }

    public static void main(String[] args) throws InterruptedException {

        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.dev.cassandra <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        //JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
        SparkConf conf = new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]");

        //SparkConf conf = new SparkConf();
        //conf.setAppName("SparkJdbcDs");
        //conf.setMaster(args[0]);
        //conf.set("spark.cassandra.connection.host", args[1]);

        Main app = new Main(conf);
        app.run();
    }
}

Thanks in advance!

+1

The Spark-Cassandra Connector does not support Spark 1.3.x – see the compatibility table: https://github.com/datastax/spark-cassandra-connector – maasg

+0

Also, your pom.xml is requesting the Scala 2.11 versions of the Spark JARs but the Scala 2.10 versions of the Cassandra connector JARs. (This is based on the naming convention for Scala artifact IDs, which end with the Scala version they are built for.) These need to be consistent (a) with each other and (b) with the Scala version you are actually using. –

+0

@SpiroMichaylov – thanks very much for the advice; my mistake, as you mentioned, was that I was pulling in the wrong Scala version (2.11) in my pom.xml. After changing my Spark dependencies to version 2.10, it works fine. Thanks! – madStack007

Answer

2

Your pom.xml is requesting the Scala 2.11 versions of some of the Spark JARs:

<artifactId>spark-core_2.11</artifactId> 

<artifactId>spark-sql_2.11</artifactId> 

and the Scala 2.10 versions of another Spark JAR and of the Cassandra connector JARs:

<artifactId>spark-streaming_2.10</artifactId> 

<artifactId>spark-cassandra-connector_2.10</artifactId> 

<artifactId>spark-cassandra-connector-java_2.10</artifactId> 

(This is based on the naming convention for Scala artifact IDs, which end with the Scala version they are built for.)

These need to be consistent (a) with each other and (b) with the Scala version you are actually using.
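As a concrete illustration, a consistent set of dependencies might look like the following sketch. The versions shown are the ones already in the question's pom.xml, and the scala.binary.version property is just a convenience for keeping the artifact suffixes aligned; whether connector 1.0.0-rc4 itself works against Spark 1.3.x is a separate question (see the compatibility table linked in the comments):

    <!-- Sketch: all Scala-dependent artifacts pinned to the same 2.10 line.
         Versions are taken from the question's pom.xml; check the connector's
         compatibility table before settling on a connector release. -->
    <properties>
        <scala.binary.version>2.10</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_${scala.binary.version}</artifactId>
            <version>1.0.0-rc4</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector-java_${scala.binary.version}</artifactId>
            <version>1.0.0-rc4</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.3</version>
        </dependency>
    </dependencies>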

+0

Good catch, thanks very much – mithra