2016-12-28 25 views

Answers

1

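There is currently an open feature request for this in Dataflow, but the work has already been done in Beam. Once Dataflow 2.0 is released (it will be based on Beam), this should be officially supported.

That said, I was able to write compressed GZIP files by extending the FileBasedSink class and building on Jeff Payne's work on this feature in Beam.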

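import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;
// Also import Coder, FileBasedSink, PipelineOptions and MimeTypes from the SDK;
// their package names depend on the SDK version in use.

public class GZIPSink<T> extends FileBasedSink<T> {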
    private final Coder<T> coder; 

    GZIPSink(String baseOutputFilename, Coder<T> coder) { 
     super(baseOutputFilename, ".gz"); 
     this.coder = coder; 
    } 

    @Override 
    public FileBasedWriteOperation<T> createWriteOperation(PipelineOptions pipelineOptions) {
     return new GZIPWriteOperation<>(this, coder);
    } 

    static class GZIPWriteOperation<T> extends FileBasedSink.FileBasedWriteOperation<T> { 
     private final Coder<T> coder; 

     private GZIPWriteOperation(GZIPSink<T> sink, Coder<T> coder) { 
      super(sink); 
      this.coder = coder; 
     } 

     @Override 
     public FileBasedWriter<T> createWriter(PipelineOptions pipelineOptions) throws Exception {
      return new GZIPBasedWriter<>(this, coder);
     } 
    } 

    static class GZIPBasedWriter<T> extends FileBasedSink.FileBasedWriter<T> {
     private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); 
     private final Coder<T> coder; 
     private GZIPOutputStream out; 

     public GZIPBasedWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) { 
      super(writeOperation); 
      this.mimeType = MimeTypes.BINARY; 
      this.coder = coder; 
     } 

     @Override 
     protected void prepareWrite(WritableByteChannel channel) throws Exception { 
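      // Anonymous subclass: 'def' is a protected field, so the compression
      // level can only be set from inside a subclass.
      out = new GZIPOutputStream(Channels.newOutputStream(channel), true) {{
       def.setLevel(java.util.zip.Deflater.BEST_COMPRESSION);
      }};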
     } 

     @Override 
     public void write(T value) throws Exception { 
      coder.encode(value, out, Coder.Context.OUTER); 
      out.write(NEWLINE); 
     } 

     @Override 
     public void writeFooter() throws IOException { 
      out.finish(); 
     } 
    } 
}  

Then, to do the actual write:

aStringPCollection.apply(Write.to(new GZIPSink<>("gs://path/sharded-filename", StringUtf8Coder.of())));
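To sanity-check the byte layout the writer produces without running a pipeline, here is a minimal standalone sketch that uses only JDK classes. It assumes string records that are written as raw UTF-8 bytes followed by a newline, which is my understanding of what StringUtf8Coder does in the outer context. The class GzipLinesDemo and its methods are hypothetical names for illustration and are not part of Dataflow or Beam.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper (not an SDK class): mirrors the writer's framing with the JDK only.
final class GzipLinesDemo {
    private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);

    // Writes each record as UTF-8 bytes plus a newline into one gzip stream.
    static byte[] compress(List<String> records) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try {
            // Anonymous subclass so the protected Deflater field 'def' is reachable.
            GZIPOutputStream out = new GZIPOutputStream(sink, true) {
                {
                    def.setLevel(Deflater.BEST_COMPRESSION);
                }
            };
            for (String record : records) {
                out.write(record.getBytes(StandardCharsets.UTF_8));
                out.write(NEWLINE);
            }
            out.finish(); // emits the gzip trailer, the same call writeFooter makes
            out.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    // Reads the gzip bytes back into one string per line.
    static List<String> decompress(byte[] gzipped) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(gzipped)),
                StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("first", "second", "third");
        System.out.println(decompress(compress(input)).equals(input));
    }
}
```

The round trip only holds for records that contain no embedded newlines, since the newline is the sole record delimiter in this format.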

+0

Linking here to the PR that got merged into Beam, since at my reputation level I'm limited to 2 links per post. https://github.com/apache/beam/commit/b7b68e6fb1aafb6b4160e5dcea022bf6c802e33f – Thang