2016-11-23 82 views
2

我想實現一個Reshuffle變換,以防止excessive fusion,但我不知道如何改變版本<KV<String,String>>處理簡單PCollections。 (描述here如何洗牌PCollection <KV<String,String>>。)如何重新洗牌PCollection <T>?

我怎麼會加入我的管道更多的步驟之前展開正式的Avro I/O example code重新洗牌?

PipelineOptions options = PipelineOptionsFactory.create(); 
Pipeline p = Pipeline.create(options); 

Schema schema = new Schema.Parser().parse(new File("schema.avsc")); 

PCollection<GenericRecord> records = 
    p.apply(AvroIO.Read.named("ReadFromAvro") 
     .from("gs://my_bucket/path/records-*.avro") 
     .withSchema(schema)); 

回答

3

感謝由谷歌支持團隊提供的代碼片段我想通了:

爲了得到一個重新洗牌PCollection:

PCollection<T> reshuffled = data.apply(Repartition.of()); 

使用的磁盤分割類:

import com.google.cloud.dataflow.sdk.transforms.DoFn; 
import com.google.cloud.dataflow.sdk.transforms.GroupByKey; 
import com.google.cloud.dataflow.sdk.transforms.PTransform; 
import com.google.cloud.dataflow.sdk.transforms.ParDo; 
import com.google.cloud.dataflow.sdk.values.KV; 
import com.google.cloud.dataflow.sdk.values.PCollection; 
import java.util.concurrent.ThreadLocalRandom; 

public class Repartition<T> extends PTransform<PCollection<T>, PCollection<T>> { 

    private Repartition() {} 

    public static <T> Repartition<T> of() { 
     return new Repartition<T>(); 
    } 

    @Override 
    public PCollection<T> apply(PCollection<T> input) { 
     return input 
       .apply(ParDo.named("Add arbitrary keys").of(new AddArbitraryKey<T>())) 
       .apply(GroupByKey.<Integer, T>create()) 
       .apply(ParDo.named("Remove arbitrary keys").of(new RemoveArbitraryKey<T>())); 
    } 

    private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> { 
     @Override 
     public void processElement(ProcessContext c) throws Exception { 
      c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element())); 
     } 
    } 

    private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> { 
     @Override 
     public void processElement(ProcessContext c) throws Exception { 
      for (T s : c.element().getValue()) { 
       c.output(s); 
      } 
     } 
    } 
} 
+0

你能詳細說一下'AddArbitaryKey'嗎?爲什麼「AddArbitraryKey」的必要性和特殊實現是重要的,即它是否會影響密鑰空間在工作人員中分佈的方式? – harveyxia

+0

應該引起再分配一樣了'Redistribution'變換一種武斷的方式(參見:https://github.com/apache/incubator-beam/pull/1036)。隨機選擇的整數鍵應導致隨機分佈。 – Tobi

+0

謝謝,你的'Redistribution'的用例是什麼? – harveyxia