2017-08-04 25 views
3

我在同一個主題上看到很多問題。但是,我仍然在寫入GCS時遇到問題。我正在閱讀pubsub的主題,並試圖將其推廣到GCS。我提到了this link。但是,在最新的束包裝中找不到IOChannelUtils。寫一個無限的收集到GCS

PCollection<String> details = pipeline 
      .apply(PubsubIO.readStrings().fromTopic("/topics/<project>/sampleTopic")); 

PCollection<KV<String, String>> keyedStream = details.apply(WithKeys.of(new SerializableFunction<String, String>() { 
     public String apply(String s) { 
      return "constant"; 
     } 
    })); 

    PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream.apply(Window.<KV<String, String>>into(FixedWindows.of(ONE_MIN)).withAllowedLateness(ONE_DAY) 
      .triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(10)) 
        .withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(10), 
          AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_SECONDS)))) 
      .discardingFiredPanes()).apply(GroupByKey.create()); 

    PCollection<Iterable<String>> windows = keyedWindows.apply(Values.create()); 

這個我已經從堆棧溢出中的許多其他類似的話題。現在,我明白了,TextIO支持帶有withWindowedWrites和withNumShards的無限PCollection寫入選項。

裁判:Writing to Google Cloud Storage from PubSub using Cloud Dataflow using DoFn

但是,我不明白,我應該怎麼做。

我正在嘗試寫入GCS,如下所示。

FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
      StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, ""); 

    details.apply(TextIO.write().to("gs://<bucket>/topicfile").withWindowedWrites() 
      .withFilenamePolicy(policy).withNumShards(4)); 

我沒有足夠的觀點來爲堆棧溢出中的這些主題添加註釋,因此我將其作爲一個不同的問題提出來。

回答

2

我可以通過修改窗下方

PCollection<String> streamedDataWindows = streamedData.apply(Window.<String>into(new GlobalWindows()) 
      .triggering(Repeatedly 
        .forever(AfterProcessingTime 
          .pastFirstElementInPane() 
          .plusDelayOf(Duration.standardSeconds(30)) 
         )).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes()); 

streamedDataWindows.apply(TextIO.write().to(CLOUD_STORAGE).withWindowedWrites().withNumShards(1).withFilenamePolicy(new PerWindowFiles())); 


public static class PerWindowFiles extends FileBasedSink.FilenamePolicy { 

public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) { 

// OVERRIDE THE FILE NAME CREATION 
} 

} 

給出解決這個問題。雖然我可以解決這個問題是這樣,我仍然不知道這裏的窗口概念。我會在找到它時添加更多細節。如果有人有更多的理解,請添加更多細節。 謝謝

3

看看這個Pub/Sub to GCS Pipeline它提供了一個寫入窗口文件到GCS的完整例子。

+0

嘿..謝謝你的答案。我可以在幾分鐘之前完成它。我會用我採取的方法更新這個答案。再次感謝! – Balu