2017-05-31 40 views
0

我試圖將PCollection<String>聚合成PCollection<List<String>>,每個大約60個元素。Google Dataflow「elementCountExact」聚合

它們將被髮送到一個接受每個請求60個元素的API。 目前我正在嘗試通過開窗,但只有elementCountAtLeast,所以我必須將它們收集到一個列表中,並再次計數並拆分,以防它太長。這是相當繁瑣,導致很多名單隻有幾個要素:

Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast(maxNrOfelementsPerList), 
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))) 
      .withAllowedLateness(Duration.ZERO) 
      .discardingFiredPanes()) 
      .apply("CollectIntoLists", Combine.globally(new StringToListCombinator()).withoutDefaults()) 
      .apply("SplitListsToMaxSize", ParDo.of(new DoFn<List<String>, List<String>>() { 
       @ProcessElement 
       public void apply(ProcessContext pc) { 
        splitList(pc.element(), maxNrOfelementsPerList).forEach(pc::output); 
       } 
      })); 

有直接和更一致的方式做到這一點的聚集?

回答

1

這可以使用Dataflow 2.x中的State API來構建。

基本上,你會編寫一個有狀態的DoFn,它有兩個狀態 - 一個元素數量和一個緩衝元素的「袋子」數量。

當一個元素到達時,您將它添加到包中並遞增計數。然後你檢查計數,如果它是60你輸出它,並清除這兩個狀態。由於Stateful DoFn的每個密鑰都將在單臺機器上運行,所以隨機將這些元素分佈到N個密鑰中可能會很好,因此您可以擴展到N臺機器(多個密鑰可以在一臺機器上運行) 。

+0

非常感謝,我會盡快嘗試,並標記您的答案。 – Chrisport