2017-09-05 103 views
0

我有一段代碼可以提取Google雲端存儲中.ZIP文件的內容。它工作正常,但我需要使用此代碼與將在運行時提供的文件路徑(「gs://some_bucket/filename.zip」)。當我嘗試使用運行值,我得到一個錯誤,如:使用ValueProvider作爲Apache Beam中的路徑提取zip內容

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize [email protected] 
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53) 
    at org.apache.beam.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:83) 
    at org.apache.beam.sdk.io.Read$Bounded.<init>(Read.java:94) 
    at org.apache.beam.sdk.io.Read$Bounded.<init>(Read.java:89) 
    at org.apache.beam.sdk.io.Read.from(Read.java:48) 
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Read.expand(BigQueryIO.java:535) 
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Read.expand(BigQueryIO.java:292) 
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482) 
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422) 
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44) 
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:164) 
    at BeamTest2.StarterPipeline.main(StarterPipeline.java:180) 
Caused by: java.io.NotSerializableException: org.apache.beam.sdk.Pipeline 
    at java.io.ObjectOutputStream.writeObject0(Unknown Source) 
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source) 
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source) 
    at java.io.ObjectOutputStream.writeObject0(Unknown Source) 
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source) 
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source) 
    at java.io.ObjectOutputStream.writeObject0(Unknown Source) 
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source) 
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source) 
    at java.io.ObjectOutputStream.writeObject0(Unknown Source) 
    at java.io.ObjectOutputStream.writeObject(Unknown Source) 
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49) 
    ... 11 more 

的代碼,我使用的是:

//Unzip incoming file 
     PCollection<TableRow> temp = p.apply(BigQueryIO.read().fromQuery(
     NestedValueProvider.of(
      options.getInputFile(), 
      new SerializableFunction<String, String>() { 
      private static final long serialVersionUID = 1L; 
      @Override 
      public String apply(String filepath) { 
       try{ 

       List<GcsPath> gcsPaths = util.expand(GcsPath.fromUri(filepath)); 
       LOG.info(gcsPaths+"FilesUnzipped"); 
        List<String> paths = new ArrayList<String>(); 

        for(GcsPath gcsp: gcsPaths){ 
         paths.add(gcsp.toString()); 
        } 
        p.apply(Create.of(paths)) 
         .apply(ParDo.of(new UnzipFN(filepath))); 

       } 
       catch(Exception e) 
       { 
        LOG.info("Exception caught while extracting ZIP"); 
       } 
       return ""; 
      } 
      })).usingStandardSql().withoutValidation()); 

UnzipFN類:

public class UnzipFN extends DoFn<String,Long>{ 
    private long filesUnzipped=0; 
    @ProcessElement 
    public void processElement(ProcessContext c){ 
     String p = c.element(); 
     GcsUtilFactory factory = new GcsUtilFactory(); 
     GcsUtil u = factory.create(c.getPipelineOptions()); 
     byte[] buffer = new byte[100000000]; 
     try{ 
      SeekableByteChannel sek = u.open(GcsPath.fromUri(p)); 
      InputStream is = Channels.newInputStream(sek); 
      BufferedInputStream bis = new BufferedInputStream(is); 
      ZipInputStream zis = new ZipInputStream(bis); 
      ZipEntry ze = zis.getNextEntry(); 
      while(ze!=null){ 
       LOG.info("Unzipping File {}",ze.getName()); 
       WritableByteChannel wri = u.create(GcsPath.fromUri("gs://bucket_location/" + ze.getName()), getType(ze.getName())); 
       OutputStream os = Channels.newOutputStream(wri); 
       int len; 
       while((len=zis.read(buffer))>0){ 
        os.write(buffer,0,len); 
       } 
       os.close(); 
       filesUnzipped++; 
       ze=zis.getNextEntry(); 


      } 
      zis.closeEntry(); 
      zis.close(); 

     } 
     catch(Exception e){ 
      e.printStackTrace(); 
     } 
    c.output(filesUnzipped); 
    System.out.println(filesUnzipped+"FilesUnzipped"); 
    LOG.info("FilesUnzipped"); 
    } 

    private String getType(String fName){ 
     if(fName.endsWith(".zip")){ 
      return "application/x-zip-compressed"; 
     } 
     else { 
      return "text/plain"; 
     } 
    } 
} 

如何處理這場景?

