我的管道將輸出數據文件存儲到GCS。 我想壓縮此文件。 TextIO解壓縮了壓縮的文件, ,但我猜它沒有壓縮文件。 如何壓縮輸出文件?如何在Dataflow Java SDK中壓縮輸出文件?
1
A
回答
1
這是目前開放的feature request的數據流,但是工作已經在光束進行。一旦Dataflow 2.0發佈(將基於Beam),這應該得到官方的支持。
也就是說,我已經能夠通過擴展FileBasedSink類和利用Jeff Payne在Beam中的這個特性上的工作來編寫壓縮的GZIP文件。
public class GZIPSink<T> extends FileBasedSink<T> {
private final Coder<T> coder;
GZIPSink(String baseOutputFilename, Coder<T> coder) {
super(baseOutputFilename, ".gz");
this.coder = coder;
}
@Override
public FileBasedWriteOperation createWriteOperation(PipelineOptions pipelineOptions) {
return new GZIPWriteOperation(this, coder);
}
static class GZIPWriteOperation<T> extends FileBasedSink.FileBasedWriteOperation<T> {
private final Coder<T> coder;
private GZIPWriteOperation(GZIPSink<T> sink, Coder<T> coder) {
super(sink);
this.coder = coder;
}
@Override
public FileBasedWriter createWriter(PipelineOptions pipelineOptions) throws Exception {
return new GZIPBasedWriter(this, coder);
}
}
static class GZIPBasedWriter<T> extends FileBasedSink.FileBasedWriter <T> {
private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
private final Coder<T> coder;
private GZIPOutputStream out;
public GZIPBasedWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
super(writeOperation);
this.mimeType = MimeTypes.BINARY;
this.coder = coder;
}
@Override
protected void prepareWrite(WritableByteChannel channel) throws Exception {
out = new GZIPOutputStream(Channels.newOutputStream(channel), true) {{
def.setLevel(def.BEST_COMPRESSION);
}};
}
@Override
public void write(T value) throws Exception {
coder.encode(value, out, Coder.Context.OUTER);
out.write(NEWLINE);
}
@Override
public void writeFooter() throws IOException {
out.finish();
}
}
}
然後進行實際的寫:
aStringPCollection.apply(Write.to(new GZIPSink("gs://path/sharded-filename", StringUtf8Coder.of()));
1
TextIO
只支持讀取壓縮文件。它不支持使用壓縮編寫文件。
https://cloud.google.com/dataflow/model/text-io#reading-from-compressed-text-files
TEXTIO目前不支持寫入壓縮文件。
更多信息:
相關問題
- 1. 如何在Java中使用LZMA SDK進行壓縮/解壓縮
- 2. YUI壓縮機 - 壓縮多個輸入文件到一個輸出文件
- 3. 如何在Symfony2中壓縮html輸出?
- 4. 如何在PHP中壓縮html輸出?
- 5. 查找壓縮文件和輸出行
- 6. Parquet輸出文件不壓縮
- 7. Django壓縮空白Css輸出文件
- 8. 如何在LZMA SDK中開發增量壓縮/解壓縮?
- 9. 如何在iPhone中壓縮音頻文件sdk
- 10. 如何在iPhone SDK中壓縮文件夾?
- 11. 如何在Siebel中壓縮文件?
- 12. 如何壓縮文件時不壓縮
- 13. 如何列出Java中壓縮的tar文件的內容
- 14. 將大文件(> 5GB)輸出到Java中的.zip壓縮文件中
- 15. 如何使用7Z SDK壓縮和解壓文件
- 16. Java ZIP - 如何解壓縮文件夾?
- 17. 在java中壓縮和解壓縮7z文件
- 18. 在yii中壓縮/解壓縮文件
- 19. 解壓縮saz文件java
- 20. SPHINX輸出壓縮?
- 21. 輸出壓縮HTML
- 22. 在Java中壓縮文件的問題
- 23. 如何在Java中壓縮JSON和在Javascript中解壓縮
- 24. 通過Java壓縮文件
- 25. 使用g4壓縮壓縮輸出tiff
- 26. 檢測壓縮文件java
- 27. 如何在內存中解壓縮GZip壓縮文件?
- 28. 如何在asp.net框架4.0中壓縮/解壓縮文件夾
- 29. RAR壓縮文件與Java
- 30. 在clojure中壓縮文件
鏈接到這個獲取梁合併的PR,因爲我僅限於在我的人緣級後每2個鏈接。 https://github.com/apache/beam/commit/b7b68e6fb1aafb6b4160e5dcea022bf6c802e33f – Thang