我很難理解TextIO.write()的.withFileNamePolicy的概念。提供FileNamePolicy的要求似乎非常複雜,只需指定一個GCS存儲桶來寫入流式文件即可。GCP Dataflow 2.0 PubSub到GCS
在高層次上,我將JSON消息傳輸到PubSub主題,並且我想將這些原始消息寫入GCS中的文件以便永久存儲(我還將對消息進行其他處理) 。我最初開始這條管道,以爲這將是非常簡單的:
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
.apply("Write to GCS", TextIO.write().to(gcs_bucket);
p.run();
}
我得到了錯誤約需要WindowedWrites,我申請,然後需要一個FileNamePolicy。這是事情變得毛茸茸的地方。
我去了梁文檔和檢查出FilenamePolicy。看起來我需要擴展這個類,然後需要擴展其他抽象類來完成這個工作。不幸的是,關於Apache的文檔有點缺乏,我找不到任何Dataflow 2.0的例子,除了The Wordcount Example,它甚至用於在助手類中實現這些細節。
所以我可能只是通過複製大量的WordCount示例來完成這項工作,但我試圖更好地理解這些細節。我有幾個問題:
1)是否有任何路線圖項目來抽象很多這種複雜性?看來我應該能夠像在nonWindowedWrite中一樣提供GCS存儲桶,然後只提供一些基本選項,如時間和文件命名規則。我知道將流式窗口化數據寫入文件比打開文件指針(或對象存儲等效物)更復雜。
2)它看起來像做這個工作,我需要創建一個WindowedContext對象,它需要提供一個BoundedWindow抽象類,和PaneInfo對象類,然後一些分片信息。可用於這些信息是相當裸露的,我很難知道所有這些實際需要什麼,特別是考慮到我的簡單用例。有沒有可以實現這些的好例子?另外,它也看起來像我需要設置碎片#作爲TextIO.write的一部分,但是還會提供#shards作爲fileNamePolicy的一部分?
感謝任何幫助我理解背後的細節,希望能學到一些東西!
編輯7/20/17 所以我終於得到這個管道擴展FilenamePolicy運行。我的挑戰是需要從PubSub中定義流數據的窗口。下面是代碼的八九不離十表示:
public class ReadData {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply("Write to GCS", TextIO.write().to("gcs_bucket")
.withWindowedWrites()
.withFilenamePolicy(new TestPolicy())
.withNumShards(10));
p.run();
}
}
class TestPolicy extends FileBasedSink.FilenamePolicy {
@Override
public ResourceId windowedFilename(
ResourceId outputDirectory, WindowedContext context, String extension) {
IntervalWindow window = (IntervalWindow) context.getWindow();
String filename = String.format(
"%s-%s-%s-%s-of-%s.json",
"test",
window.start().toString(),
window.end().toString(),
context.getShardNumber(),
context.getShardNumber()
);
return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
}
@Override
public ResourceId unwindowedFilename(
ResourceId outputDirectory, Context context, String extension) {
throw new UnsupportedOperationException("Unsupported.");
}
}
嘿魯文,感謝信息。我想我以前嘗試過這種方式,但沒有運氣,當我看着它說它僅用於非窗口的文檔。我也在使用Google Cloud Dataflow,並且一直在使用2.0版本。聽起來像是即將到來的? https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/io/DefaultFilenamePolicy.html –
這將在Beam 2.1中發佈。一旦發佈,Google Cloud Dataflow 2.1將發佈包含此功能。您可以通過直接從最新的Beam快照構建來嘗試它。 –
另一個注意事項:編寫一個自定義的FilenamePolicy並不難。您只需重寫windowedFilename方法,並將輸入參數映射到文件名。不過請注意,這個API在即將發佈的版本中會稍微改變。 –