
Reading from PubsubIO, writing to DatastoreIO

Is it possible to create a pipeline that reads data from Pub/Sub and writes it to Datastore? In my code I specify PubsubIO as the input and apply windowing to get a bounded PCollection, but it seems that it is not possible to use options.setStreaming(true) together with DatastoreIO.writeTo, while streaming is required in order to use PubsubIO as input. Is there a way around this, or is it simply not possible to read from Pub/Sub and write to Datastore?

Here is my code:

DataflowPipelineOptions options = PipelineOptionsFactory.create() 
      .as(DataflowPipelineOptions.class); 

    options.setRunner(DataflowPipelineRunner.class); 
    options.setProject(projectName); 
    options.setStagingLocation("gs://my-staging-bucket/staging"); 
    options.setStreaming(true); 

    Pipeline p = Pipeline.create(options); 

    PCollection<String> input = p.apply(PubsubIO.Read.topic("projects/"+projectName+"/topics/event-streaming")); 
    PCollection<String> inputWindow = input.apply(
      Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
        .triggering(AfterPane.elementCountAtLeast(1))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardHours(1)));
    PCollection<String> inputDecode = inputWindow.apply(ParDo.of(new DoFn<String, String>() { 
     private static final long serialVersionUID = 1L; 
     public void processElement(ProcessContext c) { 
      String msg = c.element(); 
      byte[] decoded = Base64.decodeBase64(msg.getBytes()); 
      String outmsg = new String(decoded); 
      c.output(outmsg); 
     } 
    })); 
    PCollection<DatastoreV1.Entity> inputEntity = inputDecode.apply(ParDo.of(new CreateEntityFn("stream", "events"))); 

    inputEntity.apply(DatastoreIO.writeTo(datasetid)); 


    p.run(); 

And this is the exception I get:

Exception in thread "main" java.lang.UnsupportedOperationException: The Write transform is not supported by the Dataflow streaming runner. 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:488) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:480) 
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:314) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267) 
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:312) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267) 
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:159) 
at my.own.project.google.dataflow.EventStreamingDataflow.main(EventStreamingDataflow.java:104) 

Answers


The DatastoreIO sink is not currently supported by the streaming runner. To write to Datastore from a streaming pipeline, you can make direct calls to the Datastore API from your DoFn.


Thanks, that was helpful. But now I'm facing the problem of calling the Datastore API from a Dataflow application (not an AppEngine application), and the Datastore API apparently depends on AppEngine functionality that is only available to applications running on AppEngine. I then found the Remote API, which seems to offer what I need, but I'm still having trouble using it. Do I need to authenticate with a service account? I followed the code example on this page (https://cloud.google.com/appengine/docs/java/tools/remoteapi), but I get an HttpResponseException, 302 – lilline


Are you trying to write to a Datastore instance that belongs to a different project than your Dataflow pipeline? If so, see https://cloud.google.com/dataflow/security-and-permissions#cross-project for how to set that up – danielm


No, the Datastore instance is part of the same project as the Dataflow pipeline, and I solved the 302 issue. But how do I use the Remote API inside a ParDo? I'm guessing here, but ParDo runs the DoFn in a different thread or instance than the parent pipeline, and the Remote API installer is not serializable, and the installer is only available on the thread that created it? I'm not sure whether that's the problem, but in any case I get different exceptions depending on where I try to create and access the installer. – lilline


Ok, after banging my head against the wall I finally got this working. As danielm suggested, I make calls to the Datastore API from a ParDo DoFn. One problem was that I didn't realize there is a separate API for using Cloud Datastore outside of AppEngine (com.google.api.services.datastore... vs. com.google.appengine.api.datastore...). Another problem was that there is apparently some kind of bug in the latest version of the Cloud Datastore API (google-api-services-datastore-protobuf v1beta2-rev1-4.0.0; I got an IllegalAccessError), which I worked around by using an older version (v1beta2-rev1-2.1.2).

So, here is my working code:

import com.google.cloud.dataflow.sdk.Pipeline; 
import com.google.cloud.dataflow.sdk.io.PubsubIO; 
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; 
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; 
import com.google.cloud.dataflow.sdk.transforms.DoFn; 
import com.google.cloud.dataflow.sdk.transforms.ParDo; 
import com.google.cloud.dataflow.sdk.values.PCollection; 
import com.google.api.services.datastore.DatastoreV1.*; 
import com.google.api.services.datastore.client.Datastore; 
import com.google.api.services.datastore.client.DatastoreException; 
import com.google.api.services.datastore.client.DatastoreFactory; 
import static com.google.api.services.datastore.client.DatastoreHelper.*; 
import java.security.GeneralSecurityException; 
import java.io.IOException; 
import org.json.simple.JSONObject; 
import org.json.simple.parser.JSONParser; 
import org.json.simple.parser.ParseException; 

//-------------------- 

public static void main(String[] args) { 
    DataflowPipelineOptions options = PipelineOptionsFactory.create() 
      .as(DataflowPipelineOptions.class); 

    options.setRunner(DataflowPipelineRunner.class); 
    options.setProject(projectName); 
    options.setStagingLocation("gs://my-staging-bucket/staging"); 
    options.setStreaming(true); 

    Pipeline p = Pipeline.create(options); 
    PCollection<String> input = p.apply(PubsubIO.Read.topic("projects/"+projectName+"/topics/my-topic-name")); 

    input.apply(ParDo.of(new DoFn<String, String>() {
      private static final long serialVersionUID = 1L;

      @Override
      public void processElement(ProcessContext c) throws ParseException, DatastoreException {

       // Each Pub/Sub message is a JSON string.
       JSONObject json = (JSONObject) new JSONParser().parse(c.element());

       // Connect to Cloud Datastore with the standalone (non-AppEngine) client.
       Datastore datastore = null;
       try {
        datastore = DatastoreFactory.get().create(getOptionsFromEnv()
          .dataset(datasetid).build());
       } catch (GeneralSecurityException exception) {
        System.err.println("Security error connecting to the datastore: " + exception.getMessage());
       } catch (IOException exception) {
        System.err.println("I/O error connecting to the datastore: " + exception.getMessage());
       }

       // Build the entity to insert.
       Key.Builder keyBuilder = makeKey("my-kind");
       keyBuilder.getPartitionIdBuilder().setNamespace("my-namespace");
       Entity.Builder event = Entity.newBuilder()
         .setKey(keyBuilder);

       event.addProperty(makeProperty("my-prop", makeValue((String) json.get("my-prop"))));

       // Commit the entity with a non-transactional auto-ID insert; skip the
       // write if the client could not be created.
       CommitRequest commitRequest = CommitRequest.newBuilder()
         .setMode(CommitRequest.Mode.NON_TRANSACTIONAL)
         .setMutation(Mutation.newBuilder().addInsertAutoId(event))
         .build();
       if (datastore != null) {
        datastore.commit(commitRequest);
       }

      }
    }));


    p.run(); 
} 

And the dependencies in pom.xml:

<dependency> 
    <groupId>com.google.cloud.dataflow</groupId> 
    <artifactId>google-cloud-dataflow-java-sdk-all</artifactId> 
    <version>[1.0.0,2.0.0)</version> 
</dependency> 
<dependency> 
    <groupId>com.google.apis</groupId> 
    <artifactId>google-api-services-datastore-protobuf</artifactId> 
    <version>v1beta2-rev1-2.1.2</version> 
</dependency> 
<dependency> 
    <groupId>com.google.http-client</groupId> 
    <artifactId>google-http-client</artifactId> 
    <version>1.17.0-rc</version> 
</dependency> 
<!-- Some more.. like JUnit etc.. -->
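
A possible refinement of the code above (a sketch, not part of the original answer): because the Datastore client is not serializable, it can be held in a transient field and created once per bundle in the DoFn's startBundle() hook instead of once per element. The sketch uses the same v1beta2 helpers and the same placeholder names (datasetid, "my-kind", "my-namespace", "my-prop") as the working code above.

    // Sketch only: a named DoFn that creates the Datastore client once per bundle.
    static class WriteEventFn extends DoFn<String, Void> {
      private static final long serialVersionUID = 1L;

      // The client is not serializable, so it is transient and built on the worker.
      private transient Datastore datastore;

      @Override
      public void startBundle(Context c) throws GeneralSecurityException, IOException {
        datastore = DatastoreFactory.get().create(getOptionsFromEnv()
          .dataset(datasetid).build());
      }

      @Override
      public void processElement(ProcessContext c) throws ParseException, DatastoreException {
        JSONObject json = (JSONObject) new JSONParser().parse(c.element());

        // Build and commit the entity, as in the working code above.
        Key.Builder keyBuilder = makeKey("my-kind");
        keyBuilder.getPartitionIdBuilder().setNamespace("my-namespace");
        Entity.Builder event = Entity.newBuilder().setKey(keyBuilder);
        event.addProperty(makeProperty("my-prop", makeValue((String) json.get("my-prop"))));

        CommitRequest commitRequest = CommitRequest.newBuilder()
          .setMode(CommitRequest.Mode.NON_TRANSACTIONAL)
          .setMutation(Mutation.newBuilder().addInsertAutoId(event))
          .build();
        datastore.commit(commitRequest);
      }
    }

    // Used in place of the anonymous DoFn above:
    // input.apply(ParDo.of(new WriteEventFn()));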