
Java - empty ORC file

I am trying to write an ORC file using orc-core, to be read later through Hive.

The file that gets written has the correct number of rows, but the columns contain no data. I can see this both when trying to read the file with a SELECT query in Hive and with hive --orcfiledump -d.

I tried the example provided in the guide, which writes two columns of type long, and the resulting file is read correctly by Hive. I suspect the problem is related to the fact that I am writing string columns instead, but I still cannot make it work.
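For reference, the guide's long-column example has roughly this shape (reconstructed from memory, so details may differ); with a LongColumnVector the value can be assigned straight into vector[row], which is what I tried to mimic for strings:

// Roughly the guide's example: two long columns written with orc-core
// (LongColumnVector comes from org.apache.hadoop.hive.ql.exec.vector;
// the enclosing method declares throws IOException)
TypeDescription schema = TypeDescription.fromString("struct<x:int,y:int>");
Writer writer = OrcFile.createWriter(new Path("my-file.orc"),
        OrcFile.writerOptions(conf).setSchema(schema));
VectorizedRowBatch batch = schema.createRowBatch();
LongColumnVector x = (LongColumnVector) batch.cols[0];
LongColumnVector y = (LongColumnVector) batch.cols[1];
for (int r = 0; r < 10000; ++r) {
    int row = batch.size++;
    x.vector[row] = r;        // direct assignment is fine for long columns
    y.vector[row] = r * 3;
    if (batch.size == batch.getMaxSize()) {
        writer.addRowBatch(batch);
        batch.reset();
    }
}
if (batch.size != 0) {
    writer.addRowBatch(batch);
    batch.reset();
}
writer.close();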

This is how I am currently writing the file:

// File schema
String outputFormat = "struct<";
for (int i = 0; i < outputSchema.length; i++) {
    outputFormat += outputSchema[i] + ":string,";
}
outputFormat += "lastRecordHash:string,currentHash:string>";
TypeDescription orcSchema = TypeDescription.fromString(outputFormat);

// Initializes buffers
VectorizedRowBatch batch = orcSchema.createRowBatch();
ArrayList<BytesColumnVector> orcBuffers = new ArrayList<>(numFields + 2);
for (int i = 0; i < numFields + 2; i++) {
    BytesColumnVector bcv = (BytesColumnVector) batch.cols[i];
    orcBuffers.add(i, bcv);
}

...

// Initializes writer
Writer writer = null;
try {
    writer = OrcFile.createWriter(new Path(hdfsUri + outputPath),
            OrcFile.writerOptions(conf).setSchema(orcSchema));
    partitionCounter++;
}
catch (IOException e) {
    log.error("Cannot open hdfs file. Reason: " + e.getMessage());
    session.transfer(flowfile, hdfsFailure);
    return;
}

// Writes content
String[] records = ...

for (int i = 0; i < records.length; i++) {
    fields = records[i].split(fieldSeparator);

    int row = batch.size++;

    // Filling the orc buffers
    for (int j = 0; j < numFields; j++) {
        orcBuffers.get(j).vector[row] = fields[j].getBytes();
        hashDigest.append(fields[j]);
    }
    if (batch.size == batch.getMaxSize()) {
        try {
            writer.addRowBatch(batch);
            batch.reset();
        }
        catch (IOException e) {
            log.error("Cannot write to hdfs. Reason: " + e.getMessage());
            return;
        }
    }
}
if (batch.size != 0) {
    try {
        writer.addRowBatch(batch);
        batch.reset();
    }
    catch (IOException e) {
        log.error("Cannot write to hdfs. Reason: " + e.getMessage());
        return;
    }
}
writer.close();

Any advice or pointer to a helpful reference is really appreciated.

Thank you all.

Answer


It looks like a more in-depth review of the API documentation was what I needed. What I was missing:

  • calling initBuffer() on each BytesColumnVector during the initialization phase
  • assigning the column values with setVal(); a short sketch contrasting this with the original direct assignment follows the list. This can also be done with setRef(), which is documented as the faster of the two, but I don't know whether it fits my specific case, so I will try it (see the sketch after the updated code).
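To make the difference concrete, here is a minimal sketch (made-up value, assuming batch is the row batch created from the schema as above). A BytesColumnVector stores each cell as a byte-array reference plus start and length offsets; assigning vector[row] directly never sets those offsets, so the row is written but its value reads back empty, which matches the symptom above.

BytesColumnVector col = (BytesColumnVector) batch.cols[0];
col.initBuffer();                 // allocate the column's shared byte buffer up front

int row = batch.size++;
byte[] value = "some value".getBytes();

// Broken: stores only the reference; start[row] and length[row] stay 0,
// so the value reads back as empty.
// col.vector[row] = value;

// Working: copies the bytes into the column's buffer and records start/length.
col.setVal(row, value);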

Here is the updated code:

// File schema
String outputFormat = "struct<";
for (int i = 0; i < outputSchema.length; i++) {
    outputFormat += outputSchema[i] + ":string,";
}
outputFormat += "lastRecordHash:string,currentHash:string>";
TypeDescription orcSchema = TypeDescription.fromString(outputFormat);

// Initializes buffers
VectorizedRowBatch batch = orcSchema.createRowBatch();
ArrayList<BytesColumnVector> orcBuffers = new ArrayList<>(numFields + 2);
for (int i = 0; i < numFields + 2; i++) {
    BytesColumnVector bcv = (BytesColumnVector) batch.cols[i];
    bcv.initBuffer();
    orcBuffers.add(i, bcv);
}

...

// Initializes writer
Writer writer = null;
try {
    writer = OrcFile.createWriter(new Path(hdfsUri + outputPath),
            OrcFile.writerOptions(conf).setSchema(orcSchema));
    partitionCounter++;
}
catch (IOException e) {
    log.error("Cannot open hdfs file. Reason: " + e.getMessage());
    session.transfer(flowfile, hdfsFailure);
    return;
}

// Writes content
String[] records = ...

for (int i = 0; i < records.length; i++) {
    fields = records[i].split(fieldSeparator);

    int row = batch.size++;

    // Filling the orc buffers
    for (int j = 0; j < numFields; j++) {
        orcBuffers.get(j).setVal(row, fields[j].getBytes());
        hashDigest.append(fields[j]);
    }
    if (batch.size == batch.getMaxSize()) {
        try {
            writer.addRowBatch(batch);
            batch.reset();
        }
        catch (IOException e) {
            log.error("Cannot write to hdfs. Reason: " + e.getMessage());
            return;
        }
    }
}
if (batch.size != 0) {
    try {
        writer.addRowBatch(batch);
        batch.reset();
    }
    catch (IOException e) {
        log.error("Cannot write to hdfs. Reason: " + e.getMessage());
        return;
    }
}
writer.close();
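For completeness, the setRef() variant mentioned above would look roughly like this inside the inner loop (untested here, so only a sketch). It stores a reference to the caller's byte array instead of copying it, so the array must not be modified or reused until the batch has been handed to writer.addRowBatch():

byte[] value = fields[j].getBytes();
// setRef() skips the copy that setVal() performs: the column keeps a reference to 'value',
// so this array must stay untouched until addRowBatch(batch) has consumed the batch.
orcBuffers.get(j).setRef(row, value, 0, value.length);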