2017-04-02

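Apache Beam PubSub reader exception

I am running a pipeline with a PubSub source and am hitting some strange exceptions that crash it. I can process a few elements (3-10), and then one of the following two errors suddenly appears. Neither gives me a clue about what I might be doing wrong, so I removed all the transforms and left only the source, and the problem persists. I am only publishing a few test strings to PubSub. Any help is appreciated.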

Exception 1:

[WARNING] 
java.lang.reflect.InvocationTargetException 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) 
     at java.lang.Thread.run(Thread.java:724) 
Caused by: java.lang.NullPointerException 
     at org.apache.beam.sdk.io.PubsubUnboundedSource$PubsubReader.ackBatch(PubsubUnboundedSource.java:640) 
     at org.apache.beam.sdk.io.PubsubUnboundedSource$PubsubCheckpoint.finalizeCheckpoint(PubsubUnboundedSource.java:313) 
     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:174) 
     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:127) 
     at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139) 
     at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 

Exception 2:

[WARNING] 
java.lang.reflect.InvocationTargetException 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) 
     at java.lang.Thread.run(Thread.java:724) 
Caused by: java.lang.IllegalStateException: Cannot finalize a restored checkpoint 
     at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:444) 
     at org.apache.beam.sdk.io.PubsubUnboundedSource$PubsubCheckpoint.finalizeCheckpoint(PubsubUnboundedSource.java:293) 
     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.finishRead(UnboundedReadEvaluatorFactory.java:205) 
     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:142) 
     at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139) 
     at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 

Basic code:

// Create default options and switch the pipeline to streaming mode.
PipelineOptions options = PipelineOptionsFactory.create(); 
PubsubOptions dataflowOptions = options.as(PubsubOptions.class); 
dataflowOptions.setStreaming(true); 

Pipeline p = Pipeline.create(options); 

// Read UTF-8 strings from the subscription; no other transforms are applied.
p.apply(PubsubIO.<String>read().subscription("my-subscription") 
    .withCoder(StringUtf8Coder.of())); 

Execution:

mvn compile exec:java -Dexec.mainClass=my.package.SalesTransactions -Dexec.args="--runner BlockingDataflowRunner --project=my-project --tempLocation=gs://my-project/tmp" 

Answer


This problem is caused by a bug in the DirectRunner (BEAM-1656) combined with a precondition in PubsubCheckpoint.

The answer to "Apache Beam: PubsubReader fails with NPE" contains more information about the bug and how to work around it. Thanks!
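
As a rough illustration of the kind of precondition mentioned above, here is a toy model in plain Java. It is not Beam's code: `ToyReader`, `ToyCheckpoint`, `restore` and `tryFinalize` are hypothetical names, and the sketch assumes that a checkpoint which has been serialized and restored no longer carries a reference to the reader that created it. Only the message text is taken from the second stack trace in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (NOT Beam code) of a checkpoint that needs its live reader to be finalized. */
public class CheckpointSketch {

    /** Hypothetical stand-in for a reader that acknowledges message ids. */
    static class ToyReader {
        final List<String> acked = new ArrayList<>();

        void ackBatch(List<String> ackIds) {
            acked.addAll(ackIds);
        }
    }

    /** Hypothetical stand-in for a checkpoint holding ids that still need an ack. */
    static class ToyCheckpoint {
        private final ToyReader reader; // assumed to be lost (null) after a restore
        private final List<String> ackIds;

        ToyCheckpoint(ToyReader reader, List<String> ackIds) {
            this.reader = reader;
            this.ackIds = ackIds;
        }

        /** Simulates an encode/decode round trip: the ids survive, the reader does not. */
        ToyCheckpoint restore() {
            return new ToyCheckpoint(null, new ArrayList<>(ackIds));
        }

        /** Acknowledges the ids, but only if this checkpoint still has its reader. */
        void finalizeCheckpoint() {
            if (reader == null) {
                throw new IllegalStateException("Cannot finalize a restored checkpoint");
            }
            reader.ackBatch(ackIds);
        }
    }

    /** Returns "ok" if finalization succeeds, otherwise the failure message. */
    static String tryFinalize(ToyCheckpoint checkpoint) {
        try {
            checkpoint.finalizeCheckpoint();
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        ToyReader reader = new ToyReader();
        ToyCheckpoint live = new ToyCheckpoint(reader, Arrays.asList("a", "b"));

        System.out.println(tryFinalize(live));           // prints "ok"
        System.out.println(tryFinalize(live.restore())); // prints "Cannot finalize a restored checkpoint"
    }
}
```

In this model, a runner that round-trips a checkpoint before finalizing it would trip the precondition even though the pipeline code itself is fine, which would be consistent with the error persisting after all transforms were removed.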


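Thanks for your answer. I have updated to the latest snapshot, and at first it looked promising because the error no longer happens right away, but after a while the NullPointerException above still occurs. – jimmy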


https://github.com/apache/beam/pull/2368 fixes a bug in PubSub, and it will be committed soon. I apologize for the inconvenience. –