我跟着@xhochy's advice在數據到達時使用箭頭API填充箭頭表,然後使用parquet-cpp
的WriteTable()
方法寫出表格。我將GZIP設置爲默認壓縮,但爲第二個字段指定了SNAPPY。上述
#include <iostream>
#include "arrow/builder.h"
#include "arrow/table.h"
#include "arrow/io/file.h"
#include <parquet/arrow/writer.h>
#include <parquet/properties.h>
main() {
arrow::Int32Builder sip_builder(arrow::default_memory_pool());
arrow::Int32Builder dip_builder(arrow::default_memory_pool());
for(size_t i=0; i < 1000; i++) { // simulate row-oriented incoming data
sip_builder.Append(i*100);
dip_builder.Append(i*10 + i);
}
std::shared_ptr<arrow::Array> sip_array;
sip_builder.Finish(&sip_array);
std::shared_ptr<arrow::Array> dip_array;
dip_builder.Finish(&dip_array);
std::vector<std::shared_ptr<arrow::Field>> schema_definition = {
arrow::field("sip", arrow::int32(), false /* don't allow null; makes field required */),
arrow::field("dip", arrow::int32(), false)
};
auto schema = std::make_shared<arrow::Schema>(schema_definition);
std::shared_ptr<arrow::Table> arrow_table;
MakeTable(schema, {sip_array, dip_array}, &arrow_table);
std::shared_ptr<arrow::io::FileOutputStream> file_output_stream;
arrow::io::FileOutputStream::Open("test.parquet", &file_output_stream);
parquet::WriterProperties::Builder props_builder;
props_builder.compression(parquet::Compression::GZIP);
props_builder.compression("dip", parquet::Compression::SNAPPY);
auto props = props_builder.build();
parquet::arrow::WriteTable(*arrow_table, ::arrow::default_memory_pool(),
file_output_stream, sip_array->length(), props);
std::cout << "done" << std::endl;
}
$ g++ -std=c++11 -I/opt/parquet-cpp/build/release/include -lparquet -larrow arrow-test.cc; ./a.out
done
$ /opt/parquet-cpp/build/release/parquet_reader --only-metadata test.parquet
File Name: test.parquet
Version: 0
Created By: parquet-cpp version 1.2.1-SNAPSHOT
Total rows: 1000
Number of RowGroups: 1 <<----------
Number of Real Columns: 2
Number of Columns: 2
Number of Selected Columns: 2
Column 0: sip (INT32)
Column 1: dip (INT32)
--- Row Group 0 ---
--- Total Bytes 8425 ---
Rows: 1000---
Column 0
, Values: 1000, Null Values: 0, Distinct Values: 0
Max: 99900, Min: 0
Compression: GZIP, Encodings: PLAIN_DICTIONARY PLAIN RLE
Uncompressed Size: 5306, Compressed Size: 3109
Column 1
, Values: 1000, Null Values: 0, Distinct Values: 0
Max: 10989, Min: 0
Compression: SNAPPY, Encodings: PLAIN_DICTIONARY PLAIN RLE
Uncompressed Size: 5306, Compressed Size: 5316
的代碼寫出一個行組整個表/文件。根據您擁有多少行數據,這可能並不理想,因爲太多的行可能導致「回退到純編碼」(請參閱Ryan Blue presentation,幻燈片31-34)。要寫出每表/文件中的多個行組,設置chunk_size
參數較小(低於我除以2,得到每表/文件中的兩個組):
parquet::arrow::WriteTable(*arrow_table, ::arrow::default_memory_pool(),
fileOutputStream, sip_array->length()/2, props);
這仍不理想。在調用parquet::arrow::WriteTable()
之前,文件的所有數據都必須緩衝/存儲在箭頭表中,因爲該函數打開和關閉文件。我想爲每個文件寫入多個行組,但我只想在內存中一次緩存/存儲一個或兩個有價值數據的行組。以下代碼完成了這一點。它是基於斷碼的parquet/arrow/writer.cc:
#include <parquet/util/memory.h>
...
auto arrow_output_stream = std::make_shared<parquet::ArrowOutputStream>(file_output_stream);
std::unique_ptr<parquet::arrow::FileWriter> writer;
parquet::arrow::FileWriter::Open(*(arrow_table->schema()), ::arrow::default_memory_pool(),
arrow_output_stream, props, parquet::arrow::default_arrow_writer_properties(),
&writer);
// write two row groups for the first table
writer->WriteTable(*arrow_table, sip_array->length()/2);
// ... code here would generate a new table ...
// for now, we'll just write out the same table again, to
// simulate writing more data to the same file, this
// time as one row group
writer->WriteTable(*arrow_table, sip_array->length());
writer->Close();
$ /opt/parquet-cpp/build/release/parquet_reader --only-metadata test.parquet File Name: test.parquet
Version: 0
Created By: parquet-cpp version 1.2.1-SNAPSHOT
Total rows: 2000000
Number of RowGroups: 3 <<--------
...
--- Row Group 0 ---
--- Total Bytes 2627115 ---
Rows: 500000---
...
--- Row Group 1 ---
--- Total Bytes 2626873 ---
Rows: 500000---
...
--- Row Group 2 ---
--- Total Bytes 4176371 ---
Rows: 1000000---