2017-09-24 63 views
-1

我正在使用谷歌數據流CoGbkResult連接兩個表作爲內部連接。Google Dataflow內部連接加入列表[]

我能夠成功加入表格。 我正在寫輸出到一個文本文件,並能夠驗證連接。但是,連接會將匹配結果放入列表中。

就是這樣。

301%103%203%2017-09-20 07:49:46[2%google, 3%google, 1%microsoft] 
301%105%200%2017-09-17 11:48:59[2%google, 3%google, 1%microsoft] 

301%103%203%2017-09-20 07:49:46來自table_1。 2%google,3%google,1%microsoft與在table_2中加入的結果匹配。

以下是我processElement方法:

public void processElement(ProcessContext c) { 
    KV<String, CoGbkResult> e = c.element(); 
    String Ad_ID = e.getKey(); 
    Iterable<String> Ad_Info = null; 
    Ad_Info = e.getValue().getAll(AdInfoTag); 
    for (String ImpressionInfo : c.element().getValue().getAll(ImpressionInfoTag)) { 
    // Generate a string that combines information from both collection values 
    c.output(KV.of(Ad_ID, "%" + ImpressionInfo + Ad_Info)); 
    } 
} 

我不知道我怎樣才能在單行輸出。例如:

301%103%203%2017-09-20 07:49:46 2%google 
01%103%203%2017-09-20 07:49:46 3%google 
01%103%203%2017-09-20 07:49:46 1%microsoft 
301%105%200%2017-09-17 11:48:59 2%google 1%microsoft 
301%105%200%2017-09-17 11:48:59 3%google 
301%105%200%2017-09-17 11:48:59 1%microsoft 
+0

這並不完全清楚你想如何格式化輸出。具體來說,在您的示例中有3個不同的行,前綴爲「301%105%200%2017-09-17 11:48:59」,其中一行包含「2%谷歌」和「1%微軟」在線上。那是故意的嗎? –

+0

@Ben Chambers ...這是工作,當我做單獨解析。問題是客戶我切換到toString – KosiB

回答

0

我設法通過解析器來解決這個問題。 GCP數據流還爲此提供了一種方法嗎?

int jointbegin = outputstring.indexOf(「[」); String firsthalf = outputstring.substring(0,jointbegin); String secondhalf = outputstring.substring(outputstring.indexOf(「[」)+ 1,outputstring.indexOf(「]」));

  if (!secondhalf.isEmpty()) 
      { 
       String[] ad_data = secondhalf.split(","); 

       for (int i = 0; i < ad_data.length; i++) 
       { 
        String final_string = firsthalf + ad_data[i]; 
        c.output(final_string); 
       } 
      } 
      } 
+0

在你的問題的DoFn中,你(隱式地)在可迭代的Ad_Info上調用toString(),現在你解析它以提取單個組件 - 爲什麼不只是迭代AdInfo原來的DoFn,其中已包含組件? – jkff

+0

@jkff。這正如你所說的那樣工作。 – KosiB

+0

是的,本的上面的答案描述瞭如何以正確的方式做到這一點。 – jkff

1

我的理解(部分猜測)你想輸出的是要輸出在第一和第二迭代每個條目行,但我不知道爲什麼你不能只使用兩個for循環,而不是將iterable轉換爲一個字符串,然後解析它。例如:

public void processElement(ProcessContext c) { 
    KV<String, CoGbkResult> e = c.element(); 
    String Ad_ID = e.getKey(); 
    Iterable<String> Ad_Infos = e.getValue().getAll(AdInfoTag); 
    for (String ImpressionInfo : c.element().getValue().getAll(ImpressionInfoTag)) { 
    for (String Ad_Info : Ad_Infos) { 
     c.output(KV.of(Ad_ID, "%" + ImpressionInfo + Ad_Info)); 
    } 
    } 
}