2015-10-17 41 views
1

我想在perKey基礎上遍歷KV pCollection的值。我用下面的代碼中使用自定義類結合起來,在使用CombineFn累加來自所有節點的數據後,合併每個鍵的所有值

PCollection<KV<String, String>> combinesAttributes = 
       valExtract.get(extAttUsers).apply(Combine.<String, String>perKey(
         new CombineAttributes())); 

,下面將我的自定義組合類,

public static class CombineAttributes implements SerializableFunction<Iterable<String>, String> { 
    @Override 
    public String apply(Iterable<String> input) {...}..} 

這對於小本投入,但對於大投入並沒有如預期聯合收割機正常工作。輸出結果只有一個鍵的值很少,其他的則沒有。我假定輸出只有來自一個節點的組合數據。

https://cloud.google.com/dataflow/model/combine中的文檔提到了使用CombineFn以便在所有節點中合併每個鍵的完整收集值。

但是,當我改變了自定義如下組合函數,我收到以下錯誤,

incompatible types: CombineAttributes cannot be converted to com.google.cloud.dataflow.sdk.transforms.SerializableFunction<java.lang.Iterable<java.lang.String>,java.lang.String> 

合併功能

public static class CombineAttributes extends CombineFn<Iterable<String>, CombineAttributes.Accum, String> { 

public static class Accum { 
    List<String> inputList = new ArrayList<String>(); 
} 
public Accum createAccumulator() { return new Accum(); } 
public Accum addInput(Accum accum, Iterable<String> input) { 
    for (String item : input) { 
    accum.inputList.add(item); 
    } 
    return accum; 
} 
public Accum mergeAccumulators(Iterable<Accum> accums) { 
    Accum merged = createAccumulator(); 
    for (Accum accum : accums) { 
    for (String item : accum.inputList) { 
     merged.inputList.add(item); 
    } 
    } 
    return merged; 
} 
public String extractOutput(Accum accum) { 
    return ""; 
} 
} 

有沒有示例代碼可用於聯合收割機perKey延長CombineFn 。請讓我知道上面的代碼有什麼問題。

回答

5

如果您只是想遍歷所有值,則可以使用GroupByKeyPCollection<KV<K, V>>轉換爲PCollection<KV<K, Iterable<V>>。然後你可以編寫一個處理每個元素的DoFn,並在Iterable<V>中迭代。

請注意,您只會在同一窗口中接收與某個鍵關聯的所有值。如果您使用默認的全局窗口,那將是所有值。當你想所有的值組合成一個較小的輸出


CombineCombineFn是最有用的。例如,如果您想獲得所有值的總和或平均值,則使用Sum.perKey()Mean.perKey()這樣做會更有效。效率來自能夠傳遞(併合並)累加器。在Sum的情況下,這對應於部分和。

舉一個例子,假設管道在兩臺機器上運行。第一臺機器處理KV<user1, attr1a>, KV<user1, attr1b>, KV<user2, attr2a>,第二臺機器處理KV<user1, attr1c>, KV<user2, attr2b>

CombineAttributes(無論採用哪種方式)首先會在每臺機器上調用。所以它可以將[attr1a, attr1b]組合成單個字符串或累加器(比如attr1a+attr1b)。然後它會在另一臺機器上運行,將[attr1c]attr1c結合起來。然後它將合併所有這些部分結果以獲得最終的累加器 - attr1a+attr1b+attr1c。在最初實施的情況下,這將是最終的答案。在後者中,將在該累加器上調用extractOutput

相關問題