對於非常簡單的管道,我們發現Dataflow SDK 1.9和2.0/2.1之間存在非常奇怪的差異。從數據流1.9到2.0/2.1的意外行爲更改
我們有CoGroupByKey步驟,通過它們的鍵連接兩個PCollections並輸出兩個PCollections(通過TupleTags)。例如,一個PCollection可能包含{「str1」,「str2」},另一個可能包含{「str3」}。
這兩個PCollections被寫入GCS(位於不同的位置),並且它們的聯合(基本上,通過在兩個PCollections上應用Flatten生成的PCollection)將被管道中的後續步驟使用。使用前面的例子,我們將在GCS中分別存儲{「str1」,「str2」}和{「str3」},並且管道將進一步轉換它們的聯合(Flattened PCollection){「str1」,「str2」 「str3」}等等。
在Dataflow SDK 1.9中,這正是發生的情況,並且我們圍繞此邏輯構建了我們的管道。 當我們慢慢遷移到2.0/2.1時,我們注意到這種行爲不再被觀察到。相反,Flatten步驟所遵循的所有步驟都可以正常運行,並且與預期的一樣,但這兩個PCollections(被拼合)不再被寫入GCS,就好像它們不存在一樣。但在執行圖中,顯示了這些步驟,這對我們來說很奇怪。
我們能夠可靠地重現此問題,以便我們可以共享數據和代碼作爲示例。 我們已經存儲在GCS兩個文本文件:
DATA1.TXT:
k1,v1
k2,v2
data2.txt:
k2,w2
k3,w3
我們會讀這兩個文件,以創建兩個PCollections,對於PC每個文件。 我們將解析每一行以創建KV<String, String>
(因此在此示例中鍵爲k1, k2, k3
)。
然後,我們應用CoGroupByKey並生成PCollection以輸出到GCS。 CoGroupByKey步驟之後將生成兩個PCollections,具體取決於與每個鍵關聯的值的數量(這是一個人爲的例子,但它是爲了演示我們遇到的問題) - 數字是偶數還是奇數。 因此,其中一臺PC將包含密鑰「k1」和「k3」(帶有一些附加值的字符串,請參閱下面的代碼),因爲它們每個都有一個值,另一個將包含單個密鑰「k2」有兩個值(在每個文件中找到)。
這兩臺PC被寫入GCS的不同位置,兩臺平板電腦也將被寫入GCS(但它可能已被進一步轉換)。
的三個輸出文件被認爲含有如下內容(行可能不是按順序):
OUTPUT1:
k2: [v2],(w2)
OUTPUT2:
k3: (w3)
k1: [v1]
outputMerged:
k3: (w3)
k2: [v2],(w2)
k1: [v1]
這正是我們在Dataflow SDK 1.9中看到的(和預期的)。
然而,在2.0和2.1中,output1和output2都是空的(並且甚至沒有執行TextIO步驟,就好像沒有元素被輸入到它們中一樣;我們通過添加一個虛擬ParDo來驗證這一點,它根本沒有被調用)。
這讓我們非常好奇,爲什麼突然之間這種行爲改變是在1.9和2.0/2.1之間進行的,以及我們如何實現我們一直在做的1.9的最佳方式。 具體而言,我們爲歸檔目的生成輸出1/2,而我們將兩臺PC壓平以進一步轉換數據並生成另一個輸出。
這是您可以運行的Java代碼(您必須正確導入,更改存儲桶名稱並正確設置選項等)。爲1.9
工作代碼:
//Dataflow SDK 1.9 compatible.
public class TestJob {
public static void execute(Options options) {
Pipeline pipeline = Pipeline.create(options);
PCollection<KV<String, String>> data1 =
pipeline.apply(TextIO.Read.from(GcsPath.EXPERIMENT_BUCKET + "/data1.txt")).apply(ParDo.of(new doFn()));
PCollection<KV<String, String>> data2 =
pipeline.apply(TextIO.Read.from(GcsPath.EXPERIMENT_BUCKET + "/data2.txt")).apply(ParDo.of(new doFn()));
TupleTag<String> inputTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> inputTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outputTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outputTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
PCollectionTuple tuple = KeyedPCollectionTuple.of(inputTag1, data1).and(inputTag2, data2)
.apply(CoGroupByKey.<String>create()).apply(ParDo.of(new doFn2(inputTag1, inputTag2, outputTag2))
.withOutputTags(outputTag1, TupleTagList.of(outputTag2)));
PCollection<String> output1 = tuple.get(outputTag1);
PCollection<String> output2 = tuple.get(outputTag2);
PCollection<String> outputMerged = PCollectionList.of(output1).and(output2).apply(Flatten.<String>pCollections());
outputMerged.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/outputMerged").withNumShards(1));
output1.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/output1").withNumShards(1));
output2.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/output2").withNumShards(1));
pipeline.run();
}
static class doFn2 extends DoFn<KV<String, CoGbkResult>, String> {
private static final long serialVersionUID = 1L;
final TupleTag<String> inputTag1;
final TupleTag<String> inputTag2;
final TupleTag<String> outputTag2;
public doFn2(TupleTag<String> inputTag1, TupleTag<String> inputTag2, TupleTag<String> outputTag2) {
this.inputTag1 = inputTag1;
this.inputTag2 = inputTag2;
this.outputTag2 = outputTag2;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String key = c.element().getKey();
List<String> values = new ArrayList<String>();
int numValues = 0;
for (String val1 : c.element().getValue().getAll(inputTag1)) {
values.add(String.format("[%s]", val1));
numValues++;
}
for (String val2 : c.element().getValue().getAll(inputTag2)) {
values.add(String.format("(%s)", val2));
numValues++;
}
final String line = String.format("%s: %s", key, Joiner.on(",").join(values));
if (numValues % 2 == 0) {
c.output(line);
} else {
c.sideOutput(outputTag2, line);
}
}
}
static class doFn extends DoFn<String, KV<String, String>> {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String[] tokens = c.element().split(",");
c.output(KV.of(tokens[0], tokens[1]));
}
}
}
工作規範2.0/2.1:
// Dataflow SDK 2.0 and 2.1 compatible.
public class TestJob {
public static void execute(Options options) {
Pipeline pipeline = Pipeline.create(options);
PCollection<KV<String, String>> data1 =
pipeline.apply(TextIO.read().from(GcsPath.EXPERIMENT_BUCKET + "/data1.txt")).apply(ParDo.of(new doFn()));
PCollection<KV<String, String>> data2 =
pipeline.apply(TextIO.read().from(GcsPath.EXPERIMENT_BUCKET + "/data2.txt")).apply(ParDo.of(new doFn()));
TupleTag<String> inputTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> inputTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outputTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outputTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
PCollectionTuple tuple = KeyedPCollectionTuple.of(inputTag1, data1).and(inputTag2, data2)
.apply(CoGroupByKey.<String>create()).apply(ParDo.of(new doFn2(inputTag1, inputTag2, outputTag2))
.withOutputTags(outputTag1, TupleTagList.of(outputTag2)));
PCollection<String> output1 = tuple.get(outputTag1);
PCollection<String> output2 = tuple.get(outputTag2);
PCollection<String> outputMerged = PCollectionList.of(output1).and(output2).apply(Flatten.<String>pCollections());
outputMerged.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/outputMerged").withNumShards(1));
output1.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/output1").withNumShards(1));
output2.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/output2").withNumShards(1));
PipelineResult pipelineResult = pipeline.run();
pipelineResult.waitUntilFinish();
}
static class doFn2 extends DoFn<KV<String, CoGbkResult>, String> {
private static final long serialVersionUID = 1L;
final TupleTag<String> inputTag1;
final TupleTag<String> inputTag2;
final TupleTag<String> outputTag2;
public doFn2(TupleTag<String> inputTag1, TupleTag<String> inputTag2, TupleTag<String> outputTag2) {
this.inputTag1 = inputTag1;
this.inputTag2 = inputTag2;
this.outputTag2 = outputTag2;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String key = c.element().getKey();
List<String> values = new ArrayList<String>();
int numValues = 0;
for (String val1 : c.element().getValue().getAll(inputTag1)) {
values.add(String.format("[%s]", val1));
numValues++;
}
for (String val2 : c.element().getValue().getAll(inputTag2)) {
values.add(String.format("(%s)", val2));
numValues++;
}
final String line = String.format("%s: %s", key, Joiner.on(",").join(values));
if (numValues % 2 == 0) {
c.output(line);
} else {
c.output(outputTag2, line);
}
}
}
static class doFn extends DoFn<String, KV<String, String>> {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String[] tokens = c.element().split(",");
c.output(KV.of(tokens[0], tokens[1]));
}
}
}
此外,在情況下,它是有用的,執行圖形看起來像這樣。 (對於Google工程師,還指定了作業ID)。
1.9(作業ID 2017-09-29_14_35_42-15149127992051688457):
2.1(作業ID 2017-09-29_14_31_59-991964669451027883):
TextIO.Write 2,3在2.0/2.1下沒有產生任何輸出。拼合,其後續步驟工作正常。
嗨,感謝您將此引入我們的關注。我們已經在2.0+中轉載了這個問題,並且正在修復這個問題。 –
感謝您的評論!有沒有這方面的ETA?它阻止我們運行關鍵的大型管道(我們在過去的2年中每天都在運行),而我們看到的另一個選擇是使用Dataflow SDK 1.9中的舊代碼 - 但不推薦使用據我所知。任何其他提示/建議將非常感激。 –
我們正在儘快開展工作。我們也在研究解決方法,敬請期待。 –