我已經爲BucketingSink創建了一個編寫器。匯和作家工作沒有錯誤,但當涉及到寫作avro通用錄製到實木複合地板的作家,該文件是從進行中創建,待完成。但是這些文件是空的,有0個字節。任何人都可以告訴我代碼有什麼問題嗎?我試過在open()方法中放置AvroParquetWriter的初始化,但結果仍然相同。Flink BucketingSink與自定義AvroParquetWriter創建空文件
當調試代碼,我確認writer.write(元件)不執行與元件包含阿夫羅genericrecord數據
流數據
BucketingSink<DataEventRecord> sink =
new BucketingSink<DataEventRecord>("hdfs://localhost:9000/tmp/");
sink.setBucketer(new DateTimeBucketer<DataEventRecord>("yyyy-MM-dd--HHmm"));
sink.setWriter(new ParquetSinkWriter<DataEventRecord>());
ParquetSinkWriter
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import com.any.DataEventRecord;
public class ParquetSinkWriter<T> extends StreamWriterBase<T> {
private transient ParquetWriter<GenericRecord> writer;
private Path path;
private FileSystem fs;
private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
private final int blockSize = 256 * 1024 * 1024;
private final int pageSize = 64 * 1024;
@Override
// workaround
public void open(FileSystem fs, Path path) throws IOException {
super.open(fs, path);
this.path = path;
this.fs = fs;
}
@Override
public void write(T event) throws IOException {
DataEventRecord element = (DataEventRecord) event;
if (writer == null) {
writer = new AvroParquetWriter<GenericRecord>(this.path, element.getSchema(), compressionCodecName, blockSize, pageSize);
}
if (writer != null) {
GenericRecord datum = element.getRecord();
writer.write(datum);
}
}
@Override
public void close() throws IOException {
if (writer != null) {
writer.close();
}
super.close();
}
@Override
public Writer<T> duplicate() {
return new ParquetSinkWriter<T>();
}
}
我設法解決了這個問題。在寫入過程中,在調用super.open(fs,path)的同時創建AvroParquetWRiter實例時出現問題。打開的事件已經創建了一個文件,作者也試圖創建相同的文件,但無法執行,因爲文件已經存在。因此,當Avro編寫器無法寫入已存在的文件時,總會有0條記錄寫入文件。刪除super.open將導致基類因「Writer未打開」而失敗。我最終基於BucketingSink擴展自己的接收器類,現在一切正常。 – jlim
請問您如何解決它,請顯示一些參考代碼?我也堅持同樣的問題 – neoeahit
你不能簡單地實現'Writer'接口而不是使用'StreamWriterBase'嗎? 'StreamWriterBase'打開一個你不需要的文件'FSDataOutputStream'。 –