
Flink BucketingSink with a custom AvroParquetWriter creates empty files

I have created a writer for a BucketingSink. The sink and the writer run without errors, and when the writer writes Avro GenericRecords to Parquet, the files are created and move from in-progress through pending to completed. But the files are empty, 0 bytes. Can anyone tell me what is wrong with the code? I have tried placing the initialization of the AvroParquetWriter in the open() method, but the result is still the same.

When debugging the code, I confirmed that writer.write(element) is executed and that element contains the Avro GenericRecord data.

Streaming data

BucketingSink<DataEventRecord> sink = 
    new BucketingSink<DataEventRecord>("hdfs://localhost:9000/tmp/"); 

sink.setBucketer(new DateTimeBucketer<DataEventRecord>("yyyy-MM-dd--HHmm")); 
sink.setWriter(new ParquetSinkWriter<DataEventRecord>()); 
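For context, the sink above still has to be attached to a stream before the job runs. A minimal wiring sketch (not from the original post; "env", "events" and "DataEventSource" are assumed names):

// Hypothetical wiring: attach the configured sink to a DataStream<DataEventRecord>.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<DataEventRecord> events = env.addSource(new DataEventSource()); // assumed SourceFunction<DataEventRecord>
events.addSink(sink);
env.execute("avro-to-parquet-bucketing");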

ParquetSinkWriter

import java.io.File; 
import java.io.IOException; 

import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.flink.streaming.connectors.fs.StreamWriterBase; 
import org.apache.flink.streaming.connectors.fs.Writer; 
import org.apache.parquet.avro.AvroParquetWriter; 
import org.apache.parquet.hadoop.ParquetWriter; 
import org.apache.parquet.hadoop.metadata.CompressionCodecName; 
import com.any.DataEventRecord; 

public class ParquetSinkWriter<T> extends StreamWriterBase<T> {

    private transient ParquetWriter<GenericRecord> writer;

    private Path path;
    private FileSystem fs;
    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private final int blockSize = 256 * 1024 * 1024;
    private final int pageSize = 64 * 1024;

    @Override
    // workaround
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        this.path = path;
        this.fs = fs;
    }

    @Override
    public void write(T event) throws IOException {
        DataEventRecord element = (DataEventRecord) event;

        if (writer == null) {
            writer = new AvroParquetWriter<GenericRecord>(this.path, element.getSchema(),
                compressionCodecName, blockSize, pageSize);
        }

        if (writer != null) {
            GenericRecord datum = element.getRecord();
            writer.write(datum);
        }
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
        }
        super.close();
    }

    @Override
    public Writer<T> duplicate() {
        return new ParquetSinkWriter<T>();
    }

}

I managed to solve the problem. The issue was that the AvroParquetWriter instance was being created while super.open(fs, path) was also being called. The open call had already created the file, and the Avro writer then tried to create the same file but could not, because it already existed. Hence 0 records were ever written to the file, since the Avro writer fails when the file already exists. Removing super.open makes the base class fail with "Writer is not open". I ended up extending my own sink class based on BucketingSink, and everything works now. – jlim
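A rough sketch of the conflict described above (my illustration, not jlim's actual code; fs, path and schema are assumed to be in scope, and the fs.create call stands in for what StreamWriterBase.open effectively does):

// StreamWriterBase.open(fs, path) has already created the 0-byte in-progress file:
FSDataOutputStream out = fs.create(path, false);

// The Parquet writer then targets the very same path and refuses to create it again:
ParquetWriter<GenericRecord> parquetWriter =
    AvroParquetWriter.<GenericRecord>builder(path)
        .withSchema(schema)
        .build(); // throws org.apache.hadoop.fs.FileAlreadyExistsException, so nothing is ever written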


Could you please show some reference code for how you solved it? I am stuck on the same problem. – neoeahit


Couldn't you simply implement the 'Writer' interface instead of using 'StreamWriterBase'? 'StreamWriterBase' opens an 'FSDataOutputStream' on the file, which you don't need. –

Answer

Implementing Writer directly should look something like this:

import org.apache.flink.streaming.connectors.fs.Writer; 
import org.apache.flink.util.Preconditions; 

import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericData; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.parquet.avro.AvroParquetWriter; 
import org.apache.parquet.hadoop.ParquetWriter; 
import org.apache.parquet.hadoop.metadata.CompressionCodecName; 

import java.io.IOException; 

/** 
* Parquet writer. 
* 
* @param <T> 
*/ 
public class ParquetSinkWriter<T extends GenericRecord> implements Writer<T> { 

    private static final long serialVersionUID = -975302556515811398L; 

    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY; 
    private final int pageSize = 64 * 1024; 

    private final String schemaRepresentation; 

    private transient Schema schema; 
    private transient ParquetWriter<GenericRecord> writer; 
    private transient Path path; 

    private int position; 

    public ParquetSinkWriter(String schemaRepresentation) { 
     this.schemaRepresentation = Preconditions.checkNotNull(schemaRepresentation); 
    } 

    @Override 
    public void open(FileSystem fs, Path path) throws IOException { 
     this.position = 0; 
     this.path = path; 

     if (writer != null) { 
      writer.close(); 
     } 

     writer = createWriter(); 
    } 

    @Override 
    public long flush() throws IOException { 
     Preconditions.checkNotNull(writer); 
     position += writer.getDataSize(); 
     writer.close(); 
     writer = createWriter(); 

     return position; 
    } 

    @Override 
    public long getPos() throws IOException { 
     Preconditions.checkNotNull(writer); 
     return position + writer.getDataSize(); 
    } 

    @Override 
    public void close() throws IOException { 
     if (writer != null) { 
      writer.close(); 
      writer = null; 
     } 
    } 

    @Override 
    public void write(T element) throws IOException { 
     Preconditions.checkNotNull(writer); 
     writer.write(element); 
    } 

    @Override 
    public Writer<T> duplicate() { 
     return new ParquetSinkWriter<>(schemaRepresentation); 
    } 

    private ParquetWriter<GenericRecord> createWriter() throws IOException { 
     if (schema == null) { 
      schema = new Schema.Parser().parse(schemaRepresentation); 
     } 

     return AvroParquetWriter.<GenericRecord>builder(path) 
      .withSchema(schema) 
      .withDataModel(new GenericData()) 
      .withCompressionCodec(compressionCodecName) 
      .withPageSize(pageSize) 
      .build(); 
    } 
} 
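One way to plug this writer into the BucketingSink from the question (a usage sketch only; "schemaJson" and "events" are assumed names, and the map via getRecord() mirrors the accessor shown in the question's code):

// The writer takes GenericRecord, so map the DataEventRecord stream to its Avro payload first.
String schemaJson = "..."; // assumed placeholder for the records' Avro schema as a JSON string
DataStream<GenericRecord> records = events
    .map(DataEventRecord::getRecord)
    .returns(GenericRecord.class);

BucketingSink<GenericRecord> sink = new BucketingSink<>("hdfs://localhost:9000/tmp/");
sink.setBucketer(new DateTimeBucketer<GenericRecord>("yyyy-MM-dd--HHmm"));
sink.setWriter(new ParquetSinkWriter<GenericRecord>(schemaJson));
records.addSink(sink);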

Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: /clicks-json/partitionkey=2018-01-12--16-30/_part-6-0.in-progress for client 127.0.0.1 already exists
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2563)
    at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:495)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) –