2016-10-04

I've been struggling with jobs that need to emit side outputs and cannot run a job with OutputTags, because I keep getting an exception ("unable to serialize xxx").

Even though I explicitly specified type coders for the job, I kept getting the same error, so I decided to write a simple job following this documentation:

https://cloud.google.com/dataflow/model/par-do#tags-for-side-outputs

To my surprise, I still get the same exception, and I now suspect I must be making a mistake somewhere (but cannot figure it out myself). As far as the code goes, I tried to follow the example given above.

Below, I'm posting the source code as well as the error message I get when I run it. I believe this is reproducible (change 'GCS_BUCKET' to any bucket you own, and create a main() method that invokes 'TestSideOutput' with args), and it would be good to know whether others can reproduce it. We are using JDK 8 and Dataflow SDK 1.7.0.

Note that the example in the documentation above uses an anonymous class extending DoFn, which I also tried, only to get the same error message; the code below refactors that class into a named inner class ('Filter').

I also tried initializing the TupleTags without the curly braces ("{}"), since having them is what actually produces a warning, but that leads to a different exception (see the last code snippet in this post).

Here is the code I used:

package tmp.dataflow.experimental; 

import com.google.cloud.dataflow.sdk.Pipeline; 
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; 
import com.google.cloud.dataflow.sdk.io.TextIO; 
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; 
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner; 
import com.google.cloud.dataflow.sdk.transforms.DoFn; 
import com.google.cloud.dataflow.sdk.transforms.ParDo; 
import com.google.cloud.dataflow.sdk.values.PCollection; 
import com.google.cloud.dataflow.sdk.values.PCollectionTuple; 
import com.google.cloud.dataflow.sdk.values.TupleTag; 
import com.google.cloud.dataflow.sdk.values.TupleTagList; 
import com.moloco.dataflow.DataflowConstants; 

public class TestSideOutput {
    private TestOptions options;
    private static final String GCS_BUCKET = "gs://dataflow-experimental/"; // Change to your bucket name

    public TestSideOutput(String[] args) {
        options = PipelineOptionsFactory.fromArgs(args).as(TestOptions.class);
        options.setProject(DataflowConstants.PROJCET_NAME);
        options.setStagingLocation(DataflowConstants.STAGING_BUCKET);
        options.setRunner(BlockingDataflowPipelineRunner.class);
        options.setJobName(options.getJob() + "-test-sideoutput");
    }

    public void execute() {
        Pipeline pipeline = Pipeline.create(options);
        // 1. Read sample data.
        PCollection<String> profiles = pipeline.apply(TextIO.Read.named("reading")
            .from(GCS_BUCKET + "example/sample-data/sample-data*").withCoder(StringUtf8Coder.of()));

        // 2. Create tags for outputs.
        final TupleTag<String> mainTag = new TupleTag<String>() {};
        final TupleTag<String> sideTag = new TupleTag<String>() {};

        // 3. Apply ParDo with side output tags.
        Filter filter = new Filter("DATAFLOW", sideTag);
        PCollectionTuple results = profiles.apply(
            ParDo.named("FilterByKeyword").withOutputTags(mainTag, TupleTagList.of(sideTag)).of(filter));

        // 4. Retrieve outputs.
        PCollection<String> mainOutput = results.get(mainTag);
        PCollection<String> sideOutput = results.get(sideTag);

        // 5. Write to GCS.
        mainOutput.apply(TextIO.Write.named("writingMain")
            .to(GCS_BUCKET + "example/main-output/main").withCoder(StringUtf8Coder.of()));
        sideOutput.apply(TextIO.Write.named("writingSide")
            .to(GCS_BUCKET + "example/side-output/side").withCoder(StringUtf8Coder.of()));

        // 6. Run pipeline.
        pipeline.run();
    }

    static class Filter extends DoFn<String, String> {
        private static final long serialVersionUID = 0;
        final TupleTag<String> sideTag;
        String keyword;

        public Filter(String keyword, TupleTag<String> sideTag) {
            this.sideTag = sideTag;
            this.keyword = keyword;
        }

        @Override
        public void processElement(ProcessContext c) throws Exception {
            String profile = c.element();
            if (profile.contains(keyword)) {
                c.output(profile);
            } else {
                c.sideOutput(sideTag, profile);
            }
        }
    }
}

Here is the command I used and the error/exception I got (the command just contains a few command-line arguments we use with our dataflow package; nothing special here, just to give you an idea):

dataflow-20161003.R3$ ./bin/dataflow --job=test-experimental-sideoutput --numWorkers=1 --date=0001-01-01 
Oct 04, 2016 12:37:34 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions 
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 121 files. Enable logging at DEBUG level to see which files will be staged. 
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize [email protected] 
     at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54) 
     at com.google.cloud.dataflow.sdk.util.SerializableUtils.clone(SerializableUtils.java:91) 
     at com.google.cloud.dataflow.sdk.transforms.ParDo$BoundMulti.<init>(ParDo.java:959) 
     at com.google.cloud.dataflow.sdk.transforms.ParDo$UnboundMulti.of(ParDo.java:912) 
     at com.google.cloud.dataflow.sdk.transforms.ParDo$UnboundMulti.of(ParDo.java:908) 
     at tmp.dataflow.experimental.TestSideOutput.execute(TestSideOutput.java:41) 
     at com.moloco.dataflow.Main.main(Main.java:152) 
Caused by: java.io.NotSerializableException: tmp.dataflow.experimental.TestSideOutput 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
     at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50) 
     ... 6 more 

Also, I don't think it's relevant, but here is the code for the 'TestOptions' class:

package tmp.dataflow.experimental; 

import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; 
import com.google.cloud.dataflow.sdk.options.Description; 
import com.google.cloud.dataflow.sdk.options.Validation; 

public interface TestOptions extends DataflowPipelineOptions { 
    @Description("Job") 
    @Validation.Required 
    String getJob(); 

    void setJob(String value); 

    @Description("Job suffix") 
    String getJobSuffix(); 

    void setJobSuffix(String value); 

    @Description("Date") 
    @Validation.Required 
    String getDate(); 

    void setDate(String value); 
} 

Finally, if I remove the curly braces "{}" when instantiating the TupleTags, I get the following exception (whereas I found suggestions on Stack Overflow that I should use "{}" precisely to avoid this kind of problem):

Oct 04, 2016 12:43:56 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions 
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 122 files. Enable logging at DEBUG level to see which files will be staged. 
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for FilterByKeyword.out1 [PCollection]. Correct one of the following root causes: 
    No Coder has been manually specified; you may do so using .setCoder(). 
    Inferring a Coder from the CoderRegistry failed: Cannot provide a coder for type variable V (declared by class com.google.cloud.dataflow.sdk.values.TupleTag) because the actual type is unknown due to erasure. If this error occurs for a side output of the producing ParDo, verify that the TupleTag for this output is constructed with proper type information (see TupleTag Javadoc) or explicitly set the Coder to use if this is not possible. 
    Using the default output Coder from the producing PTransform failed: Cannot provide a coder for type variable V (declared by class com.google.cloud.dataflow.sdk.values.TupleTag) because the actual type is unknown due to erasure. 
     at com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:195) 
     at com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48) 
     at com.google.cloud.dataflow.sdk.values.PCollection.getCoder(PCollection.java:137) 
     at com.google.cloud.dataflow.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:88) 
     at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:331) 
     at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274) 
     at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161) 
     at tmp.dataflow.experimental.TestSideOutput.execute(TestSideOutput.java:50) 
     at com.moloco.dataflow.Main.main(Main.java:152) 
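As a side note, the erasure behavior behind this second message can be reproduced without the Dataflow SDK. In the hypothetical sketch below, Tag is a stand-in for TupleTag (not the real class): an anonymous subclass records its generic superclass in the class file, so the element type survives erasure and can be recovered by reflection, which is what coder inference relies on; a plain instantiation loses it.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class ErasureDemo {
    // Hypothetical stand-in for TupleTag<V>; just a generic class.
    static class Tag<V> {}

    // Recover the type argument V from a Tag instance, if it is recorded.
    static String typeArgOf(Tag<?> tag) {
        Type sup = tag.getClass().getGenericSuperclass();
        if (sup instanceof ParameterizedType) {
            // Anonymous subclass: "class ... extends Tag<String>" is in the class file.
            return ((ParameterizedType) sup).getActualTypeArguments()[0].getTypeName();
        }
        // Plain "new Tag<String>()": the instance is just a Tag, V is erased.
        return "unknown (erased)";
    }

    public static void main(String[] args) {
        System.out.println(typeArgOf(new Tag<String>()));    // unknown (erased)
        System.out.println(typeArgOf(new Tag<String>() {})); // java.lang.String
    }
}
```

This matches the exception text: without "{}" the actual type of V is "unknown due to erasure", so no default Coder can be inferred for the side output.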

EDIT: See the answer below, which resolves this problem by making execute() 'static'.

The code below is similar to what I originally posted, with two changes: wherever possible, I explicitly (and redundantly) specify the 'coder' for each PCollection, and the TupleTags are instantiated without curly braces.

Not sure which approach (static vs. this redundant-coder approach) is more appropriate.

public void execute() {
    Pipeline pipeline = Pipeline.create(options);
    // 1. Read sample data.
    PCollection<String> profiles = pipeline.apply(TextIO.Read.named("reading")
        .from(GCS_BUCKET + "example/sample-data/sample-data*").withCoder(StringUtf8Coder.of()));

    // 2. Create tags for outputs.
    final TupleTag<String> mainTag = new TupleTag<String>();
    final TupleTag<String> sideTag = new TupleTag<String>();

    // 3. Apply ParDo with side output tags.
    Filter filter = new Filter("DATAFLOW", sideTag);
    PCollectionTuple results = profiles.setCoder(StringUtf8Coder.of())
        .apply(ParDo.named("FilterByKeyword").withOutputTags(mainTag, TupleTagList.of(sideTag)).of(filter));

    // 4. Retrieve outputs.
    PCollection<String> mainOutput = results.get(mainTag);
    PCollection<String> sideOutput = results.get(sideTag);

    // 5. Write to GCS.
    mainOutput.setCoder(StringUtf8Coder.of()).apply(TextIO.Write.named("writingMain")
        .to(GCS_BUCKET + "example/main-output-from-nonstatic/main").withCoder(StringUtf8Coder.of()));
    sideOutput.setCoder(StringUtf8Coder.of()).apply(TextIO.Write.named("writingSide")
        .to(GCS_BUCKET + "example/side-output-from-nonstatic/side").withCoder(StringUtf8Coder.of()));

    // 6. Run pipeline.
    pipeline.run();
}

Answer


The error you're getting is because your Filter fn references a TupleTag, which in turn (since it is a non-static anonymous class instantiated from a non-static function) references the enclosing TestSideOutput.

So the pipeline tries to serialize the TestSideOutput object, which is not serializable, as evidenced by the message: java.io.NotSerializableException: tmp.dataflow.experimental.TestSideOutput.

The root cause is that your method execute() is not static. Making it static should fix the problem.
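This capture behavior can be demonstrated without the Dataflow SDK. In the hypothetical sketch below, Tag stands in for TupleTag; the toString() override that reads an outer field forces the enclosing-instance capture that, in the question's case, the compiler emitted implicitly for the anonymous TupleTag subclass. Serializing the tag then drags the non-serializable enclosing object along, while the same tag created in a static context serializes fine.

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {
    // Hypothetical stand-in for TupleTag; serializable on its own.
    static class Tag implements Serializable {}

    // This enclosing class is deliberately NOT Serializable, like TestSideOutput.
    private final String name = "demo";

    Serializable makeTagNonStatic() {
        return new Tag() {
            // Reading 'name' references CaptureDemo.this, so the anonymous
            // subclass holds a hidden field pointing at the enclosing instance.
            @Override public String toString() { return name; }
        };
    }

    static Serializable makeTagStatic() {
        return new Tag() {}; // static context: no enclosing instance to capture
    }

    static boolean serializes(Serializable s) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(s);
            return true;
        } catch (NotSerializableException e) {
            return false; // the hidden outer reference is not serializable
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new CaptureDemo().makeTagNonStatic())); // false
        System.out.println(serializes(makeTagStatic()));                      // true
    }
}
```

The same reasoning explains why moving execute() into a static context removes the hidden reference and lets the tags (and the Filter fn holding them) serialize.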


Indeed, your suggestion fixed the problem I was having. Thanks! On the other hand, we have another job with a non-static execute() method in which we apply a ParDo with side output tags, and it does not throw this exception (which is partly why I wrote the sample code above; it seemed strange to me). I can't really post that code right now, but I wonder whether there is another way around this without making the execute() method static?


I've somewhat answered my own follow-up question (see the code snippet added at the end of my edited question). It looks like it is possible to keep execute() non-static by explicitly and redundantly declaring the coders wherever possible.