2017-06-01 104 views
0

使用下面的代碼,試圖寫的BigQuery處理與BigQuery空PCollections在Apache中梁

我使用Apache梁2.0.0

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException

當我收到以下錯誤如果我將text.startsWith更改爲D,則一切正常(即輸出內容)。

有什麼方法來捕獲或看空PCollections?

基於StackTrace,看起來錯誤實際上是在BigQueryIO中 - 留在我的存儲桶中的文件有0個字節,可能這會導致BigQueryIO出現問題。

我使用的情況是,我使用側輸出,用於DeadLetters,當我的工作沒有產生死信輸出遇到這個錯誤,所以穩健操作,這將是有益的。

的工作應該真正能夠在批處理或流模式下運行,我最好的猜測是寫任何輸出GCS/TEXTIO在批處理模式和吉貝流時,如果這聽起來似乎在理?

任何幫助感激地收到。

public class EmptyPCollection { 

public static void main(String [] args) { 

    PipelineOptions options = PipelineOptionsFactory.create(); 
    options.setTempLocation("gs://<your-bucket-here>/temp"); 
    Pipeline pipeline = Pipeline.create(options); 
    String schema = "{\"fields\": [{\"name\": \"pet\", \"type\": \"string\", \"mode\": \"required\"}]}"; 
    String table = "<your-dataset>.<your-table>"; 
    List<String> pets = Arrays.asList("Dog", "Cat", "Goldfish"); 
    PCollection<String> inputText = pipeline.apply(Create.of(pets)).setCoder(StringUtf8Coder.of()); 
    PCollection<TableRow> rows = inputText.apply(ParDo.of(new DoFn<String, TableRow>() { 
     @ProcessElement 
     public void processElement(ProcessContext c) { 
      String text = c.element(); 
      if (text.startsWith("X")) { // change to (D)og and works fine 
       TableRow row = new TableRow(); 
       row.set("pet", text); 
       c.output(row); 
      } 
     } 
    })); 

    rows.apply(BigQueryIO.writeTableRows().to(table).withJsonSchema(schema) 
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); 

    pipeline.run().waitUntilFinish(); 

} 

}

[direct-runner-worker] INFO org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://<your-bucket>/temp/BigQueryWriteTemp/05c7a7c0786a4656abad97f11ef23d8e/2675e1c7-f4d7-4f78-a85f-a38095b57e6b. 

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException 
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322) 
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292) 
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200) 
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63) 
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295) 
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281) 
at EmptyPCollection.main(EmptyPCollection.java:54) 
Caused by: java.lang.NullPointerException 
at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.processElement(WriteTables.java:97) 
+0

看起來是一樣的錯誤,我打了一段時間後:https://stackoverflow.com/questions/30204862/how-to-use-flatten-correctly-in-dataflow。可能值得在這裏提高它:https://issuetracker.google.com/issues?q=componentid:187168%20status:open –

+0

當然看起來像是一樣的東西。非常感謝 - 至少這不是我瘋了!將着眼於提出一個問題並讓這個問題得到更多的關注,從長遠來看,這可能會導致工作匆忙。 – Chris

回答