2017-01-18 64 views
0

我在S3中使用Spark 2使用Java API將一些遺留數據轉換爲parquet格式。在Spark中將數據轉換爲Parquet

我有Avro模式(.avsc文件)和他們使用Avro編譯器生成的Java類,我想用Parquet格式存儲使用這些模式的數據。輸入數據不是任何標準格式,但我有一個庫,可以將每行從傳統文件轉換爲Avro類。

是否可以將數據讀取爲JavaRDD<String>,將轉換應用到使用庫的Avro類,最後將其存儲爲實木複合格式。

喜歡的東西:

JavaRDD<String> rdd = javaSparkContext.textFile("s3://bucket/path_to_legacy_files");  
JavaRDD<MyAvroClass> converted = rdd.map(line -> customLib.convertToAvro(line));  
converted.saveAsParquet("s3://bucket/destination"); //how do I do this 

是像上面是否可行?我稍後想要使用Hive,Presto以及Spark來處理已轉換的實木複合地板數據。

+0

搜索星火峯會PRES。由史蒂夫Loughran(霍頓)關於「對象商店」... –

+0

@SamsonScharfrichter不回答我的問題。我看到的唯一與遠程相關的東西是他如何將一些csv數據轉換爲Parquet。他使用sparkSession.csv()調用來加載我無法使用的數據,因爲我需要使用自定義的反序列化器。 –

+0

那麼,你的**實際**問題是什麼?是否將自定義的'JavaRDD '轉換爲常規的DataFrame?關於將您的自定義材料保存爲Parquet格式?關於將其保存到S3對象存儲?關於使用不知道RDD是什麼的另一種工具讀取自定義內容的方法?以上的組合? –

回答

1

現在忽略S3;這是一個生產細節。你需要從更簡單的問題開始「將我的格式的本地文件轉換爲標準文件」。這是您可以在本地通過針對小數據集樣本集進行單元測試來實現的功能。

這通常是Spark作爲Hadoop的MapReduce的一樣:實施InputFormat<K, V>FileInputFormat<K, V>一個子類,或者使用Hadoop的org.apache.hadoop.streaming.mapreduce.StreamInputFormat輸入格式,實現自己的RecordReader,則該選項spark.hadoop.stream.recordreader.class設置爲您記錄閱讀器的類名(可能最簡單的)。

有很多這方面的文檔,以及堆棧溢出問題。源樹本身也有很多例子。

0

想通了,除了Hadoop的輸入和輸出格式中已經存在由史蒂夫提到的基本方法,:

  Job job = new Job(); 
     ParquetOutputFormat.setWriteSupportClass(job, AvroWriteSupport.class); 
     AvroParquetOutputFormat.setSchema(job, MyAvroType.SCHEMA$); 
     AvroParquetOutputFormat.setBlockSize(job, 128*1024*1024); 
     AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY); 
     AvroParquetOutputFormat.setCompressOutput(job, true); 

     sparkContext.textFile("s3://bucket/path_to_legacy_files") 
      .map(line -> customLib.convertToAvro(line)) 
      .mapToPair(record -> new Tuple2<Void, MyAvroType>(null, record)) 
      .saveAsNewAPIHadoopFile(
       "s3://bucket/destination", 
       Void.class, 
       MyAvroType.class, 
       new ParquetOutputFormat<MyAvroType>().getClass(), 
       job.getConfiguration()); 
相關問題