回答

1
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader; 
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper; 
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; 
import parquet.avro.AvroReadSupport; 
import parquet.hadoop.ParquetInputFormat; 

import java.io.IOException; 

public class CombineParquetInputFormat<T> extends CombineFileInputFormat<Void, T> { 


    @Override 
    public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptContext 
      context) throws IOException { 
     CombineFileSplit combineSplit = (CombineFileSplit) split; 
     return new CombineFileRecordReader(combineSplit, context, CombineParquetrecordReader.class); 
    } 

    private static class CombineParquetrecordReader<T> extends CombineFileRecordReaderWrapper<Void, T> { 


     public CombineParquetrecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx) throws 
       IOException, InterruptedException { 
      super(new ParquetInputFormat<T>(AvroReadSupport.class), split, context, idx); 
     } 
    } 
} 

在消費方請使用CombinedParquetInputFile:這將強制多個小文件從單個任務中讀取。

在生產者側: 用戶聚結(numFiles)以具有文件作爲輸出的足夠沒有。

如何使用customInputFileFormat火花和形式RDD和Dataframes:

 JavaRDD<Row> javaRDD = sc.newAPIHadoopFile(hdfsInputPath, CombineParquetInputFormat.class, Void.class, "AvroPojo.class", sc.hadoopConfiguration()) 
              .values() 
              .map(p -> { 
               Row row = RowFactory.create(avroPojoToObjectArray((p)); 
               return row; 
              }); 


    sc.hadoopConfiguration().setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE,true); 


//set max split size else only 1 task wil be spawned  
sc.hadoopConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", (long) (128 * 1024 * 1024)); 


    StructType outputSchema = (StructType) SchemaConverters.toSqlType(Profile.getClassSchema()).dataType(); 
      final DataFrame requiredDataFrame = sqlContext.createDataFrame(javaRDD, outputSchema); 

請參考http://bytepadding.com/big-data/spark/combineparquetfileinputformat/爲深入瞭解

1

最直接的方法恕我直言,是用再分配/ COALESCE(喜歡合併除非數據是歪斜的,你想編寫拼花文件,這樣你就不會創建小文件開始之前創建相同大小的輸出)。

df 
    .map(<some transformation>) 
    .filter(<some filter>) 
    ///... 
    .coalesce(<number of partitions>) 
    .write 
    .parquet(<path>) 

分區數可以根據數據幀中的總行數除以通過試錯法得到適當大小的因子來計算。

這是大多數大數據框架的最佳實踐,以幾個大文件preffer許多小文件(文件大小我通常使用的是100-500MB)

如果你已經在小文件中的數據,你想合併它,據我所知,你將不得不閱讀與Spark重新分區到更少的分區,並再次寫入。

+0

請聚結前添加一個GROUPBY和請遵守工作時間降解。這只是Producer對小文件的控制。消費者仍然可以強制閱讀大量的小拼花文件,在大多數情況下,生產商可能不會在你的控制和你也不希望合併數據,如果只需要讀取一次 – KrazyGautam

+0

@KrazyGautam爲什麼GROUPBY?如果你不想彙總呢? 在我剛纔看到你的消費端解決方案的其他話題..看起來有趣的我不知道該選項的...順便說一句,你可能要合併減少,即使你只是讀取一次任務數的..有許多任務可能會有很大的開銷 –

相關問題