0
我目前正試圖基於Apache梁SDK V2.1.0像Google tutorial阿帕奇梁模板:運行時錯誤上下文
創建數據流模板這是我的主類
public static void main(String[] args) {
// Initialize options
DispatcherOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DispatcherOptions.class);
// Create pipeline
Pipeline pipeline = Pipeline.create(options);
// Get messages
PCollection<PubsubMessage> messages = pipeline.apply("ReadMain", PubsubIO.readMessages().fromSubscription(options.getSubscription()));
}
如果我執行
mvn compile exec:java \
-Dexec.mainClass=com.example.myclass \
-Dexec.args="--runner=DataflowRunner \
--project=[YOUR_PROJECT_ID] \
--stagingLocation=gs://[YOUR_BUCKET_NAME]/staging \
--templateLocation=gs://[YOUR_BUCKET_NAME]/templates/MyTemplate"
命令它的工作,如果我使用的方法
PubsubIO.readMessages().fromTopic(options.getTopic()));
但如果
PubsubIO.readMessages().fromSubscription(options.getSubscription()));
錯誤
[WARNING]
java.lang.RuntimeException: Not called from a runtime context.
at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:223)
at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.getSubscription(PubsubUnboundedSource.java:1374)
at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubSource.<init>(PubsubUnboundedSource.java:1103)
at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.expand(PubsubUnboundedSource.java:1407)
at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.expand(PubsubUnboundedSource.java:110)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
at org.apache.beam.sdk.io.gcp.pubsub.PubsubIO$Read.expand(PubsubIO.java:730)
at org.apache.beam.sdk.io.gcp.pubsub.PubsubIO$Read.expand(PubsubIO.java:536)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:473)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:180)
at com.example.myclass.main(MyClass.java:43)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:748)
該補丁是可用的,它的效果很好。我們可以使用fromSubscription模板 –