P.S. - .zip提取代碼與BigQueryIO.read()無關。我只是用它作爲黑客來讀取運行時值。如果您有任何其他建議,請讓我知道。

謝謝。

+0

NestedValueProvider中的SerializableFunction總是返回空字符串「」 - 這是故意的嗎?而應用UnzipFn產生的集合也被忽略。 – jkff

+0

另外它看起來像你試圖添加新的圖形步驟到你的NestedValueProvider的SerializableFunction內的管道。這是不可能的:管道首先被構建然後執行:你不能在運行時添加新的步驟。我很困惑你想做什麼,所以我不確定如何幫助你做到這一點 - 請澄清你想要做的事情。 – jkff

+0

@jkff是的,這是故意的。所以基本上沒有UnzipFN產生的收集。 UnzipFN的工作只是解壓縮並提取其路徑將在運行時提供的.zip文件的內容。所以我的意思是要問 - 如何解壓縮GCS位置在運行時提供的文件? 如果除了我正在做的事情之外還有其他方式,請告訴我。 – rish0097

回答

1

如果我理解正確的,你有一個ValueProvider<String>包含filepattern,你正在擴大使用GcsUtil.expand()的filepattern,你想給一個函數(UnzipFn)適用於每個所產生的文件名。

目前的代碼不會有幾個原因的工作:

  • 你正在創建一個BigQueryIO.read().fromQuery()其中fromQuery()參數是ValueProvider總是返回空字符串(您NestedValueProvider,做了一堆東西后,總是返回空字符串"")。這會在運行時失敗,因爲查詢不能爲空。使用BigQueryIO作爲黑客試圖訪問ValueProvider不是一個好主意 - 請參閱下文。
  • 您正在將函數中的步驟添加到函數中,以便從ValueProvider中提取值。該函數在管道正在運行時從worker中調用,以獲取提供者的運行時值。在管道運行時,不可能從工作人員向管道添加步驟。
  • 你捕捉Pipeline對象爲SerializableFunction關閉,並且它不能序列化,因爲Pipeline不是Serializable - 因爲沒有合法的使用情況序列化Pipeline Java對象:它永遠不會需要運到工人或對於跑步者來說,它只是一個臨時構建器對象,用於在您的主程序中構建一些東西,您可以稍後調用.run()。另一方面,SerializableFunction運到工人,以便他們可以評估ValueProvider的當前值。

ValueProvider想象爲僅在流水線運行時才具有值的佔位符,而不是構建時的值 - 例如,你可以從DoFn內撥打provider.get()NestedValueProvider完全不會改變它 - 它只是簡單地包裝另一個ValueProvider,通常使用一些簡單的轉換邏輯,並且當您有ValueProvider<Something>但需要它作爲ValueProvider<SomethingSlightlyDifferent>時,它將用作膠水代碼。

的問題的關鍵是,你想只在運行時使用可用的值(你options.getInputFile()ValueProvider)做一些建設時 - 創建管道一步Create.of(paths)。在邏輯上不可能規避建設時ValueProvider的不可用性:ValueProvider專門用於表示在建造時尚未提供的值,因此它們作爲佔位符保留在管線描述中,並作爲參數提供只有在管道運行時。你需要想出一個管道結構,其中輸入文件是一個佔位符,管道以你想要的方式處理它。

你可以這樣說:

p.apply(Create.ofProvider(options.getInputFile(), StringUtf8Coder.of())) 
.apply(ParDo.of(new ExpandFn())) 
.apply(...fusion break...) 
.apply(ParDo.of(new UnzipFn())) 

其中ExpandFn將是一個DoFn,需要一個String並做你的GcsUtil.expand()的東西,並融合斷見,例如執行JdbcIO.java

在Beam 2.2中(你可以在HEAD當前使用它),你不需要ExpandFn - 已經存在一個可以擴展文件模式等等的變換(例如,它可以遞增地擴展文件模式並且繼續觀察新文件匹配它,在一個流媒體管道中)。所以你可以寫得更簡潔:

p.apply(FileIO.match().filepattern(options.getInputFile())) 
.apply(...fusion break...) 
.apply(ParDo.of(new UnzipFn())); 
+0

謝謝@jkff它的工作。我甚至沒有進行融合突破。:) – rish0097

+0

很高興它的工作,但請注意,沒有融合突破,這段代碼很可能會使用單線程並且沒有並行性。 – jkff