2017-08-08

I have row/stream-oriented data (Netflow) coming into my C++ application, and I want to write the data out to a Parquet-GZIP file. How can I write stream/row-oriented data with parquet-cpp without buffering?

Looking at the sample reader-writer.cc program in the parquet-cpp project, it seems I can only feed data to parquet-cpp in a columnar fashion:

constexpr int NUM_ROWS_PER_ROW_GROUP = 500; 
... 
// Append a RowGroup with a specific number of rows. 
parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup(NUM_ROWS_PER_ROW_GROUP); 

// Write the Bool column 
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { 
    bool_writer->WriteBatch(1, nullptr, nullptr, &value); 
} 
// Write the Int32 column 
... 
// Write the ... column 

This seems to imply that I would need to buffer NUM_ROWS_PER_ROW_GROUP rows myself, then loop over them, transferring them to parquet-cpp one column at a time. I am hoping there is a better way, since this looks inefficient: the data would be copied twice, once into my own buffer, and then again when feeding it into parquet-cpp one column at a time.

Is there a way to get each row's data into parquet-cpp without having to buffer a bunch of rows first? The Apache Arrow project (which parquet-cpp uses) has a tutorial that shows how to convert row-wise data into an Arrow table. For each row of input data, the code appends to each column builder:

for (const data_row& row : rows) { 
    ARROW_RETURN_NOT_OK(id_builder.Append(row.id)); 
    ARROW_RETURN_NOT_OK(cost_builder.Append(row.cost)); 
} 

I would like to do something like that with parquet-cpp. Is that possible?

Answers

2

Because the data must be transformed from a row-wise to a columnar representation, you can never get around buffering entirely. The best path at the time of writing is to build up Apache Arrow tables and then feed them into parquet-cpp.

parquet-cpp provides special Arrow APIs that can operate directly on these tables, in most cases without any additional copies of the data. You can find the APIs in parquet/arrow/reader.h and parquet/arrow/writer.h.

An optimal, but not yet implemented, solution could save some bytes by doing the following:

  • Ingest the data row by row through a new parquet-cpp API
  • Directly encode the values of each column with the specified encoding and compression settings
  • Buffer only the encoded bytes in memory
  • At the end of a row group, write the buffered data out column after column

While this optimal solution might save you some memory, there are still some steps that need to be implemented by someone (feel free to contribute them, or ask for help implementing them), so for now you are probably best served using the Apache Arrow based API.

0

I followed @xhochy's advice to populate an Arrow table with the Arrow API as the data arrives, and then to write the table out with parquet-cpp's WriteTable() method. I set GZIP as the default compression, but specified SNAPPY for the second field.

#include <iostream> 
#include "arrow/builder.h" 
#include "arrow/table.h" 
#include "arrow/io/file.h" 
#include <parquet/arrow/writer.h> 
#include <parquet/properties.h> 

