2017-09-29 22 views
0

對於非常簡單的管道,我們發現Dataflow SDK 1.9和2.0/2.1之間存在非常奇怪的差異。從數據流1.9到2.0/2.1的意外行爲更改

我們有CoGroupByKey步驟,通過它們的鍵連接兩個PCollections並輸出兩個PCollections(通過TupleTags)。例如,一個PCollection可能包含{「str1」,「str2」},另一個可能包含{「str3」}。

這兩個PCollections被寫入GCS(位於不同的位置),並且它們的聯合(基本上,通過在兩個PCollections上應用Flatten生成的PCollection)將被管道中的後續步驟使用。使用前面的例子,我們將在GCS中分別存儲{「str1」,「str2」}和{「str3」},並且管道將進一步轉換它們的聯合(Flattened PCollection){「str1」,「str2」 「str3」}等等。

在Dataflow SDK 1.9中,這正是發生的情況,並且我們圍繞此邏輯構建了我們的管道。 當我們慢慢遷移到2.0/2.1時,我們注意到這種行爲不再被觀察到。相反,Flatten步驟所遵循的所有步驟都可以正常運行,並且與預期的一樣,但這兩個PCollections(被拼合)不再被寫入GCS,就好像它們不存在一樣。但在執行圖中,顯示了這些步驟,這對我們來說很奇怪。

我們能夠可靠地重現此問題,以便我們可以共享數據和代碼作爲示例。 我們已經存儲在GCS兩個文本文件:

DATA1.TXT:

k1,v1 
k2,v2 

data2.txt:

k2,w2 
k3,w3 

我們會讀這兩個文件,以創建兩個PCollections,對於PC每個文件。 我們將解析每一行以創建KV<String, String>(因此在此示例中鍵爲k1, k2, k3)。

然後,我們應用CoGroupByKey並生成PCollection以輸出到GCS。 CoGroupByKey步驟之後將生成兩個PCollections,具體取決於與每個鍵關聯的值的數量(這是一個人爲的例子,但它是爲了演示我們遇到的問題) - 數字是偶數還是奇數。 因此,其中一臺PC將包含密鑰「k1」和「k3」(帶有一些附加值的字符串,請參閱下面的代碼),因爲它們每個都有一個值,另一個將包含單個密鑰「k2」有兩個值(在每個文件中找到)。

這兩臺PC被寫入GCS的不同位置,兩臺平板電腦也將被寫入GCS(但它可能已被進一步轉換)。

的三個輸出文件被認爲含有如下內容(行可能不是按順序):

OUTPUT1:

k2: [v2],(w2) 

OUTPUT2:

k3: (w3) 
k1: [v1] 

outputMerged:

k3: (w3) 
k2: [v2],(w2) 
k1: [v1] 

這正是我們在Dataflow SDK 1.9中看到的(和預期的)。

然而,在2.0和2.1中,output1和output2都是空的(並且甚至沒有執行TextIO步驟,就好像沒有元素被輸入到它們中一樣;我們通過添加一個虛擬ParDo來驗證這一點,它根本沒有被調用)。

這讓我們非常好奇,爲什麼突然之間這種行爲改變是在1.9和2.0/2.1之間進行的,以及我們如何實現我們一直在做的1.9的最佳方式。 具體而言,我們爲歸檔目的生成輸出1/2,而我們將兩臺PC壓平以進一步轉換數據並生成另一個輸出。

這是您可以運行的Java代碼(您必須正確導入,更改存儲桶名稱並正確設置選項等)。爲1.9

工作代碼:

//Dataflow SDK 1.9 compatible. 
public class TestJob { 
    public static void execute(Options options) { 
    Pipeline pipeline = Pipeline.create(options); 
    PCollection<KV<String, String>> data1 = 
     pipeline.apply(TextIO.Read.from(GcsPath.EXPERIMENT_BUCKET + "/data1.txt")).apply(ParDo.of(new doFn())); 

    PCollection<KV<String, String>> data2 = 
     pipeline.apply(TextIO.Read.from(GcsPath.EXPERIMENT_BUCKET + "/data2.txt")).apply(ParDo.of(new doFn())); 

    TupleTag<String> inputTag1 = new TupleTag<String>() { 
     private static final long serialVersionUID = 1L; 
    }; 
    TupleTag<String> inputTag2 = new TupleTag<String>() { 
     private static final long serialVersionUID = 1L; 
    }; 

    TupleTag<String> outputTag1 = new TupleTag<String>() { 
     private static final long serialVersionUID = 1L; 
    }; 
    TupleTag<String> outputTag2 = new TupleTag<String>() { 
     private static final long serialVersionUID = 1L; 
    }; 

    PCollectionTuple tuple = KeyedPCollectionTuple.of(inputTag1, data1).and(inputTag2, data2) 
     .apply(CoGroupByKey.<String>create()).apply(ParDo.of(new doFn2(inputTag1, inputTag2, outputTag2)) 
      .withOutputTags(outputTag1, TupleTagList.of(outputTag2))); 
    PCollection<String> output1 = tuple.get(outputTag1); 
    PCollection<String> output2 = tuple.get(outputTag2); 
    PCollection<String> outputMerged = PCollectionList.of(output1).and(output2).apply(Flatten.<String>pCollections()); 

    outputMerged.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/outputMerged").withNumShards(1)); 
    output1.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/output1").withNumShards(1)); 
    output2.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/output2").withNumShards(1)); 

    pipeline.run(); 
    } 

    static class doFn2 extends DoFn<KV<String, CoGbkResult>, String> { 
    private static final long serialVersionUID = 1L; 

    final TupleTag<String> inputTag1; 
    final TupleTag<String> inputTag2; 
    final TupleTag<String> outputTag2; 

    public doFn2(TupleTag<String> inputTag1, TupleTag<String> inputTag2, TupleTag<String> outputTag2) { 
     this.inputTag1 = inputTag1; 
     this.inputTag2 = inputTag2; 
     this.outputTag2 = outputTag2; 
    } 

    @ProcessElement 
    public void processElement(ProcessContext c) throws Exception { 
     String key = c.element().getKey(); 
     List<String> values = new ArrayList<String>(); 
     int numValues = 0; 
     for (String val1 : c.element().getValue().getAll(inputTag1)) { 
     values.add(String.format("[%s]", val1)); 
     numValues++; 
     } 
     for (String val2 : c.element().getValue().getAll(inputTag2)) { 
     values.add(String.format("(%s)", val2)); 
     numValues++; 
     } 
     final String line = String.format("%s: %s", key, Joiner.on(",").join(values)); 
     if (numValues % 2 == 0) { 
     c.output(line); 
     } else { 
     c.sideOutput(outputTag2, line); 
     } 
    } 
    } 

    static class doFn extends DoFn<String, KV<String, String>> { 
    private static final long serialVersionUID = 1L; 

    @ProcessElement 
    public void processElement(ProcessContext c) throws Exception { 
     String[] tokens = c.element().split(","); 
     c.output(KV.of(tokens[0], tokens[1])); 
    } 
    } 
} 

工作規範2.0/2.1:

