2016-10-28 66 views
0

我想解壓一些文件(其本身包含zipfiles)從谷歌存儲谷歌存儲。谷歌數據流:編碼爲ZipInputStream

因此,我有以下DOFN收集ZipInputStreams:

static class UnzipFilesFN extends DoFn<GcsPath,ZipInputStream>{ 

private static final long serialVersionUID = 7373250969860890761L; 
public void processElement(ProcessContext c){ 
    GcsPath p = c.element(); 
    try{ 
     ZipInputStream zis = new ZipInputStream(new FileInputStream(p.toString())); 
     c.output(zis); 

    } 
    catch (FileNotFoundException fnfe){ 
     // 
    } 
    } 

}

而下面的自定義接收做解壓和寫作部分:

public static class ZipIO{  
    public static class Sink extends com.google.cloud.dataflow.sdk.io.Sink<ZipInputStream> { 

    private static final long serialVersionUID = -7414200726778377175L; 
    final String unzipTarget; 

     public Sink withDestinationPath(String s){ 
     if(s!=""){ 
      return new Sink(s); 
     } 
     else { 
      throw new IllegalArgumentException("must assign destination path"); 
     } 

     } 

     protected Sink(String path){ 
      this.unzipTarget = path; 
     } 

     @Override 
     public void validate(PipelineOptions po){ 
      if(unzipTarget==null){ 
       throw new RuntimeException(); 
      } 
     } 

     @Override 
     public ZipFileWriteOperation createWriteOperation(PipelineOptions po){ 
      return new ZipFileWriteOperation(this); 
     } 

    } 

    private static class ZipFileWriteOperation extends WriteOperation<ZipInputStream, UnzipResult>{ 

    private static final long serialVersionUID = 7976541367499831605L; 
    private final ZipIO.Sink sink; 

     public ZipFileWriteOperation(ZipIO.Sink sink){ 
      this.sink = sink; 
     } 



     @Override 
     public void initialize(PipelineOptions po) throws Exception{ 

     } 

     @Override 
     public void finalize(Iterable<UnzipResult> writerResults, PipelineOptions po) throws Exception { 
     long totalFiles = 0; 
     for(UnzipResult r:writerResults){ 
      totalFiles +=r.filesUnziped; 
     } 
     LOG.info("Unzipped {} Files",totalFiles); 
     } 

     @Override 
     public ZipIO.Sink getSink(){ 
      return sink; 
     } 

     @Override 
     public ZipWriter createWriter(PipelineOptions po) throws Exception{ 
      return new ZipWriter(this); 
     } 

    } 

    private static class ZipWriter extends Writer<ZipInputStream, UnzipResult>{ 
     private final ZipFileWriteOperation writeOp; 
     private long totalUnzipped = 0; 

     ZipWriter(ZipFileWriteOperation writeOp){ 
      this.writeOp = writeOp; 
     } 

     @Override 
     public void open(String uID) throws Exception{ 
     } 

     @Override 
     public void write(ZipInputStream zis){ 
      byte[] buffer = new byte[1024]; 
      try{ 
       ZipEntry ze = zis.getNextEntry(); 
       while(ze!=null){ 
        File f = new File(writeOp.sink.unzipTarget + "/" + ze.getName()); 
        FileOutputStream fos = new FileOutputStream(f); 
        int len; 
        while((len=zis.read(buffer))>0){ 
         fos.write(buffer, 0, len); 
        } 
        fos.close(); 
        this.totalUnzipped++; 
       } 
       zis.closeEntry(); 
       zis.close(); 
      } 
      catch(Exception e){ 
       // 
      } 

     } 

     @Override 
     public UnzipResult close() throws Exception{ 
      return new UnzipResult(this.totalUnzipped); 
     } 

     @Override 
     public ZipFileWriteOperation getWriteOperation(){ 
      return writeOp; 
     } 


    } 

    private static class UnzipResult implements Serializable{ 
    private static final long serialVersionUID = -8504626439217544799L; 
    final long filesUnziped;  
     public UnzipResult(long filesUnziped){ 
      this.filesUnziped=filesUnziped; 
     } 
    } 
} 

}

當我嘗試運行管道時,出現一些錯誤:

從後備CoderProvider未能建立一個編碼器:無法爲類型java.util.zip.ZipInputStream中提供編碼器:[email protected]37無法提供類型爲java.util.zip.ZipInputStream的編碼器:無法提供ProtoCoder,因爲java.util.zip.ZipInputStream不是com.google.protobuf.Message的子類; [email protected]無法提供類型爲java.util.zip.ZipInputStream的編碼器:無法提供SerializableCoder,因爲java.util.zip.ZipInputStream沒有實現Serializable。 at com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:195) at com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48) at com.google.cloud.dataflow.sdk.values.PCollection.getCoder(PCollection.java:137) 在com.google.cloud.dataflow.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:88) 在COM。 google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:332) 在com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:291) 在com.google.cloud.dataflow。 sdk.values.PCollection.apply(PCollection.java:174)

哪個代碼我是否需要分配來處理ZipInputStreams?

感謝& BR 菲利普

回答

0

打碼機是必要的,這樣一個亞軍可以兌現的PCollection臨時存儲和讀回,而不是在內存中保存它。我想不出一個合理的方法來實現一個ZipInputStream對象 - 這是一個基本的概念問題,而不是一個Coder API問題。

然而,在特定情況下,我認爲你可以簡單地在你ZipWriter.write()功能打開ZipInputStream,使ZipIO.SinkSink<GcsPath>而非Sink<ZipInputStream>

我在代碼中注意到了另外一件事情:我想你打算將這些代碼用於位於GCS和Cloud Data Runner上的文件,而不僅僅是內存中的runner和本地文件。在這種情況下,java.io.File不會透明地處理對GCS的讀取/寫入 - 您需要使用GcsUtil

+0

嗨,我使用GcsPath代替ZipInputStream時得到相同的編碼器錯誤。謝謝&BR Philipp – bigdataclown

+0

哦。我的道歉,GcsPath沒有標記爲Serializable(儘管我認爲它應該是)。看起來你需要用'String'來表示路徑。 – jkff