int main() { 
    arrow::Int32Builder sip_builder(arrow::default_memory_pool()); 
    arrow::Int32Builder dip_builder(arrow::default_memory_pool()); 
    for(size_t i=0; i < 1000; i++) { // simulate row-oriented incoming data 
     sip_builder.Append(i*100); 
     dip_builder.Append(i*10 + i); 
    } 
    std::shared_ptr<arrow::Array> sip_array; 
    sip_builder.Finish(&sip_array); 
    std::shared_ptr<arrow::Array> dip_array; 
    dip_builder.Finish(&dip_array); 
    std::vector<std::shared_ptr<arrow::Field>> schema_definition = { 
     arrow::field("sip", arrow::int32(), false /* don't allow null; makes field required */), 
     arrow::field("dip", arrow::int32(), false) 
    }; 
    auto schema = std::make_shared<arrow::Schema>(schema_definition); 
    std::shared_ptr<arrow::Table> arrow_table; 
    MakeTable(schema, {sip_array, dip_array}, &arrow_table); 

    std::shared_ptr<arrow::io::FileOutputStream> file_output_stream; 
    arrow::io::FileOutputStream::Open("test.parquet", &file_output_stream); 
    parquet::WriterProperties::Builder props_builder; 
    props_builder.compression(parquet::Compression::GZIP); 
    props_builder.compression("dip", parquet::Compression::SNAPPY); 
    auto props = props_builder.build(); 
    parquet::arrow::WriteTable(*arrow_table, ::arrow::default_memory_pool(), 
     file_output_stream, sip_array->length(), props); 
    std::cout << "done" << std::endl; 
} 
$ g++ -std=c++11 -I/opt/parquet-cpp/build/release/include -lparquet -larrow arrow-test.cc; ./a.out 
done 
$ /opt/parquet-cpp/build/release/parquet_reader --only-metadata test.parquet 
File Name: test.parquet 
Version: 0 
Created By: parquet-cpp version 1.2.1-SNAPSHOT 
Total rows: 1000 
Number of RowGroups: 1   <<---------- 
Number of Real Columns: 2 
Number of Columns: 2 
Number of Selected Columns: 2 
Column 0: sip (INT32) 
Column 1: dip (INT32) 
--- Row Group 0 --- 
--- Total Bytes 8425 --- 
    Rows: 1000--- 
Column 0 
, Values: 1000, Null Values: 0, Distinct Values: 0 
    Max: 99900, Min: 0 
    Compression: GZIP, Encodings: PLAIN_DICTIONARY PLAIN RLE 
    Uncompressed Size: 5306, Compressed Size: 3109 
Column 1 
, Values: 1000, Null Values: 0, Distinct Values: 0 
    Max: 10989, Min: 0 
    Compression: SNAPPY, Encodings: PLAIN_DICTIONARY PLAIN RLE 
    Uncompressed Size: 5306, Compressed Size: 5316 

The code above writes out the entire table/file as a single row group. Depending on how many rows of data you have, this may not be ideal, since too many rows per group can lead to a "fall back to plain encoding" (see the Ryan Blue presentation, slides 31-34). To write multiple row groups per table/file, set the chunk_size parameter smaller (below I divide by two, to get two row groups per table/file):

parquet::arrow::WriteTable(*arrow_table, ::arrow::default_memory_pool(), 
     file_output_stream, sip_array->length()/2, props); 

This is still not ideal. All of a file's data must be buffered/stored in the Arrow table before parquet::arrow::WriteTable() can be called, because that function opens and closes the file itself. I want to write multiple row groups per file, but only buffer/store one or two row groups' worth of data in memory at a time. The following code accomplishes that. It is based on code in parquet/arrow/writer.cc:

#include <parquet/util/memory.h> 
... 
auto arrow_output_stream = std::make_shared<parquet::ArrowOutputStream>(file_output_stream); 
std::unique_ptr<parquet::arrow::FileWriter> writer; 
parquet::arrow::FileWriter::Open(*(arrow_table->schema()), ::arrow::default_memory_pool(), 
    arrow_output_stream, props, parquet::arrow::default_arrow_writer_properties(), 
    &writer); 
// write two row groups for the first table 
writer->WriteTable(*arrow_table, sip_array->length()/2); 
// ... code here would generate a new table ... 
// for now, we'll just write out the same table again, to 
// simulate writing more data to the same file, this 
// time as one row group 
writer->WriteTable(*arrow_table, sip_array->length()); 
writer->Close(); 
$ /opt/parquet-cpp/build/release/parquet_reader --only-metadata test.parquet 
File Name: test.parquet 
Version: 0 
Created By: parquet-cpp version 1.2.1-SNAPSHOT 
Total rows: 2000000 
Number of RowGroups: 3 <<-------- 
... 
--- Row Group 0 --- 
--- Total Bytes 2627115 --- 
    Rows: 500000--- 
... 
--- Row Group 1 --- 
--- Total Bytes 2626873 --- 
    Rows: 500000--- 
... 
--- Row Group 2 --- 
--- Total Bytes 4176371 --- 
    Rows: 1000000---