2015-01-02 45 views
1

我正在查看documentationprovided examples,瞭解如何在處理Google的dataflow service數據時報告無效數據。如何在使用Google數據流處理數據時報告無效數據?

Pipeline p = Pipeline.create(options); 
p.apply(TextIO.Read.named("ReadMyFile").from(options.getInput())) 
.apply(new SomeTransformation()) 
.apply(TextIO.Write.named("WriteMyFile").to(options.getOutput())); 
p.run(); 

除了實際輸入/輸出,我想產生包含被認爲無效的記錄一個第二輸出文件(例如丟失的數據,畸形數據,值太高)。我想對這些記錄進行疑難解答並分別處理它們。

  • 輸入:GS://.../input.csv
  • 輸出:GS:GS://.../output.csv的無效記錄
  • 清單// ... /invalid.csv

如何將這些無效記錄重定向到單獨的輸出中?

回答

3

您可以使用PCollectionTuples從單個轉換中返回多個PCollections。例如,

TupleTag<String> mainOutput = new TupleTag<>("main"); 
TupleTag<String> missingData = new TupleTag<>("missing"); 
TupleTag<String> badValues = new TupleTag<>("bad"); 

Pipeline p = Pipeline.create(options); 
PCollectionTuple all = p 
    .apply(TextIO.Read.named("ReadMyFile").from(options.getInput())) 
    .apply(new SomeTransformation()); 

all.get(mainOutput) 
    .apply(TextIO.Write.named("WriteMyFile").to(options.getOutput())); 
all.get(missingData) 
    .apply(TextIO.Write.named("WriteMissingData").to(...)); 
... 

PCollectionTuples可以直接建立從現有PCollections的,或者具有側輸出從帕爾操作發射的,例如

PCollectionTuple partitioned = input.apply(ParDo 
    .of(new DoFn<String, String>() { 
      public void processElement(ProcessContext c) { 
      if (checkOK(c.element()) { 
       // Shows up in partitioned.get(mainOutput). 
       c.output(...); 
      } else if (hasMissingData(c.element())) { 
       // Shows up in partitioned.get(missingData). 
       c.sideOutput(missingData, c.element()); 
      } else { 
       // Shows up in partitioned.get(badValues). 
       c.sideOutput(badValues, c.element()); 
      } 
      } 
     }) 
    .withOutputTags(mainOutput, TupleTagList.of(missingData).and(badValues))); 

注意,在一般的各種側輸出不需要具有相同的類型,並且數據可以被髮射的任何次數,以任何數量的側輸出的(而不是嚴格分區,我們在這裏)。然後

你SomeTransformation類可能看起來像

class SomeTransformation extends PTransform<PCollection<String>, 
              PCollectionTuple> { 
    public PCollectionTuple apply(PCollection<String> input) { 
    // Filter into good and bad data. 
    PCollectionTuple partitioned = ... 
    // Process the good data. 
    PCollection<String> processed = 
     partitioned.get(mainOutput) 
        .apply(...) 
        .apply(...) 
        ...; 
    // Repackage everything into a new output tuple. 
    return PCollectionTuple.of(mainOutput, processed) 
          .and(missingData, partitioned.get(missingData)) 
          .and(badValues, partitioned.get(badValues)); 
    } 
} 
0

羅伯特的使用sideOutputs的建議是偉大的,但要注意,如果壞數據是由您ParDos確定這隻會工作。目前還沒有一種方法可以在初始解碼過程中識別錯誤記錄(Coder.decode中發生錯誤)。我們有計劃儘快開展工作。