// Dataflow SDK 2.0 and 2.1 compatible. 
public class TestJob { 
    public static void execute(Options options) { 
    Pipeline pipeline = Pipeline.create(options); 
    PCollection<KV<String, String>> data1 = 
     pipeline.apply(TextIO.read().from(GcsPath.EXPERIMENT_BUCKET + "/data1.txt")).apply(ParDo.of(new doFn())); 

    PCollection<KV<String, String>> data2 = 
     pipeline.apply(TextIO.read().from(GcsPath.EXPERIMENT_BUCKET + "/data2.txt")).apply(ParDo.of(new doFn())); 

    TupleTag<String> inputTag1 = new TupleTag<String>() { 
     private static final long serialVersionUID = 1L; 
    }; 
    TupleTag<String> inputTag2 = new TupleTag<String>() { 
     private static final long serialVersionUID = 1L; 
    }; 

    TupleTag<String> outputTag1 = new TupleTag<String>() { 
     private static final long serialVersionUID = 1L; 
    }; 
    TupleTag<String> outputTag2 = new TupleTag<String>() { 
     private static final long serialVersionUID = 1L; 
    }; 

    PCollectionTuple tuple = KeyedPCollectionTuple.of(inputTag1, data1).and(inputTag2, data2) 
     .apply(CoGroupByKey.<String>create()).apply(ParDo.of(new doFn2(inputTag1, inputTag2, outputTag2)) 
      .withOutputTags(outputTag1, TupleTagList.of(outputTag2))); 
    PCollection<String> output1 = tuple.get(outputTag1); 
    PCollection<String> output2 = tuple.get(outputTag2); 
    PCollection<String> outputMerged = PCollectionList.of(output1).and(output2).apply(Flatten.<String>pCollections()); 

    outputMerged.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/outputMerged").withNumShards(1)); 
    output1.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/output1").withNumShards(1)); 
    output2.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/output2").withNumShards(1)); 

    PipelineResult pipelineResult = pipeline.run(); 
    pipelineResult.waitUntilFinish(); 

    } 

    static class doFn2 extends DoFn<KV<String, CoGbkResult>, String> { 
    private static final long serialVersionUID = 1L; 

    final TupleTag<String> inputTag1; 
    final TupleTag<String> inputTag2; 
    final TupleTag<String> outputTag2; 

    public doFn2(TupleTag<String> inputTag1, TupleTag<String> inputTag2, TupleTag<String> outputTag2) { 
     this.inputTag1 = inputTag1; 
     this.inputTag2 = inputTag2; 
     this.outputTag2 = outputTag2; 
    } 

    @ProcessElement 
    public void processElement(ProcessContext c) throws Exception { 
     String key = c.element().getKey(); 
     List<String> values = new ArrayList<String>(); 
     int numValues = 0; 
     for (String val1 : c.element().getValue().getAll(inputTag1)) { 
     values.add(String.format("[%s]", val1)); 
     numValues++; 
     } 
     for (String val2 : c.element().getValue().getAll(inputTag2)) { 
     values.add(String.format("(%s)", val2)); 
     numValues++; 
     } 
     final String line = String.format("%s: %s", key, Joiner.on(",").join(values)); 
     if (numValues % 2 == 0) { 
     c.output(line); 
     } else { 
     c.output(outputTag2, line); 
     } 
    } 
    } 

    static class doFn extends DoFn<String, KV<String, String>> { 
    private static final long serialVersionUID = 1L; 

    @ProcessElement 
    public void processElement(ProcessContext c) throws Exception { 
     String[] tokens = c.element().split(","); 
     c.output(KV.of(tokens[0], tokens[1])); 
    } 
    } 
} 

此外,在情況下,它是有用的,執行圖形看起來像這樣。 (對於Google工程師,還指定了作業ID)。

1.9(作業ID 2017-09-29_14_35_42-15149127992051688457): enter image description here

2.1(作業ID 2017-09-29_14_31_59-991964669451027883): enter image description here

TextIO.Write 2,3在2.0/2.1下沒有產生任何輸出。拼合,其後續步驟工作正常。

+1

嗨,感謝您將此引入我們的關注。我們已經在2.0+中轉載了這個問題,並且正在修復這個問題。 –

+0

感謝您的評論!有沒有這方面的ETA?它阻止我們運行關鍵的大型管道(我們在過去的2年中每天都在運行),而我們看到的另一個選擇是使用Dataflow SDK 1.9中的舊代碼 - 但不推薦使用據我所知。任何其他提示/建議將非常感激。 –

+0

我們正在儘快開展工作。我們也在研究解決方法,敬請期待。 –

回答

3

這確實是一個缺陷。修復程序正在運行,應該記錄在Service Release Notes中。

與此同時的解決方法是使用1.9.1 SDK,因爲此錯誤僅影響2.x SDK。

有興趣提前修復問題的用戶也可以使用Beam的最新每晚構建(建議解除開發,而不是生產,因爲它是每日構建)。 Instructions here

+0

該修補程序在生產中已完全推出。 –