星火產生多個小拼花文件。如何有效地處理生產者和消費者Spark作業中的少量地板文件。如何高效地讀取Spark的多個小地板文件?有沒有CombineParquetInputFormat?
0
A
回答
1
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import parquet.avro.AvroReadSupport;
import parquet.hadoop.ParquetInputFormat;
import java.io.IOException;
public class CombineParquetInputFormat<T> extends CombineFileInputFormat<Void, T> {
@Override
public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptContext
context) throws IOException {
CombineFileSplit combineSplit = (CombineFileSplit) split;
return new CombineFileRecordReader(combineSplit, context, CombineParquetrecordReader.class);
}
private static class CombineParquetrecordReader<T> extends CombineFileRecordReaderWrapper<Void, T> {
public CombineParquetrecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx) throws
IOException, InterruptedException {
super(new ParquetInputFormat<T>(AvroReadSupport.class), split, context, idx);
}
}
}
在消費方請使用CombinedParquetInputFile:這將強制多個小文件從單個任務中讀取。
在生產者側: 用戶聚結(numFiles)以具有文件作爲輸出的足夠沒有。
如何使用customInputFileFormat火花和形式RDD和Dataframes:
JavaRDD<Row> javaRDD = sc.newAPIHadoopFile(hdfsInputPath, CombineParquetInputFormat.class, Void.class, "AvroPojo.class", sc.hadoopConfiguration())
.values()
.map(p -> {
Row row = RowFactory.create(avroPojoToObjectArray((p));
return row;
});
sc.hadoopConfiguration().setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE,true);
//set max split size else only 1 task wil be spawned
sc.hadoopConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", (long) (128 * 1024 * 1024));
StructType outputSchema = (StructType) SchemaConverters.toSqlType(Profile.getClassSchema()).dataType();
final DataFrame requiredDataFrame = sqlContext.createDataFrame(javaRDD, outputSchema);
請參考http://bytepadding.com/big-data/spark/combineparquetfileinputformat/爲深入瞭解
1
最直接的方法恕我直言,是用再分配/ COALESCE(喜歡合併除非數據是歪斜的,你想編寫拼花文件,這樣你就不會創建小文件開始之前創建相同大小的輸出)。
df
.map(<some transformation>)
.filter(<some filter>)
///...
.coalesce(<number of partitions>)
.write
.parquet(<path>)
分區數可以根據數據幀中的總行數除以通過試錯法得到適當大小的因子來計算。
這是大多數大數據框架的最佳實踐,以幾個大文件preffer許多小文件(文件大小我通常使用的是100-500MB)
如果你已經在小文件中的數據,你想合併它,據我所知,你將不得不閱讀與Spark重新分區到更少的分區,並再次寫入。
相關問題
- 1. 有沒有辦法有效地同時讀取多個文件?
- 2. 在Spark中高效地讀取json
- 3. 從Spark中讀取地板數據時有多少個分區
- 4. 如何在Delphi中高效地讀取多個文件的第一行文件
- 5. PHP有效地讀取csv文件
- 6. Java:如何:有效地獲取2GB大小的文件大小?
- 7. 如何高效地讀取文本文件的最後一行
- 8. 如何在java中高效地讀取大文本文件
- 9. 如何高效地讀取C++文件的最後一行
- 10. Spark如何有效讀取100K圖像?
- 11. 如何有效地讀取和處理大文件?
- 12. 如何在R中有效地讀取大文件塊
- 13. 如何高效地用python讀取cap文件?
- 14. 如何在Spark中更有效地加載Parquet文件(pySpark v1.2.0)
- 15. 將多個csv文件有效地讀入熊貓數據框
- 16. 如何將文件(從Java讀取)最有效地傳遞給本地方法?
- 17. 有效地將文件讀入Pascal AnsiString
- 18. 高效地閱讀兩個文本框
- 19. С如何從有效的多個文件中讀取?
- 20. 從python的大文本文件中有效地讀取部分
- 21. 如何從一個大的txt文件中僅有效地讀取字符串
- 22. 如何將小地板文件與Spark結合?
- 23. 如何地理/有效地從讀取+尋求寫入數據?
- 24. 如何有效地搜索多個表
- 25. 高效地webpraping網站沒有api?
- 26. 高效地提取多個袋子
- 27. 如何非常有效地分析多個csv文件?
- 28. Spark - 讀取沒有文件擴展名的壓縮文件
- 29. 如何高效地將文件分發給多個客戶端?
- 30. Spark + Parquet + S3n:好像多次讀取實木複合地板文件
請聚結前添加一個GROUPBY和請遵守工作時間降解。這只是Producer對小文件的控制。消費者仍然可以強制閱讀大量的小拼花文件,在大多數情況下,生產商可能不會在你的控制和你也不希望合併數據,如果只需要讀取一次 – KrazyGautam
@KrazyGautam爲什麼GROUPBY?如果你不想彙總呢? 在我剛纔看到你的消費端解決方案的其他話題..看起來有趣的我不知道該選項的...順便說一句,你可能要合併減少,即使你只是讀取一次任務數的..有許多任務可能會有很大的開銷 –