
I am trying to port an example written in Scala (from the Apache Spark project) to Java and am running into some problems: how do I use Spark's .newAPIHadoopRDD() from Java?

The code from the original Scala example,

val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(), 
    classOf[CqlPagingInputFormat], 
    classOf[java.util.Map[String,ByteBuffer]], 
    classOf[java.util.Map[String,ByteBuffer]]) 

builds and runs just fine, but

JavaPairRDD rdd = jsc.newAPIHadoopRDD(job.getConfiguration(), 
    CqlPagingInputFormat.class, 
    java.util.Map<String, ByteBuffer>.class, 
    java.util.Map<String, ByteBuffer>.class); 

is not allowed in Java (Cannot select from parameterized type).

Changing

java.util.Map<String, ByteBuffer>.class 

to

Class.forName("java.util.Map<String, ByteBuffer>") 

produces a new error:

Error:(42, 30) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types; 
required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V> 
found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat>,java.lang.Class<capture#1 of ?>,java.lang.Class<capture#2 of ?> 
reason: inferred type does not conform to declared bound(s) 
inferred: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat 
bound(s): org.apache.hadoop.mapreduce.InputFormat<capture#1 of ?,capture#2 of ?> 

Changing it to simply java.util.Map.class produces a similar error:

Error:(44, 30) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types; 
required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V> 
found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat>,java.lang.Class<java.util.Map>,java.lang.Class<java.util.Map> 
reason: inferred type does not conform to declared bound(s) 
inferred: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat 
bound(s): org.apache.hadoop.mapreduce.InputFormat<java.util.Map,java.util.Map> 

So what is the correct translation? It is worth noting that newAPIHadoopRDD() has separate implementations for Scala and for Java. Documentation for the methods can be found here and here: http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkContext.html#newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class,java.lang.Class,java.lang.Class).
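For reference, the generic signature on the Java side (paraphrased from the Javadoc linked above, not copied verbatim) is roughly:

    public <K, V, F extends org.apache.hadoop.mapreduce.InputFormat<K, V>> 
    JavaPairRDD<K, V> newAPIHadoopRDD(Configuration conf, 
        Class<F> fClass, 
        Class<K> kClass, 
        Class<V> vClass) 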

The declaration of CqlPagingInputFormat looks like this:

public class CqlPagingInputFormat extends org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat<java.util.Map<java.lang.String,java.nio.ByteBuffer>,java.util.Map<java.lang.String,java.nio.ByteBuffer>> { 
Have you tried 'java.util.Map.class' instead of 'java.util.Map<String, ByteBuffer>.class'? –

Yes, I should have added that. I will post the errors in the original question, thanks. – martingms

Answer


I finally got it solved after a lot of fighting. The problem is that newAPIHadoopRDD requires a class that extends org.apache.hadoop.mapreduce.InputFormat, and org.apache.cassandra.hadoop.cql3.CqlInputFormat does not extend InputFormat directly; it extends org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat, which in turn extends InputFormat.

Eclipse uses the Groovy compiler, which is smart enough to resolve this, but Java's default compiler is not. The Groovy compiler also correctly resolves the K,V types that the Java compiler flagged as incompatible.
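As an editorial aside (not part of the original answer): under plain javac the bound can usually be satisfied without switching compilers by casting the key/value Class tokens to the parameterized type. A minimal sketch, assuming the same jsc and job variables as in the question and the usual java.util.Map/java.nio.ByteBuffer imports:

    // Unchecked cast of the raw Map.class token to Class<Map<String, ByteBuffer>>,
    // so that K and V are inferred as Map<String, ByteBuffer> and the bound
    // F extends InputFormat<K, V> is satisfied by CqlPagingInputFormat.
    @SuppressWarnings("unchecked")
    Class<java.util.Map<String, ByteBuffer>> kvClass = 
        (Class<java.util.Map<String, ByteBuffer>>) (Class<?>) java.util.Map.class; 

    JavaPairRDD<java.util.Map<String, ByteBuffer>, java.util.Map<String, ByteBuffer>> casRdd = 
        jsc.newAPIHadoopRDD(job.getConfiguration(), 
            CqlPagingInputFormat.class, 
            kvClass, 
            kvClass); 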

To use the Groovy compiler, you need to add the following changes to the pom.xml file:

<properties> 
    <groovy-version>1.8.6</groovy-version> 
    <maven-compiler-plugin-version>2.5.1</maven-compiler-plugin-version> 
    <groovy-eclipse-compiler-version>2.7.0-01</groovy-eclipse-compiler-version> 
    <maven-clover2-plugin-version>3.1.7</maven-clover2-plugin-version> 
    <groovy-eclipse-batch-version>1.8.6-01</groovy-eclipse-batch-version> 
</properties> 
  1. Add the Groovy dependency

    <dependencies> 
        <dependency> 
         <groupId>org.codehaus.groovy</groupId> 
         <artifactId>groovy-all</artifactId> 
         <version>${groovy-version}</version> 
        </dependency> 
    </dependencies> 
    
  2. Add the Groovy plugin under build to use it as the compiler for our code

    <build> 
        <pluginManagement> 
         <plugins> 
         <plugin> 
          <groupId>org.apache.maven.plugins</groupId> 
          <artifactId>maven-compiler-plugin</artifactId> 
          <version>${maven-compiler-plugin-version}</version> 
          <configuration> 
           <!-- Bind Groovy Eclipse Compiler --> 
           <compilerId>groovy-eclipse-compiler</compilerId> 
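           <!-- Note: ${jdk-version} is assumed to be defined in <properties> (e.g. 1.7); it is not shown above --> 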
           <source>${jdk-version}</source> 
           <target>${jdk-version}</target> 
          </configuration> 
          <dependencies> 
           <!-- Define which Groovy version will be used for build (default is 
            2.0) --> 
           <dependency> 
            <groupId>org.codehaus.groovy</groupId> 
            <artifactId>groovy-eclipse-batch</artifactId> 
            <version>${groovy-eclipse-batch-version}</version> 
           </dependency> 
           <!-- Define dependency to Groovy Eclipse Compiler (as it's referred 
            in compilerId) --> 
           <dependency> 
            <groupId>org.codehaus.groovy</groupId> 
            <artifactId>groovy-eclipse-compiler</artifactId> 
            <version>${groovy-eclipse-compiler-version}</version> 
           </dependency> 
          </dependencies> 
         </plugin> 
         <!-- Define Groovy Eclipse Compiler again and set extensions=true. Thanks 
          to this, plugin will --> 
         <!-- enhance default build life cycle with an extra phase which adds 
          additional Groovy source folders --> 
         <!-- It works fine under Maven 3.x, but we've encountered problems with 
          Maven 2.x --> 
         <plugin> 
          <groupId>org.codehaus.groovy</groupId> 
          <artifactId>groovy-eclipse-compiler</artifactId> 
          <version>${groovy-eclipse-compiler-version}</version> 
          <extensions>true</extensions> 
         </plugin> 
         <!-- Configure Clover for Maven plug-in. Please note that it's not bound 
          to any execution phase, --> 
         <!-- so you'll have to call Clover goals from command line. --> 
         <plugin> 
          <groupId>com.atlassian.maven.plugins</groupId> 
          <artifactId>maven-clover2-plugin</artifactId> 
          <version>${maven-clover2-plugin-version}</version> 
          <configuration> 
           <generateHtml>true</generateHtml> 
           <historyDir>.cloverhistory</historyDir> 
          </configuration> 
         </plugin> 
         </plugins> 
        </pluginManagement> 
    </build> 
    

This should solve it.
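(Editorial note, not from the original answer: with the Groovy Eclipse compiler wired into the build as above, the answer's claim is that the question's last attempt, using the raw java.util.Map.class tokens, then compiles as written, i.e.:)

    // Per the answer's claim, this raw-class variant from the question should
    // compile once the Groovy Eclipse compiler is used for the build.
    JavaPairRDD rdd = jsc.newAPIHadoopRDD(job.getConfiguration(), 
        CqlPagingInputFormat.class, 
        java.util.Map.class, 
        java.util.Map.class); 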

Nice! I ended up using Datastax's own [cassandra-driver-spark](https://github.com/datastax/cassandra-driver-spark) instead (from Scala, wrapping it in Java), which has a much nicer API (Java support is coming soon). Glad to see you figured it out. – martingms

Hi, I ran into a problem when using JavaSparkContext.newAPIHadoopRDD(conf, XXInputFormat.class, NullWritable.class, Map.class), where my XXInputFormat extends mapreduce.InputFormat directly and uses Map as the InputFormat parameter. – tobe

The error: sc.newAPIHadoopRDD(conf, TestInputFormat.class, NullWritable.class, Map.class); ^^^^^^^^^^^^^^^ Bound mismatch: the generic method newAPIHadoopRDD(Configuration, Class, Class, Class) of type JavaSparkContext is not applicable for the arguments (Configuration, Class, Class, Class). The inferred type TestInputFormat is not a valid substitute for the bounded parameter. – tobe
