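GCP Dataflow 2.0 PubSub to GCS

I'm having a hard time understanding the concept behind .withFilenamePolicy on TextIO.write(). The requirements for supplying a FilenamePolicy seem incredibly complex for something as simple as specifying a GCS bucket to write streamed files to.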

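At a high level, I'm streaming JSON messages to a PubSub topic, and I want to write those raw messages to files in GCS for permanent storage (I'll also do other processing on the messages). I initially started this pipeline thinking it would be quite simple: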

public static void main(String[] args) { 

     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create(); 

     Pipeline p = Pipeline.create(options); 

     // topic and gcs_bucket are placeholders for your Pub/Sub topic and output location.
     p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic)) 
      .apply("Write to GCS", TextIO.write().to(gcs_bucket)); 

     p.run(); 

    } 

I got an error about needing WindowedWrites; once I applied that, it then required a FilenamePolicy. This is where things get hairy.

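I went to the Beam docs and checked out FilenamePolicy. It looks like I need to extend this class, and then extend other abstract classes to make it work. Unfortunately the Apache documentation is a bit lacking, and I couldn't find any Dataflow 2.0 examples other than the WordCount example, and even that one hides these details in a helper class.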

So I could probably make this work by copying a lot of the WordCount example, but I'm trying to understand the details better. I have a few questions:

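1) Is there anything on the roadmap to abstract away a lot of this complexity? It seems like I should be able to supply a GCS bucket, just as with a non-windowed write, and then provide a few basic options such as timing and file-naming rules. I know that writing streaming windowed data to files is more complicated than just opening a file pointer (or the object-store equivalent).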

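2) It looks like, to make this work, I need to create a WindowedContext object, which requires a BoundedWindow (an abstract class), a PaneInfo object, and then some shard information. The documentation for these is pretty bare, and I'm having a hard time knowing what is actually required, especially given my simple use case. Are there any good examples that implement these? Also, it looks like I need to set the number of shards as part of TextIO.write, but also supply the number of shards as part of the FilenamePolicy?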

Thanks for any help understanding the details behind this; I'm hoping to learn something!

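Edit 7/20/17: I finally got this pipeline running by extending FilenamePolicy. My challenge was that I needed to define a window for the streaming data coming from PubSub. Below is a close approximation of the code: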

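import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class ReadData { 
    public static void main(String[] args) { 

     // Placeholder: replace with your fully qualified Pub/Sub topic.
     String topic = "projects/YOUR_PROJECT/topics/YOUR_TOPIC";

     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create(); 

     Pipeline p = Pipeline.create(options); 

     p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic)) 
      // Unbounded Pub/Sub data must be windowed before a windowed write.
      .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))) 
      // Placeholder output location; replace with your own bucket/prefix.
      .apply("Write to GCS", TextIO.write().to("gs://YOUR_BUCKET/output/") 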
       .withWindowedWrites() 
       .withFilenamePolicy(new TestPolicy()) 
       .withNumShards(10)); 

     p.run(); 

    } 
} 

class TestPolicy extends FileBasedSink.FilenamePolicy { 
    @Override 
    public ResourceId windowedFilename(
     ResourceId outputDirectory, WindowedContext context, String extension) { 
     IntervalWindow window = (IntervalWindow) context.getWindow(); 
     String filename = String.format(
      "%s-%s-%s-%s-of-%s.json", 
      "test", 
      window.start().toString(), 
      window.end().toString(), 
      context.getShardNumber(), 
      context.getNumShards() 
     ); 
     return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE); 
    } 

    @Override 
    public ResourceId unwindowedFilename(
     ResourceId outputDirectory, Context context, String extension) { 
     throw new UnsupportedOperationException("Unsupported."); 
    } 
} 
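To make the mapping concrete without a Beam dependency, here is a minimal sketch of the same naming rule as TestPolicy. It assumes java.time.Instant in place of Beam's IntervalWindow bounds, so the timestamp text differs slightly from Joda's (which always prints milliseconds); the class and method names are hypothetical. It also bears on the shard question above: the shard count is configured once with withNumShards, and the policy only formats the shard index and count it is handed for each file.

```java
import java.time.Instant;

/**
 * Hypothetical, dependency-free stand-in for TestPolicy's naming rule.
 * java.time.Instant replaces Beam's IntervalWindow bounds, so the timestamp
 * text differs slightly from Joda's output.
 */
final class TestPolicyNaming {

    private TestPolicyNaming() {}

    /** Builds "test-<start>-<end>-<shard>-of-<numShards>.json" for one window and shard. */
    static String windowedFilename(Instant windowStart, Instant windowEnd,
                                   int shardNumber, int numShards) {
        if (numShards <= 0 || shardNumber < 0 || shardNumber >= numShards) {
            throw new IllegalArgumentException(
                    "Expected 0 <= shardNumber < numShards, got " + shardNumber + " of " + numShards);
        }
        return String.format("%s-%s-%s-%s-of-%s.json",
                "test", windowStart, windowEnd, shardNumber, numShards);
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2017-07-20T10:00:00Z");
        Instant end = start.plusSeconds(60);
        // With withNumShards(10) each window is written as up to ten shards, numbered 0..9;
        // the policy only formats the index and count it receives.
        for (int shard = 0; shard < 3; shard++) {
            System.out.println(windowedFilename(start, end, shard, 10));
        }
    }
}
```

The range check is an added assumption; it simply makes a mismatched index/count fail loudly instead of producing a misleading name.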

Answers

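Beam's DefaultFilenamePolicy now supports windowed writes, so there is no need to write a custom FilenamePolicy. You can control the output filename by putting W and P placeholders (for the window and the pane, respectively) in the filename template. This is in Beam HEAD now, and it will also ship in the upcoming Beam 2.1 release (which we are in the process of cutting).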

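Hey Reuven, thanks for the info. I think I tried this approach before without luck, and when I looked at the documentation it said it was only for non-windowed writes. I'm also using Google Cloud Dataflow and have been on version 2.0. Sounds like this is upcoming? https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/io/DefaultFilenamePolicy.html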

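This will ship in Beam 2.1. Once that is out, Google Cloud Dataflow 2.1 will be released with this feature included. You can try it now by building directly from the latest Beam snapshot.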

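One more note: writing a custom FilenamePolicy isn't hard. You only need to override the windowedFilename method and map its input arguments to a filename. Be aware, though, that this API will change slightly in the upcoming release.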
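The W/P placeholder mechanism described in the first answer can be approximated in a few lines. This is a hypothetical, dependency-free sketch, not Beam's DefaultFilenamePolicy: runs of S and N become the zero-padded shard index and shard count, while W and P are replaced by caller-supplied window and pane strings. How Beam actually renders the window and pane text is not reproduced here, and the template in the demo is only an example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical approximation of shard-template expansion (not Beam's code):
 * S+ -> zero-padded shard index, N+ -> zero-padded shard count,
 * W -> window text, P -> pane text; everything else is copied literally.
 */
final class ShardTemplate {

    private static final Pattern PLACEHOLDER = Pattern.compile("S+|N+|W|P");

    private ShardTemplate() {}

    static String constructName(String prefix, String template, String suffix,
                                int shardNum, int numShards, String window, String pane) {
        Matcher m = PLACEHOLDER.matcher(template);
        // StringBuffer keeps this compatible with Java 8's Matcher.appendReplacement.
        StringBuffer sb = new StringBuffer(prefix);
        while (m.find()) {
            String token = m.group();
            String replacement;
            switch (token.charAt(0)) {
                case 'S': replacement = zeroPad(shardNum, token.length()); break;
                case 'N': replacement = zeroPad(numShards, token.length()); break;
                case 'W': replacement = window; break;
                default:  replacement = pane; break; // 'P'
            }
            // quoteReplacement stops '$' or '\' in window/pane text being read as group references.
            m.appendReplacement(sb, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(sb);
        return sb.append(suffix).toString();
    }

    /** Pads value with leading zeros to the length of the placeholder run. */
    private static String zeroPad(int value, int width) {
        return String.format("%0" + width + "d", value);
    }

    public static void main(String[] args) {
        // The window and pane strings here are made-up examples.
        System.out.println(constructName("output", "-W-P-SSSSS-of-NNNNN", ".txt",
                3, 10, "2017-07-20T10:00-10:01", "pane-0"));
    }
}
```

Note that an empty template yields prefix plus suffix with no shard number at all, so every shard of a window would map to the same name; a template with S and N runs avoids that.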

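For Beam 2.0, below is an example of writing raw messages from PubSub to windowed files on GCS. The pipeline is fairly configurable: you can specify the window duration via a parameter, plus a sub-directory policy if you want logical subsections of your data for easy reprocessing/archiving. Note that this has an additional dependency on Apache Commons Lang 3.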

PubSubToGcs

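import com.google.common.base.Preconditions;
import java.util.Locale;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.MutablePeriod;
import org.joda.time.format.PeriodFormatterBuilder;
import org.joda.time.format.PeriodParser;

/** 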
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and 
* outputs the raw data into windowed files at the specified output 
* directory. 
*/ 
public class PubsubToGcs { 

    /** 
    * Options supported by the pipeline. 
    * 
    * <p>Inherits standard configuration options.</p> 
    */ 
    public static interface Options extends DataflowPipelineOptions, StreamingOptions { 
    @Description("The Cloud Pub/Sub topic to read from.") 
    @Required 
    ValueProvider<String> getTopic(); 
    void setTopic(ValueProvider<String> value); 

    @Description("The directory to output files to. Must end with a slash.") 
    @Required 
    ValueProvider<String> getOutputDirectory(); 
    void setOutputDirectory(ValueProvider<String> value); 

    @Description("The filename prefix of the files to write to.") 
    @Default.String("output") 
    @Required 
    ValueProvider<String> getOutputFilenamePrefix(); 
    void setOutputFilenamePrefix(ValueProvider<String> value); 

    @Description("The shard template of the output file. Specified as repeating sequences " 
     + "of the letters 'S' or 'N' (example: SSS-NNN). These are replaced with the " 
     + "shard number, or number of shards respectively") 
    // Non-empty default so that each shard of a window gets a distinct filename.
    @Default.String("-SSS-of-NNN") 
    ValueProvider<String> getShardTemplate(); 
    void setShardTemplate(ValueProvider<String> value); 

    @Description("The suffix of the files to write.") 
    @Default.String("") 
    ValueProvider<String> getOutputFilenameSuffix(); 
    void setOutputFilenameSuffix(ValueProvider<String> value); 

    @Description("The sub-directory policy which files will use when output per window.") 
    @Default.Enum("NONE") 
    WindowedFilenamePolicy.SubDirectoryPolicy getSubDirectoryPolicy(); 
    void setSubDirectoryPolicy(WindowedFilenamePolicy.SubDirectoryPolicy value); 

    @Description("The window duration in which data will be written. Defaults to 5m. " 
     + "Allowed formats are: " 
     + "Ns (for seconds, example: 5s), " 
     + "Nm (for minutes, example: 12m), " 
     + "Nh (for hours, example: 2h).") 
    @Default.String("5m") 
    String getWindowDuration(); 
    void setWindowDuration(String value); 

    @Description("The maximum number of output shards produced when writing.") 
    @Default.Integer(10) 
    Integer getNumShards(); 
    void setNumShards(Integer value); 
    } 

    /** 
    * Main entry point for executing the pipeline. 
    * @param args The command-line arguments to the pipeline. 
    */ 
    public static void main(String[] args) { 

    Options options = PipelineOptionsFactory 
     .fromArgs(args) 
     .withValidation() 
     .as(Options.class); 

    run(options); 
    } 

    /** 
    * Runs the pipeline with the supplied options. 
    * 
    * @param options The execution parameters to the pipeline. 
    * @return The result of the pipeline execution. 
    */ 
    public static PipelineResult run(Options options) { 
    // Create the pipeline 
    Pipeline pipeline = Pipeline.create(options); 

    /** 
    * Steps: 
    * 1) Read string messages from PubSub 
    * 2) Window the messages into fixed windows of the duration given by the windowDuration option. 
    * 3) Output the windowed files to GCS 
    */ 
    pipeline 
     .apply("Read PubSub Events", 
     PubsubIO 
      .readStrings() 
      .fromTopic(options.getTopic())) 
     .apply(options.getWindowDuration() + " Window", 
      Window 
      .into(FixedWindows.of(parseDuration(options.getWindowDuration())))) 
     .apply("Write File(s)", 
      TextIO 
      .write() 
      .withWindowedWrites() 
      .withNumShards(options.getNumShards()) 
      .to(options.getOutputDirectory()) 
      .withFilenamePolicy(
       new WindowedFilenamePolicy(
        options.getOutputFilenamePrefix(), 
        options.getShardTemplate(), 
        options.getOutputFilenameSuffix()) 
       .withSubDirectoryPolicy(options.getSubDirectoryPolicy()))); 

    // Execute the pipeline and return the result. 
    PipelineResult result = pipeline.run(); 

    return result; 
    } 

    /** 
    * Parses a duration from a period formatted string. Values 
    * are accepted in the following formats: 
    * <p> 
    * Ns - Seconds. Example: 5s<br> 
    * Nm - Minutes. Example: 13m<br> 
    * Nh - Hours. Example: 2h 
    * 
    * <pre> 
    * parseDuration(null) = NullPointerException() 
    * parseDuration("") = Duration.standardSeconds(0) 
    * parseDuration("2s") = Duration.standardSeconds(2) 
    * parseDuration("5m") = Duration.standardMinutes(5) 
    * parseDuration("3h") = Duration.standardHours(3) 
    * </pre> 
    * 
    * @param value The period value to parse. 
    * @return The {@link Duration} parsed from the supplied period string. 
    */ 
    private static Duration parseDuration(String value) { 
    Preconditions.checkNotNull(value, "The specified duration must be a non-null value!"); 

    PeriodParser parser = new PeriodFormatterBuilder() 
     .appendSeconds().appendSuffix("s") 
     .appendMinutes().appendSuffix("m") 
     .appendHours().appendSuffix("h") 
     .toParser(); 

    MutablePeriod period = new MutablePeriod(); 
    parser.parseInto(period, value, 0, Locale.getDefault()); 

    Duration duration = period.toDurationFrom(new DateTime(0)); 
    return duration; 
    } 
} 
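If you would rather avoid Joda's period parser, a java.time version of the same Ns/Nm/Nh contract is short. This is a sketch under the assumption that only an empty string or a single number-plus-unit value is accepted (anything else is rejected rather than guessed at); the class name is hypothetical. Beam 2.x APIs take Joda types, so in a pipeline you would still convert the result, e.g. with org.joda.time.Duration.millis(d.toMillis()).

```java
import java.time.Duration;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical java.time alternative to the Joda-based parseDuration helper.
 * Accepts "" (zero) or a single Ns / Nm / Nh value, e.g. "5s", "12m", "2h".
 */
final class WindowDurations {

    private static final Pattern FORMAT = Pattern.compile("(\\d+)([smh])");

    private WindowDurations() {}

    static Duration parseDuration(String value) {
        Objects.requireNonNull(value, "The specified duration must be a non-null value!");
        if (value.isEmpty()) {
            return Duration.ZERO;
        }
        Matcher m = FORMAT.matcher(value);
        if (!m.matches()) {
            throw new IllegalArgumentException("Expected Ns, Nm or Nh, got: " + value);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "s": return Duration.ofSeconds(amount);
            case "m": return Duration.ofMinutes(amount);
            default:  return Duration.ofHours(amount); // "h"
        }
    }

    public static void main(String[] args) {
        System.out.println(parseDuration("5m")); // PT5M
    }
}
```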


WindowedFilenamePolicy

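import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

/** 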
* The {@link WindowedFilenamePolicy} class will output files 
* to the specified location with a format of output-yyyyMMdd'T'HHmmssZ-001-of-100.txt. 
*/ 
@SuppressWarnings("serial") 
public class WindowedFilenamePolicy extends FilenamePolicy { 

    /** 
    * Possible sub-directory creation modes. 
    */ 
    public static enum SubDirectoryPolicy { 
     NONE("."), 
     PER_HOUR("yyyy-MM-dd/HH"), 
     PER_DAY("yyyy-MM-dd"); 

     private final String subDirectoryPattern; 

     private SubDirectoryPolicy(String subDirectoryPattern) { 
      this.subDirectoryPattern = subDirectoryPattern; 
     } 

     public String getSubDirectoryPattern() { 
      return subDirectoryPattern; 
     } 

     public String format(Instant instant) { 
      DateTimeFormatter formatter = DateTimeFormat.forPattern(subDirectoryPattern); 
      return formatter.print(instant); 
     } 
    } 

    /** 
    * The formatter used to format the window timestamp for outputting to the filename. 
    */ 
    private static final DateTimeFormatter formatter = ISODateTimeFormat 
      .basicDateTimeNoMillis() 
      .withZone(DateTimeZone.getDefault()); 

    /** 
    * The filename prefix. 
    */ 
    private final ValueProvider<String> prefix; 

    /** 
    * The filename suffix. 
    */ 
    private final ValueProvider<String> suffix; 

    /** 
    * The shard template used during file formatting. 
    */ 
    private final ValueProvider<String> shardTemplate; 

    /** 
    * The policy which dictates when or if sub-directories are created 
    * for the windowed file output. 
    */ 
    private ValueProvider<SubDirectoryPolicy> subDirectoryPolicy = StaticValueProvider.of(SubDirectoryPolicy.NONE); 

    /** 
    * Constructs a new {@link WindowedFilenamePolicy} with the 
    * supplied prefix used for output files. 
    * 
    * @param prefix The prefix to append to all files output by the policy. 
    * @param shardTemplate The template used to create uniquely named sharded files. 
    * @param suffix The suffix to append to all files output by the policy. 
    */ 
    public WindowedFilenamePolicy(String prefix, String shardTemplate, String suffix) { 
     this(StaticValueProvider.of(prefix), 
       StaticValueProvider.of(shardTemplate), 
       StaticValueProvider.of(suffix)); 
    } 

    /** 
    * Constructs a new {@link WindowedFilenamePolicy} with the 
    * supplied prefix used for output files. 
    * 
    * @param prefix The prefix to append to all files output by the policy. 
    * @param shardTemplate The template used to create uniquely named sharded files. 
    * @param suffix The suffix to append to all files output by the policy. 
    */ 
    public WindowedFilenamePolicy(
      ValueProvider<String> prefix, 
      ValueProvider<String> shardTemplate, 
      ValueProvider<String> suffix) { 
     this.prefix = prefix; 
     this.shardTemplate = shardTemplate; 
     this.suffix = suffix; 
    } 

    /** 
    * The subdirectory policy will create sub-directories on the 
    * filesystem based on the window which has fired. 
    * 
    * @param policy The subdirectory policy to apply. 
    * @return The filename policy instance. 
    */ 
    public WindowedFilenamePolicy withSubDirectoryPolicy(SubDirectoryPolicy policy) { 
     return withSubDirectoryPolicy(StaticValueProvider.of(policy)); 
    } 

    /** 
    * The subdirectory policy will create sub-directories on the 
    * filesystem based on the window which has fired. 
    * 
    * @param policy The subdirectory policy to apply. 
    * @return The filename policy instance. 
    */ 
    public WindowedFilenamePolicy withSubDirectoryPolicy(ValueProvider<SubDirectoryPolicy> policy) { 
     this.subDirectoryPolicy = policy; 
     return this; 
    } 

    /** 
    * The windowed filename method will construct filenames per window in the 
    * format of output-yyyyMMdd'T'HHmmssZ-001-of-100.txt. 
    */ 
    @Override 
    public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext c, String extension) { 
     Instant windowInstant = c.getWindow().maxTimestamp(); 
     String datetimeStr = formatter.print(windowInstant.toDateTime()); 

     // Remove the prefix when it is null so we don't append the literal 'null' 
     // to the start of the filename 
     String filenamePrefix = prefix.get() == null ? datetimeStr : prefix.get() + "-" + datetimeStr; 
     String filename = DefaultFilenamePolicy.constructName(
       filenamePrefix, 
       shardTemplate.get(), 
       StringUtils.defaultIfBlank(suffix.get(), extension), // Ignore the extension in favor of the suffix. 
       c.getShardNumber(), 
       c.getNumShards()); 

     String subDirectory = subDirectoryPolicy.get().format(windowInstant); 
     return outputDirectory 
       .resolve(subDirectory, StandardResolveOptions.RESOLVE_DIRECTORY) 
       .resolve(filename, StandardResolveOptions.RESOLVE_FILE); 
    } 

    /** 
    * Unwindowed writes are unsupported by this filename policy so an {@link UnsupportedOperationException} 
    * will be thrown if invoked. 
    */ 
    @Override 
    public ResourceId unwindowedFilename(ResourceId outputDirectory, Context c, String extension) { 
    throw new UnsupportedOperationException("There is no windowed filename policy for unwindowed file" 
     + " output. Please use the WindowedFilenamePolicy with windowed writes or switch filename policies."); 
    } 
} 
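To see what paths the policy above produces, here is a hypothetical java.time sketch of the PER_HOUR sub-directory plus the timestamped filename. Assumptions: UTC instead of the JVM default zone (for reproducible output), a shard template equivalent to -SSS-of-NNN, and a placeholder output directory. The pattern yyyyMMdd'T'HHmmssXX is chosen to mirror Joda's basicDateTimeNoMillis output (Z for UTC), but treat the exact rendering as an approximation.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/**
 * Hypothetical sketch of the paths WindowedFilenamePolicy produces with
 * SubDirectoryPolicy.PER_HOUR. Uses UTC (the original uses the JVM default
 * zone) and assumes a "-SSS-of-NNN" shard template.
 */
final class WindowedPaths {

    // Approximates basicDateTimeNoMillis: "XX" prints "Z" for a zero offset.
    private static final DateTimeFormatter FILE_TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmssXX").withZone(ZoneOffset.UTC);
    // Same pattern string as SubDirectoryPolicy.PER_HOUR.
    private static final DateTimeFormatter PER_HOUR =
            DateTimeFormatter.ofPattern("yyyy-MM-dd/HH").withZone(ZoneOffset.UTC);

    private WindowedPaths() {}

    /** outputDirectory must end with "/", as the pipeline option requires. */
    static String path(String outputDirectory, String prefix, Instant windowMaxTimestamp,
                       int shard, int numShards, String suffix) {
        String subDirectory = PER_HOUR.format(windowMaxTimestamp);
        String filename = String.format("%s-%s-%03d-of-%03d%s",
                prefix, FILE_TIMESTAMP.format(windowMaxTimestamp), shard, numShards, suffix);
        return outputDirectory + subDirectory + "/" + filename;
    }

    public static void main(String[] args) {
        // maxTimestamp() of a 10:00-10:05 window is 10:04:59.999.
        Instant maxTs = Instant.parse("2017-07-20T10:04:59.999Z");
        System.out.println(path("gs://YOUR_BUCKET/out/", "output", maxTs, 3, 10, ".txt"));
    }
}
```

Because the policy stamps files with the window's maxTimestamp (one millisecond before the window end), a window ending exactly on the hour lands in the previous hour's sub-directory.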

The code changed somewhat in Beam 2.2 after a [bug](https://issues.apache.org/jira/browse/BEAM-3169) concerning temporary files was filed. This [code](https://github.com/apache/beam/blob/761ec1af410f0ee153893f6e7082db85d0fdc3e7/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java) is now a good example to follow.
