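In Google Cloud Dataflow, my join fails with the error "TupleTag Tag <taginfo> corresponds to a non-singleton result". Judging from the error stack, it is thrown in my overridden method that processes the CoGbkResult, at these lines: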
String Ad_ID = e.getKey();
String Ad_Info = "none";
Ad_Info = e.getValue().getOnly(AdInfoTag);
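The error text matches the contract of CoGbkResult.getOnly as described in the SDK Javadoc: it returns the value when the tag has exactly one, and throws IllegalArgumentException otherwise. The two-argument getOnly(tag, defaultValue) returns the default for an empty result but still throws for more than one value. The message therefore suggests that at least one AdUnitID has several rows in AdInfo. The plain-Java model below only mimics that contract on a List; the class GetOnlyModel and its methods are hypothetical, not SDK code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java model of the getOnly contract; not the Dataflow SDK implementation.
public class GetOnlyModel {

    // Mirrors getOnly(tag): exactly one value is required, otherwise it throws.
    public static <V> V getOnly(List<V> values) {
        if (values.size() != 1) {
            throw new IllegalArgumentException(
                "expected exactly one value but found " + values.size());
        }
        return values.get(0);
    }

    // Mirrors getOnly(tag, defaultValue): no value yields the default, more than one still throws.
    public static <V> V getOnly(List<V> values, V defaultValue) {
        if (values.isEmpty()) {
            return defaultValue;
        }
        return getOnly(values);
    }

    public static void main(String[] args) {
        // One ad row for the key: returns it.
        System.out.println(getOnly(Arrays.asList("ContentID: c1")));
        // No ad row, default supplied: returns the default.
        System.out.println(getOnly(Collections.<String>emptyList(), "none"));
        try {
            // Two ad rows for the same AdUnitID: the non-singleton case from the question.
            getOnly(Arrays.asList("ContentID: c1", "ContentID: c2"));
        } catch (IllegalArgumentException ex) {
            System.out.println("fails: " + ex.getMessage());
        }
    }
}
```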
Here is my join method:
static PCollection<String> joinEvents(PCollection<TableRow> ImpressionTable,
    PCollection<TableRow> AdTable) throws Exception {
  final TupleTag<String> ImpressionInfoTag = new TupleTag<String>();
  final TupleTag<String> AdInfoTag = new TupleTag<String>();
  // Transform both input collections to key/value collections, where the keys are Ad_ID
  PCollection<KV<String, String>> ImpressionInfo = ImpressionTable.apply(
      ParDo.of(new ExtractImpressionDataInfoFn()));
  PCollection<KV<String, String>> AdInfo = AdTable.apply(
      ParDo.of(new ExtractAdDataInfoFn()));
  // Ad_ID 'key' -> CoGbkResult (<ImpressionInfo>, <AdInfo>)
  PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
      .of(ImpressionInfoTag, ImpressionInfo)
      .and(AdInfoTag, AdInfo)
      .apply(CoGroupByKey.<String>create());
  // Process the CoGbkResult elements generated by the CoGroupByKey transform.
  // Ad_ID 'key' -> string of <ImpressionInfo>, <AdInfo>
  PCollection<KV<String, String>> finalResultCollection =
      kvpCollection.apply(ParDo.named("Process").of(
          new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void processElement(ProcessContext c) {
              KV<String, CoGbkResult> e = c.element();
              String Ad_ID = e.getKey();
              String Ad_Info = "none";
              Ad_Info = e.getValue().getOnly(AdInfoTag);
              for (String eventInfo : c.element().getValue().getAll(ImpressionInfoTag)) {
                // Generate a string that combines information from both collection values
                c.output(KV.of(Ad_ID, " " + Ad_Info
                    + " " + eventInfo));
              }
            }
          }));
  // Format each result as a line of text (to be written to GCS)
  PCollection<String> formattedResults = finalResultCollection
      .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
        @Override
        public void processElement(ProcessContext c) {
          String outputstring = "AdUnitID: " + c.element().getKey()
              + ", " + c.element().getValue();
          c.output(outputstring);
        }
      }));
  return formattedResults;
}
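CoGroupByKey collects, for each key, all values from each tagged input, so an AdUnitID that appears twice in the Ad table ends up with two AdInfo values under that key. The plain-Java sketch below imitates that grouping in memory and then joins by looping over every ad value, the way the existing loop already handles impressions, instead of calling getOnly; "none" is kept as the placeholder when a key has no ad row. JoinSketch, Grouped, kv, coGroup and join are hypothetical names, Map.Entry stands in for KV, and none of this is Dataflow SDK code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory imitation of joinEvents; not Dataflow SDK code.
public class JoinSketch {

    // Per-key result: one list per input, like the two tags of a CoGbkResult.
    public static class Grouped {
        public final List<String> impressions = new ArrayList<>();
        public final List<String> ads = new ArrayList<>();
    }

    public static Map.Entry<String, String> kv(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    // Like CoGroupByKey: for each key, collect every value from each input separately.
    public static Map<String, Grouped> coGroup(List<Map.Entry<String, String>> impressions,
                                               List<Map.Entry<String, String>> ads) {
        Map<String, Grouped> grouped = new TreeMap<>();
        for (Map.Entry<String, String> e : impressions) {
            grouped.computeIfAbsent(e.getKey(), k -> new Grouped()).impressions.add(e.getValue());
        }
        for (Map.Entry<String, String> e : ads) {
            grouped.computeIfAbsent(e.getKey(), k -> new Grouped()).ads.add(e.getValue());
        }
        return grouped;
    }

    // Iterates over all ad values (no getOnly), so duplicate AdUnitIDs do not throw.
    public static List<String> join(Map<String, Grouped> grouped) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Grouped> e : grouped.entrySet()) {
            List<String> ads = e.getValue().ads.isEmpty()
                ? Collections.singletonList("none") : e.getValue().ads;
            for (String adInfo : ads) {
                for (String eventInfo : e.getValue().impressions) {
                    // Same concatenation as the "Process" and "Format" DoFns combined.
                    out.add("AdUnitID: " + e.getKey() + ", " + (" " + adInfo + " " + eventInfo));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> impressions = Arrays.asList(
            kv("ad1", "UserID: u1"), kv("ad2", "UserID: u2"));
        List<Map.Entry<String, String>> ads = Arrays.asList(
            kv("ad1", "Publisher: p1"), kv("ad1", "Publisher: p2")); // ad1 appears twice
        for (String line : join(coGroup(impressions, ads))) {
            System.out.println(line);
        }
    }
}
```

Looping over every ad value emits one line per impression/ad pair. If duplicate ad rows are not expected at all, deduplicating the Ad table before the join may be the better fix.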
My ExtractImpressionDataInfoFn and ExtractAdDataInfoFn classes are as follows:
static class ExtractImpressionDataInfoFn extends DoFn<TableRow, KV<String, String>> {
  private static final long serialVersionUID = 1L;
  @Override
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    String Ad_ID = (String) row.get("AdUnitID");
    String User_ID = (String) row.get("UserID");
    String Client_ID = (String) row.get("ClientID");
    String Impr_Time = (String) row.get("GfpActivityAdEventTIme");
    String ImprInfo = "UserID: " + User_ID + ", ClientID: " + Client_ID
        + ", GfpActivityAdEventTIme: " + Impr_Time;
    c.output(KV.of(Ad_ID, ImprInfo));
  }
}
static class ExtractAdDataInfoFn extends DoFn<TableRow, KV<String, String>> {
  private static final long serialVersionUID = 1L;
  @Override
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    String Ad_ID = (String) row.get("AdUnitID");
    String Content_ID = (String) row.get("ContentID");
    String Pub_ID = (String) row.get("Publisher");
    String Add_Info = "ContentID: " + Content_ID + ", Publisher: " + Pub_ID;
    c.output(KV.of(Ad_ID, Add_Info));
  }
}
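In the Extract functions, a TableRow is a Map<String, Object>, so (String) row.get(...) yields null for a column that is missing from the row and throws ClassCastException if the stored value is not a String. The sketch below makes that handling explicit with a fallback value; RowFields.stringField is a hypothetical helper, exercised here on a HashMap standing in for a TableRow.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; a TableRow can be passed in directly because it is a Map<String, Object>.
public class RowFields {

    public static String stringField(Map<String, Object> row, String name, String fallback) {
        Object value = row.get(name);
        // Missing or null column: return the caller's fallback instead of propagating null.
        if (value == null) {
            return fallback;
        }
        // String.valueOf avoids a ClassCastException for non-String values such as numbers.
        return String.valueOf(value);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("AdUnitID", "ad1");
        row.put("Publisher", 42L);
        System.out.println(stringField(row, "AdUnitID", "unknown"));
        System.out.println(stringField(row, "Publisher", "unknown"));
        System.out.println(stringField(row, "ContentID", "unknown"));
    }
}
```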
The schemas of the Impression and Ad tables are as follows:
Impression: AdUnitID, UserID, ClientID, GfpActivityAdEventTIme
Ad: AdUnitID, ClientID, Publisher
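Since getOnly(AdInfoTag) fails as soon as one key has two ad values, one quick check is whether AdUnitID is actually unique in the Ad table. The plain-Java sketch below counts IDs and keeps only the repeated ones; DuplicateKeys.duplicates is a hypothetical name, and on the real data the same check could be run as a BigQuery query with GROUP BY AdUnitID HAVING COUNT(*) > 1, or with Count.perKey() inside the pipeline.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical check: which AdUnitIDs occur more than once in the Ad table?
public class DuplicateKeys {

    public static Map<String, Integer> duplicates(List<String> adUnitIds) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String id : adUnitIds) {
            counts.merge(id, 1, Integer::sum);
        }
        // Keep only IDs that would give getOnly(AdInfoTag) a non-singleton result.
        counts.values().removeIf(n -> n < 2);
        return counts;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("ad1", "ad2", "ad1", "ad3", "ad1");
        System.out.println(duplicates(ids)); // prints {ad1=3}
    }
}
```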
The full class can be viewed here: https://github.com/kosalan/gcppoc/tree/master/src/main/java/com/gcp/poc – KosiB
Can you add the full error stack to the question? –
Do you have the Dataflow job ID? –