2015-05-25 64 views
6

我有一個自定義數據源,我想將數據加載到我的Spark羣集中以執行一些計算。爲此,我看到我可能需要爲我的數據源實施新的RDD在Java中實現自定義Spark RDD

我是一個完整的Scala noob,我希望我可以在Java本身實現RDD。我環顧了互聯網,找不到任何資源。任何指針?

我的數據在S3中,並在Dynamo中編入索引。例如,如果我想要在給定時間範圍內加載數據,則首先需要查詢Dynamo以獲取相應時間範圍的S3文件密鑰,然後將它們加載到Spark中。這些文件可能並不總是具有相同的S3路徑前綴,因此sc.testFile("s3://directory_path/")將不起作用。

我正在尋找有關如何實現與HadoopRDDJdbcRDD類似的東西的指針,但使用Java。類似於他們在這裏完成的事情:DynamoDBRDD。這個從Dynamo讀取數據,我的自定義RDD將查詢DynamoDB的S3文件密鑰,然後從S3加載它們。

+1

一個'RDD'是一個相當柔性容器。你爲什麼認爲你需要重新實現它?你的數據的格式是什麼? – ohruunuruus

+0

我的數據在S3中,並在Dynamo中編入索引。例如,如果我想要在給定時間範圍內加載數據,則首先需要查詢Dynamo以獲取相應時間範圍的S3文件密鑰,然後將它們加載到Spark中。這些文件可能並不總是處於相同的S3路徑前綴中,因此'''sc.testFile(「s3:// directory_path /」)'''將不起作用。我正在尋找關於如何實現類似於HadoopRDD或JdbcRDD的指針,但使用Java。 –

+0

根據此:http://apache-spark-user-list.1001560.n3.nabble.com/is-there-any-easier-way-to-define-a-custom-RDD-in-Java-td6917 .html一年前是不可能的。不過,我很想知道是否有任何改變。 – tsiki

回答

1

一個選項是閱讀Hadoop規範,但是如果您的數據是結構化的Spark SQL有一個新的Data Sources API,某些實現發佈在Spark Packages上,包括avro,redshift和csv。

8

您可以在Java中擴展RDD並實現getPartitions和計算方法。

Java可以擴展Scala類,但有一些限制。

實施例:

package com.openmarket.danyal; 
// Other imports left out 
import org.apache.spark.Dependency; 
import org.apache.spark.Partition; 
import org.apache.spark.SparkConf; 
import org.apache.spark.SparkContext; 
import org.apache.spark.TaskContext; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.rdd.RDD; 

import scala.collection.AbstractIterator; 
import scala.collection.Iterator; 
import scala.collection.mutable.ArrayBuffer; 
import scala.reflect.ClassManifestFactory$; 
import scala.reflect.ClassTag; 

public class AlphaTest { 
    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class); 

    public static void main(final String[] args) { 
     SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Learn ABCs"); 
     try(JavaSparkContext sc = new JavaSparkContext(conf)) { 
      System.out.println(new AlphabetRDD(sc.sc()).toJavaRDD().collect()); 
     } 
    } 

    public static class AlphabetRDD extends RDD<String> { 
     private static final long serialVersionUID = 1L; 

     public AlphabetRDD(SparkContext sc) { 
      super(sc, new ArrayBuffer<Dependency<?>>(), STRING_TAG); 
     } 

     @Override 
     public Iterator<String> compute(Partition arg0, TaskContext arg1) { 
      AlphabetRangePartition p = (AlphabetRangePartition)arg0; 
      return new CharacterIterator(p.from, p.to); 
     } 

     @Override 
     public Partition[] getPartitions() { 
      return new Partition[] {new AlphabetRangePartition(1, 'A', 'M'), new AlphabetRangePartition(2, 'P', 'Z')}; 
     } 

    } 

    /** 
    * A partition representing letters of the Alphabet between a range 
    */ 
    public static class AlphabetRangePartition implements Partition { 
     private static final long serialVersionUID = 1L; 
     private int index; 
     private char from; 
     private char to; 

     public AlphabetRangePartition(int index, char c, char d) { 
      this.index = index; 
      this.from = c; 
      this.to = d; 
     } 

     @Override 
     public int index() { 
      return index; 
     } 

     @Override 
     public boolean equals(Object obj) { 
      if(!(obj instanceof AlphabetRangePartition)) { 
       return false; 
      } 
      return ((AlphabetRangePartition)obj).index != index; 
     } 

     @Override 
     public int hashCode() { 
      return index(); 
     } 
    } 

    /** 
    * Iterators over all characters between two characters 
    */ 
    public static class CharacterIterator extends AbstractIterator<String> { 
     private char next; 
     private char last; 

     public CharacterIterator(char from, char to) { 
      next = from; 
      this.last = to; 
     } 

     @Override 
     public boolean hasNext() { 
      return next <= last; 
     } 

     @Override 
     public String next() { 
      // Post increments next after returning it 
      return Character.toString(next++); 
     } 
    } 
} 
+0

偉大的解決方案。一個問題:從0開始分區索引,否則這個自定義RDD上的'cartesian()'方法將導致ArrayOutOfBoundException異常。 '返回新分區[] {new AlphabetRangePartition(0,'A','M'),new AlphabetRangePartition(1,'P','Z')};' –