Reading from PubsubIO, writing to DatastoreIO

Is it possible to create a pipeline that reads from Pub/Sub and writes to Datastore? In my code I specify PubsubIO as the input and apply windowing to get a bounded PCollection, but it seems I cannot use options.setStreaming(true) together with DatastoreIO.writeTo, even though streaming is required in order to use PubsubIO as input. Is there a way around this, or is it simply not possible to read from Pub/Sub and write to Datastore?
Here is my code:
import org.apache.commons.codec.binary.Base64;
import org.joda.time.Duration;
import com.google.api.services.datastore.DatastoreV1;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.*;
import com.google.cloud.dataflow.sdk.options.*;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.transforms.windowing.*;
import com.google.cloud.dataflow.sdk.values.PCollection;

DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectName);
options.setStagingLocation("gs://my-staging-bucket/staging");
options.setStreaming(true);
Pipeline p = Pipeline.create(options);

// Read raw messages from the Pub/Sub topic (an unbounded source).
PCollection<String> input = p.apply(
    PubsubIO.Read.topic("projects/" + projectName + "/topics/event-streaming"));
// Window into 5-second fixed windows, firing on every element.
PCollection<String> inputWindow = input.apply(
    Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
        .triggering(AfterPane.elementCountAtLeast(1))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardHours(1)));
// Base64-decode each message payload.
PCollection<String> inputDecode = inputWindow.apply(ParDo.of(new DoFn<String, String>() {
  private static final long serialVersionUID = 1L;
  @Override
  public void processElement(ProcessContext c) {
    byte[] decoded = Base64.decodeBase64(c.element().getBytes());
    c.output(new String(decoded));
  }
}));
// Turn each decoded message into a Datastore entity and write it out.
PCollection<DatastoreV1.Entity> inputEntity =
    inputDecode.apply(ParDo.of(new CreateEntityFn("stream", "events")));
inputEntity.apply(DatastoreIO.writeTo(datasetid));
p.run();
And here is the exception I get:
Exception in thread "main" java.lang.UnsupportedOperationException: The Write transform is not supported by the Dataflow streaming runner.
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:488)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:480)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:314)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:312)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:159)
at my.own.project.google.dataflow.EventStreamingDataflow.main(EventStreamingDataflow.java:104)
Thanks, that was helpful. But now I am facing the problem of calling the Datastore API from a Dataflow application (not an App Engine app), and the Datastore API apparently depends on App Engine features that are only available to applications running on App Engine. I then found that the Remote API seems to offer what I need, but I am still having trouble using it. Do I need to authenticate with a service account? I followed the code sample on this page (https://cloud.google.com/appengine/docs/java/tools/remoteapi), but I get an HttpResponseException, 302 – lilline
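(For anyone hitting the same 302: with the Remote API, a 302 is usually a redirect to a login page, i.e. an unauthenticated request. Below is a minimal sketch of the setup from that docs page, assuming a standalone Java client with the appengine-remote-api jar on the classpath and an SDK version recent enough to have useApplicationDefaultCredential(); "your-project-id.appspot.com" is a placeholder host.)

import java.io.IOException;
import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.tools.remoteapi.RemoteApiInstaller;
import com.google.appengine.tools.remoteapi.RemoteApiOptions;

public class RemoteApiSmokeTest {
  public static void main(String[] args) throws IOException {
    RemoteApiOptions options = new RemoteApiOptions()
        .server("your-project-id.appspot.com", 443)  // placeholder host
        .useApplicationDefaultCredential();          // service-account auth via ADC
    RemoteApiInstaller installer = new RemoteApiInstaller();
    installer.install(options);
    try {
      // With the Remote API installed, the regular App Engine Datastore
      // API can be called from outside App Engine.
      DatastoreService ds = DatastoreServiceFactory.getDatastoreService();
      ds.put(new Entity("events"));  // "events" is an illustrative kind
    } finally {
      installer.uninstall();
    }
  }
}

With GOOGLE_APPLICATION_CREDENTIALS pointing at a service-account key, application default credentials avoid any interactive login.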
Are you trying to write to a Datastore instance that belongs to a different project than your Dataflow pipeline? If so, see https://cloud.google.com/dataflow/security-and-permissions#cross-project for how to set that up – danielm
No, the Datastore instance is part of the same project as the Dataflow pipeline, and I have resolved the 302 problem. But how do I use the Remote API inside a ParDo? My guess is that ParDo runs the DoFn on a different thread or instance than the parent pipeline, that the Remote API installer is not serializable, and that the installer is only usable on the thread that created it? I am not sure that is the actual problem, but in any case I get different exceptions depending on where I try to create and access the installer. – lilline
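(Not from the thread, just an untested sketch of the pattern that guess suggests: since RemoteApiInstaller is not serializable and is bound to the thread that installed it, keep it out of the DoFn's serialized state by marking it transient, and install/uninstall it per bundle on the worker. The DoFn can then write through the App Engine Datastore API instead of DatastoreIO.writeTo. DatastoreWriteFn, the server host, and the "events" kind are illustrative names.)

import java.io.IOException;
import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.tools.remoteapi.RemoteApiInstaller;
import com.google.appengine.tools.remoteapi.RemoteApiOptions;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

// Hypothetical replacement for CreateEntityFn + DatastoreIO.writeTo.
class DatastoreWriteFn extends DoFn<String, Void> {
  private static final long serialVersionUID = 1L;

  // transient: never serialized with the DoFn; created in startBundle on
  // the worker thread, so it is usable from processElement.
  private transient RemoteApiInstaller installer;

  @Override
  public void startBundle(Context c) throws IOException {
    installer = new RemoteApiInstaller();
    installer.install(new RemoteApiOptions()
        .server("your-project-id.appspot.com", 443)  // placeholder host
        .useApplicationDefaultCredential());
  }

  @Override
  public void processElement(ProcessContext c) {
    // The Remote API is installed on this worker thread, so App Engine
    // Datastore calls work here.
    DatastoreService ds = DatastoreServiceFactory.getDatastoreService();
    Entity entity = new Entity("events");  // illustrative kind
    entity.setProperty("payload", c.element());
    ds.put(entity);
  }

  @Override
  public void finishBundle(Context c) {
    installer.uninstall();
  }
}

The pipeline above would then end with inputDecode.apply(ParDo.of(new DatastoreWriteFn())); instead of the DatastoreIO.writeTo step. Installing per bundle rather than per element keeps the install/uninstall overhead off the hot path.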