2015-11-15 51 views
2

通過Rfc4180CsvParser導入數據時,是否有排除標題行的方法? COPY命令具有SKIP選項,但在使用Vertica SDK中提供的CSV分析器時,該選項似乎不起作用。使用Rfc4180CsvParser將CSV導入到Vertica並排除標題行

背景

作爲背景,COPY命令本身不讀的CSV文件。對於簡單的CSV文件,可以說COPY schema.table FROM '/data/myfile.csv' DELIMITER ',' ENCLOSED BY '"';,但是這將失敗,數據文件中包含帶有嵌入引號的字符串值。

添加ESCAPE AS '"'將產生錯誤​​3210。這是一個問題,因爲CSV值被封閉並被"轉義。

Vertica的SDK CsvParser擴展救援

Vertica的/opt/vertica/sdk/examples下提供了一個SDK與可編譯成擴展的C++程序。其中之一是/opt/vertica/sdk/examples/ParserFunctions/Rfc4180CsvParser.cpp

這個偉大的工程如下:

cd /opt/vertica/sdk/examples 
make clean 
vsql 
==> CREATE LIBRARY Rfc4180CsvParserLib AS '/opt/vertica/sdk/examples/build/Rfc4180CsvParser.so'; 
==> COPY myschema.mytable FROM '/data/myfile.csv' WITH PARSER Rfc4180CsvParser(); 

問題

除了它導入數據文件的第一行作爲行上述的偉大工程COPY命令有一個SKIP 1選項,但這不適用於解析器。

問題

是否possble編輯Rfc4180CsvParser.cpp跳過第一行,或更好,但需要一定的參數指定的行數跳過?

該方案只是135線,但我沒有看到在哪裏/如何做這個切口。提示?

複製下面,我沒有看到一個公開回購鏈接到整個程序...

Rfc4180CsvParser.cpp

/* Copyright (c) 2005 - 2012 Vertica, an HP company -*- C++ -*- */ 

#include "Vertica.h" 
#include "StringParsers.h" 
#include "csv.h" 

using namespace Vertica; 

// Note, the class template is mostly for demonstration purposes, 
// so that the same class can use each of two string-parsers. 
// Custom parsers can also just pick a string-parser to use. 

/** 
* A parser that parses something approximating the "official" CSV format 
* as defined in IETF RFC-4180: <http://tools.ietf.org/html/rfc4180> 
* Oddly enough, many "CSV" files don't actually conform to this standard 
* for one reason or another. But for sources that do, this parser should 
* be able to handle the data. 
* Note that the CSV format does not specify how to handle different 
* data types; it is entirely a string-based format. 
* So we just use standard parsers based on the corresponding column type. 
*/ 
template <class StringParsersImpl> 
class LibCSVParser : public UDParser { 
public: 
    LibCSVParser() : colNum(0) {} 

    // Keep a copy of the information about each column. 
    // Note that Vertica doesn't let us safely keep a reference to 
    // the internal copy of this data structure that it shows us. 
    // But keeping a copy is fine. 
    SizedColumnTypes colInfo; 

    // An instance of the class containing the methods that we're 
    // using to parse strings to the various relevant data types 
    StringParsersImpl sp; 

    /// Current column index 
    size_t colNum; 

    /// Parsing state for libcsv 
    struct csv_parser parser; 

    // Format strings 
    std::vector<std::string> formatStrings; 

    /** 
    * Given a field in string form (a pointer to the first character and 
    * a length), submit that field to Vertica. 
    * `colNum` is the column number from the input file; how many fields 
    * it is into the current record. 
    */ 
    bool handleField(size_t colNum, char* start, size_t len) { 
     if (colNum >= colInfo.getColumnCount()) { 
      // Ignore column overflow 
      return false; 
     } 
     // Empty colums are null. 
     if (len==0) { 
      writer->setNull(colNum); 
      return true; 
     } else { 
      return parseStringToType(start, len, colNum, colInfo.getColumnType(c 
olNum), writer, sp); 
     } 
    } 

    static void handle_record(void *data, size_t len, void *p) { 
     static_cast<LibCSVParser*>(p)->handleField(static_cast<LibCSVParser*>(p) 
->colNum++, (char*)data, len); 
    } 

    static void handle_end_of_row(int c, void *p) { 
     // Ignore 'c' (the terminating character); trust that it's correct 
     static_cast<LibCSVParser*>(p)->colNum = 0; 
     static_cast<LibCSVParser*>(p)->writer->next(); 
    } 

    virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input 
, InputState input_state) { 
     size_t processed; 
     while ((processed = csv_parse(&parser, input.buf + input.offset, input.s 
ize - input.offset, 
       handle_record, handle_end_of_row, this)) > 0) { 
      input.offset += processed; 
     } 

     if (input_state == END_OF_FILE && input.size == input.offset) { 
      csv_fini(&parser, handle_record, handle_end_of_row, this); 
      return DONE; 
     } 

     return INPUT_NEEDED; 
    } 

    virtual void setup(ServerInterface &srvInterface, SizedColumnTypes &returnTy 
pe); 
    virtual void destroy(ServerInterface &srvInterface, SizedColumnTypes &return 
Type) { 
     csv_free(&parser); 
    } 
}; 

template <class StringParsersImpl> 
void LibCSVParser<StringParsersImpl>::setup(ServerInterface &srvInterface, Sized 
ColumnTypes &returnType) { 
    csv_init(&parser, CSV_APPEND_NULL); 
    colInfo = returnType; 
} 

template <> 
void LibCSVParser<FormattedStringParsers>::setup(ServerInterface &srvInterface, 
SizedColumnTypes &returnType) { 
    csv_init(&parser, CSV_APPEND_NULL); 
    colInfo = returnType; 
    if (formatStrings.size() != returnType.getColumnCount()) { 
     formatStrings.resize(returnType.getColumnCount(), ""); 
    } 
    sp.setFormats(formatStrings); 
} 

template <class StringParsersImpl> 
class LibCSVParserFactoryTmpl : public ParserFactory { 
public: 
    virtual void plan(ServerInterface &srvInterface, 
      PerColumnParamReader &perColumnParamReader, 
      PlanContext &planCtxt) {} 

    virtual UDParser* prepare(ServerInterface &srvInterface, 
      PerColumnParamReader &perColumnParamReader, 
      PlanContext &planCtxt, 
      const SizedColumnTypes &returnType) 
    { 
     return vt_createFuncObj(srvInterface.allocator, 
       LibCSVParser<StringParsersImpl>); 
    } 
}; 

typedef LibCSVParserFactoryTmpl<StringParsers> LibCSVParserFactory; 
RegisterFactory(LibCSVParserFactory); 

typedef LibCSVParserFactoryTmpl<FormattedStringParsers> FormattedLibCSVParserFac 
tory; 
RegisterFactory(FormattedLibCSVParserFactory); 

回答

2

的快速和骯髒的方法是隻是硬編碼它。它使用回調handle_end_of_row。跟蹤行號,只是不處理第一行。喜歡的東西:

static void handle_end_of_row(int c, void *ptr) { 
    // Ignore 'c' (the terminating character); trust that it's correct 
    LibCSVParser *p = static_cast<LibCSVParser*>(ptr); 
    p->colNum = 0; 

    if (rowcnt <= 0) { 
     p->bad_field = ""; 
     rowcnt++; 
    } else if (p->bad_field.empty()) { 
     p->writer->next(); 
    } else { 
     // libcsv doesn't give us the whole row to reject. 
     // So just write to the log. 
     // TODO: Come up with something more clever. 
     if (p->currSrvInterface) { 
      p->currSrvInterface->log("Invalid CSV field value: '%s' Row skipped.", 
            p->bad_field.c_str()); 
     } 
     p->bad_field = ""; 
    } 
} 

另外,最好在process初始化rownum = 0,因爲我認爲它會調用這個在你COPY語句中的每個文件。可能有更聰明的方法來做到這一點。基本上,這隻會處理記錄,然後丟棄它。一般情況下支持SKIP ...請參閱TraditionalCSVParser瞭解如何處理參數傳遞。您必須將其添加到解析器因子prepare並將值發送到LibCSVParser類並覆蓋getParameterType。然後在LibCSVParser中,您需要接受構造函數中的參數,並修改process以跳過第一個skip行。然後使用該值而不是上面的硬編碼0

+0

一個快速評論,我一定會使用多個文件來測試這個。當涉及多個文件時,文檔並不清楚輸入緩衝區是如何工作的。我會嘗試1個文件,2個文件,然後n + 1個文件,其中n =節點數。我會嘗試這個有和沒有'任何節點'。 – woot

+0

這太棒了,謝謝! – prototype

+0

@ user645715 NP。讓我知道它是如何解決你的。正如我在我的評論中提到的那樣,我對輸入緩衝區的工作原理並不積極,但如果結果不是我認爲的那樣工作,另一個想法可能是創建一個UDSource。 – woot