
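In Google Cloud Dataflow, my join fails with the error "TupleTag <taginfo> corresponds to a non-singleton result". From the stack trace, it appears to be thrown in the overridden method that processes the CoGbkResult, at these lines: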

String Ad_ID = e.getKey(); 
String Ad_Info = "none"; 
Ad_Info = e.getValue().getOnly(AdInfoTag); 

Below is my join method.

static PCollection<String> joinEvents(PCollection<TableRow> ImpressionTable, 
     PCollection<TableRow> AdTable) throws Exception { 

    final TupleTag<String> ImpressionInfoTag = new TupleTag<String>(); 
    final TupleTag<String> AdInfoTag = new TupleTag<String>(); 

    // transform both input collections to tuple collections, where the keys are Ad_ID 
    PCollection<KV<String, String>> ImpressionInfo = ImpressionTable.apply(
     ParDo.of(new ExtractImpressionDataInfoFn())); 
    PCollection<KV<String, String>> AdInfo = AdTable.apply(
     ParDo.of(new ExtractAdDataInfoFn())); 

    // Ad_ID 'key' -> CGBKR (<ImpressionInfo>, <AdInfo>) 
    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple 
     .of(ImpressionInfoTag, ImpressionInfo) 
     .and(AdInfoTag, AdInfo) 
     .apply(CoGroupByKey.<String>create()); 

    // Process the CoGbkResult elements generated by the CoGroupByKey transform. 
    // Ad_ID 'key' -> string of <Impressioninfo>, <Adinfo> 
    PCollection<KV<String, String>> finalResultCollection = 
     kvpCollection.apply(ParDo.named("Process").of(
     new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { 
      private static final long serialVersionUID = 1L; 

     @Override 
      public void processElement(ProcessContext c) { 
      KV<String, CoGbkResult> e = c.element(); 
      String Ad_ID = e.getKey(); 
      String Ad_Info = "none"; 
      Ad_Info = e.getValue().getOnly(AdInfoTag); 
      for (String eventInfo : c.element().getValue().getAll(ImpressionInfoTag)) { 
       // Generate a string that combines information from both collection values 
       c.output(KV.of(Ad_ID, " " + Ad_Info 
         + " " + eventInfo)); 
      } 
      } 
     })); 

    //write to GCS 
    PCollection<String> formattedResults = finalResultCollection 
     .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() { 
      @Override 
      public void processElement(ProcessContext c) { 
      String outputstring = "AdUnitID: " + c.element().getKey() 
       + ", " + c.element().getValue(); 
      c.output(outputstring); 
      } 
     })); 
    return formattedResults; 
    } 

My ExtractImpressionDataInfoFn and ExtractAdDataInfoFn classes are shown below.

static class ExtractImpressionDataInfoFn extends DoFn<TableRow, KV<String, String>> { 
    private static final long serialVersionUID = 1L; 

    @Override 
    public void processElement(ProcessContext c) { 
     TableRow row = c.element(); 
     String Ad_ID = (String) row.get("AdUnitID"); 
     String User_ID = (String) row.get("UserID"); 
     String Client_ID = (String) row.get("ClientID"); 
     String Impr_Time = (String) row.get("GfpActivityAdEventTIme"); 
     String ImprInfo = "UserID: " + User_ID + ", ClientID: " + Client_ID + ", GfpActivityAdEventTIme: " + Impr_Time; 
     c.output(KV.of(Ad_ID, ImprInfo)); 
    } 
} 


static class ExtractAdDataInfoFn extends DoFn<TableRow, KV<String, String>> { 
    private static final long serialVersionUID = 1L; 

    @Override 
    public void processElement(ProcessContext c) { 
     TableRow row = c.element(); 
     String Ad_ID = (String) row.get("AdUnitID"); 
     String Content_ID = (String) row.get("ContentID"); 
     String Pub_ID = (String) row.get("Publisher"); 
     String Add_Info = "ContentID: " + Content_ID + ", Publisher: " + Pub_ID; 
     c.output(KV.of(Ad_ID, Add_Info)); 
    } 
} 

The schemas of the impression and ad tables are below.

Impression: AdUnitID, UserID, ClientID, GfpActivityAdEventTIme

Ad: AdUnitID, ClientID, Publisher


The full code can be viewed at https://github.com/kosalan/gcppoc/tree/master/src/main/java/com/gcp/poc – KosiB


Can you add the full error stack trace to the question?


Do you have the Dataflow job ID?

Answer


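The error indicates that the CoGroupByKey result held more than one value for that tag when you called getOnly, that is, more than one ad row shares the same AdUnitID. Specifically, this line: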

Ad_Info = e.getValue().getOnly(AdInfoTag); 

If you change it to getAll(AdInfoTag) and iterate over the returned values, it should work.
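To illustrate the difference, here is a minimal plain-Java sketch of the join step. It does not use the Dataflow SDK: the class `CoGroupJoinSketch`, its two methods, and the sample values are hypothetical stand-ins, and the lists play the role of the per-tag values inside a CoGbkResult. The `getOnly` helper only imitates the behavior the error message describes (reject anything that is not a single value), while `join` shows the getAll-style approach of looping over every ad value and every impression value. Falling back to "none" when there is no ad row is an assumption taken from the default in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java model of the join step; hypothetical names, not the Dataflow SDK. */
final class CoGroupJoinSketch {

    /** Imitates the getOnly contract: exactly one value, otherwise fail. */
    static String getOnly(List<String> values) {
        if (values.size() != 1) {
            throw new IllegalArgumentException(
                "tag corresponds to a non-singleton result of size " + values.size());
        }
        return values.get(0);
    }

    /** getAll-style join: pairs every ad value with every impression value for one key. */
    static List<String> join(String adId, List<String> adInfos, List<String> impressions) {
        // Assumption: use "none" when the ad side is empty, like the default in the question.
        List<String> ads = adInfos.isEmpty() ? Collections.singletonList("none") : adInfos;
        List<String> out = new ArrayList<>();
        for (String adInfo : ads) {
            for (String impression : impressions) {
                out.add(adId + ": " + adInfo + " " + impression);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> adInfos = Arrays.asList("adRowA", "adRowB"); // two ad rows, same key
        List<String> impressions = Arrays.asList("imp1", "imp2");

        try {
            getOnly(adInfos); // fails because the key has two ad values
        } catch (IllegalArgumentException e) {
            System.out.println("getOnly failed: " + e.getMessage());
        }

        // Iterating over all values handles zero, one, or many ad rows per key.
        for (String line : join("ad-1", adInfos, impressions)) {
            System.out.println(line);
        }
    }
}
```

In the real pipeline the same idea would be a nested loop over getAll(AdInfoTag) and getAll(ImpressionInfoTag) inside processElement. If each AdUnitID is supposed to have exactly one ad row, the duplicates in the ad table are worth investigating as well, since the loop will emit one output per ad row and impression pair.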


This works. Thanks a lot – KosiB