2017-06-21 57 views
2

我現有的項目使用Hadoop map-reduce生成具有XML格式的自定義鍵和值的序列文件。將純文本文件轉換爲Spark中的Hadoop序列文件

通過從輸入源一次讀取一行來生成XML值,並且實現RecordReader以從純文本返回XML格式的下一個值。

例如輸入源文件有3行(第一行是頭,並具有實際的數據休息行)

id|name|value 
1|Vijay|1000 
2|Gaurav|2000 
3|Ashok|3000 

發佈地圖的方法的序列文件具有如下數據:

FeedInstanceKey{feedInstanceId=1000, entity=bars} <?xml version='1.0' encoding='UTF-8'?><bars><id>1</id><name>Vijay</name><value>1000</value></bars> 
FeedInstanceKey{feedInstanceId=1000, entity=bars} <?xml version='1.0' encoding='UTF-8'?><bars><id>2</id><name>Gaurav</name><value>2000</value></bars> 
FeedInstanceKey{feedInstanceId=1000, entity=bars} <?xml version='1.0' encoding='UTF-8'?><bars><id>3</id><name>Ashok</name><value>3000</value></bars> 

問題:我想在Spark中實現相同。基本上,讀取輸入文件並如上所述生成鍵值對。

是否有任何方式/可能重用現有的InputFormat,並因此重用我的Hadoop映射器類中使用的RecordReader。

RecordReader負責/有邏輯將純文本行轉換爲XML並返回值作爲Hadoop映射方法用於寫入context.write()方法。

請提出建議。

回答

3

這包含在External Datasets部分的Spark文檔中。對你最重要的部分是:

對於其他的Hadoop InputFormats,您可以使用 JavaSparkContext.hadoopRDD方法,它接受一個任意JobConf 和輸入格式類,重點類和價值類。將這些與您的輸入源一起使用的Hadoop作業的方式設置爲相同的 。您還可以使用基於 「新」MapReduce API(org.apache.hadoop.mapreduce)的InputFormats的JavaSparkContext.newAPIHadoopRDD 。

這裏有一個簡單的例子demostrating如何使用它:

public final class ExampleSpark { 

    public static void main(String[] args) throws Exception { 
     JavaSparkContext spark = new JavaSparkContext(); 
     Configuration jobConf = new Configuration(); 

     JavaPairRDD<LongWritable, Text> inputRDD = spark.newAPIHadoopFile(args[0], TextInputFormat.class, LongWritable.class, Text.class, jobConf); 
     System.out.println(inputRDD.count()); 

     spark.stop(); 
     System.exit(0); 
    } 
} 

你可以看到的Javadoc JavaSparkContext here