2016-11-16

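Google Dataflow: Request payload size exceeds the limit: 10485760 bytes

When trying to run a large transform on ~800,000 files, I get the error message above when I try to run the pipeline.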

Here is the code:

public static void main(String[] args) {
    Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());
    // getUtil(...) is a helper not shown in the post; it returns a GcsUtil.
    GcsUtil u = getUtil(p.getOptions());

    try {
        // Expand the glob in main() and collect every matching object path.
        List<GcsPath> paths = u.expand(GcsPath.fromUri("gs://tlogdataflow/stage/*.zip"));
        List<String> strPaths = new ArrayList<String>();
        for (GcsPath pa : paths) {
            strPaths.add(pa.toUri().toString());
        }

        p.apply(Create.of(strPaths))
            .apply("Unzip Files", Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox")));
        p.run();
    } catch (IOException io) {
        // The exception is silently ignored here.
    }
}

I thought this is exactly what Google Dataflow is for: processing large numbers of files and large amounts of data?

Is there a way to split the load so that it works?

Thanks & BR

Phil

Answers


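Dataflow is good at processing large amounts of data, but it has limits on how large the pipeline description can be. Data passed to Create.of() is currently embedded in the pipeline description, so you cannot pass large amounts of data there. Instead, large amounts of data should be read from external storage, and the pipeline should only specify where that data lives.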

Think of it as the difference between the amount of data a program can process and the size of the program's code itself.
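A quick back-of-the-envelope check shows why listing the files in main() trips the limit. The sketch below assumes a hypothetical 51-byte object name (the real names are not shown in the post) and counts only the raw UTF-8 bytes of the path strings, so it is a lower bound; the 10485760-byte limit is the one quoted in the error message.

```java
import java.nio.charset.StandardCharsets;

class PayloadEstimate {
    // Limit quoted in the error message (10 MiB).
    static final long LIMIT_BYTES = 10_485_760L;

    // Raw UTF-8 size of n strings as long as samplePath.
    // Ignores coder and request-encoding overhead, so the real size is larger.
    static long rawBytes(String samplePath, long n) {
        return samplePath.getBytes(StandardCharsets.UTF_8).length * n;
    }

    public static void main(String[] args) {
        // Hypothetical object name; the post does not show the real ones.
        String sample = "gs://tlogdataflow/stage/st_01_pos_02_20161101_1.zip";
        long total = rawBytes(sample, 800_000L);
        System.out.printf("%d bytes vs. limit %d (%.1fx)%n",
            total, LIMIT_BYTES, (double) total / LIMIT_BYTES);
    }
}
```

With that assumed name, 800,000 paths already come to 40,800,000 bytes of raw text, nearly four times the limit, before any encoding overhead is added.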

You can work around this by doing the expansion inside a ParDo:

p.apply(Create.of("gs://tlogdataflow/stage/*.zip"))
    .apply(ParDo.of(new ExpandFn()))
    .apply(...fusion break (see below)...)
    .apply(Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox")));

where ExpandFn is something like the following:

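private static class ExpandFn extends DoFn<String, String> {
    @ProcessElement
    public void process(ProcessContext c) throws IOException {
        // Expand the glob on a worker instead of in main().
        GcsUtil util = getUtil(c.getPipelineOptions());
        // expand() returns GcsPath objects, so convert each one back to a string.
        for (GcsPath path : util.expand(GcsPath.fromUri(c.element()))) {
            c.output(path.toUri().toString());
        }
    }
}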

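For the fusion break, I'm referring to this (basically, ParDo(add unique key) + GroupByKey + Values.create() + Flatten.iterables()). This isn't very convenient, and there are discussions about adding a built-in transform for it (see this PR and this thread).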
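To make the four steps concrete, here is an in-memory plain-Java model of them (not Dataflow code). It only shows that giving each element a unique key, grouping, dropping the keys and flattening hands back exactly the input elements; the fusion break itself comes from the runner materializing the GroupByKey, which this model cannot reproduce. Using random UUIDs as the "unique key" is an assumption of this sketch.

```java
import java.util.*;

class FusionBreakModel {
    // Models ParDo(add unique key) -> GroupByKey -> Values.create() -> Flatten.iterables().
    static List<String> reshuffle(List<String> input) {
        // Steps 1 and 2: key every element with a random UUID and group by that key.
        Map<String, List<String>> groups = new HashMap<>();
        for (String element : input) {
            String key = UUID.randomUUID().toString();
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
        // Step 3 (Values.create()): keep only the grouped values.
        // Step 4 (Flatten.iterables()): concatenate the groups into one list.
        List<String> out = new ArrayList<>();
        for (List<String> group : groups.values()) {
            out.addAll(group);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList("a.zip", "b.zip", "c.zip");
        // Prints the same three paths; the order is not guaranteed.
        System.out.println(reshuffle(paths));
    }
}
```

Because every key is unique, each group holds exactly one element, so duplicates survive and nothing is merged; only the order may change.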


Thanks a lot! Using your input, I solved the problem like this:

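import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.Write;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.util.GcsUtil;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// ZipIO (the custom unzip sink) and the getUtil(...) helper are not shown in the post.
public class ZipPipeline {
    private static final Logger LOG = LoggerFactory.getLogger(ZipPipeline.class);

    public static void main(String[] args) {
        Pipeline p = Pipeline.create(
            PipelineOptionsFactory.fromArgs(args).withValidation().create());

        try {
            // Only the glob pattern goes into Create.of(); the expansion runs on workers.
            p.apply(Create.of("gs://tlogdataflow/stage/*.zip"))
                .apply(ParDo.of(new ExpandFN()))
                // AddKeyFN + GroupByKey + FlattenFN act as the fusion break.
                .apply(ParDo.of(new AddKeyFN()))
                .apply(GroupByKey.<String, String>create())
                .apply(ParDo.of(new FlattenFN()))
                .apply("Unzip Files", Write.to(new ZipIO.Sink("gs://tlogdataflow/outbox")));
            p.run();
        } catch (Exception e) {
            // Log the full exception (with stack trace), not just its message.
            LOG.error("Pipeline failed", e);
        }
    }

    // Drops the key and re-emits every path in the group.
    private static class FlattenFN extends DoFn<KV<String, Iterable<String>>, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c) {
            KV<String, Iterable<String>> kv = c.element();
            for (String s : kv.getValue()) {
                c.output(s);
            }
        }
    }

    // Expands the glob pattern into individual object paths.
    private static class ExpandFN extends DoFn<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c) throws Exception {
            GcsUtil u = getUtil(c.getPipelineOptions());
            for (GcsPath path : u.expand(GcsPath.fromUri(c.element()))) {
                c.output(path.toUri().toString());
            }
        }
    }

    // Keys each path by the first 6 characters of its fifth "_"-separated
    // field (presumably a yyyyMM month prefix in the file names).
    private static class AddKeyFN extends DoFn<String, KV<String, String>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c) {
            String path = c.element();
            String monthKey = path.split("_")[4].substring(0, 6);
            c.output(KV.of(monthKey, path));
        }
    }
}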
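One thing worth checking in this version: AddKeyFN keys every path by month, so the GroupByKey produces one group per month rather than one per file. A GroupByKey hands all values for a key to a single processElement call, so FlattenFN re-emits each month's paths from one place, which may limit how widely the downstream unzip work is spread when there are only a few months. The answer's suggestion of a unique key per element avoids that. The plain-Java sketch below compares how many groups each key choice yields; the file names are hypothetical, since the real naming scheme is not shown in the post.

```java
import java.util.*;
import java.util.function.Function;

class KeyChoiceSketch {
    // Same expression as AddKeyFN: fifth "_"-separated field, first 6 characters.
    static String monthKey(String path) {
        return path.split("_")[4].substring(0, 6);
    }

    // Number of distinct keys, i.e. how many groups GroupByKey would produce.
    static int groupCount(List<String> paths, Function<String, String> keyFn) {
        Set<String> keys = new HashSet<>();
        for (String p : paths) {
            keys.add(keyFn.apply(p));
        }
        return keys.size();
    }

    public static void main(String[] args) {
        // Hypothetical object names.
        List<String> paths = Arrays.asList(
            "gs://tlogdataflow/stage/st_01_pos_02_20161101_1.zip",
            "gs://tlogdataflow/stage/st_01_pos_02_20161115_2.zip",
            "gs://tlogdataflow/stage/st_02_pos_07_20161203_1.zip");
        System.out.println(groupCount(paths, KeyChoiceSketch::monthKey));         // 2
        System.out.println(groupCount(paths, p -> UUID.randomUUID().toString())); // 3
    }
}
```

With month keys, two of the three files share the group "201611"; with random keys, every file gets its own group.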