使用下面的代碼,試圖寫的BigQuery處理與BigQuery空PCollections在Apache中梁
我使用Apache梁2.0.0
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
當我收到以下錯誤如果我將text.startsWith
更改爲D
,則一切正常(即輸出內容)。
有什麼方法來捕獲或看空PCollections?
基於StackTrace,看起來錯誤實際上是在BigQueryIO中 - 留在我的存儲桶中的文件有0個字節,可能這會導致BigQueryIO出現問題。
我使用的情況是,我使用側輸出,用於DeadLetters,當我的工作沒有產生死信輸出遇到這個錯誤,所以穩健操作,這將是有益的。
的工作應該真正能夠在批處理或流模式下運行,我最好的猜測是寫任何輸出GCS/TEXTIO在批處理模式和吉貝流時,如果這聽起來似乎在理?
任何幫助感激地收到。
public class EmptyPCollection {
public static void main(String [] args) {
PipelineOptions options = PipelineOptionsFactory.create();
options.setTempLocation("gs://<your-bucket-here>/temp");
Pipeline pipeline = Pipeline.create(options);
String schema = "{\"fields\": [{\"name\": \"pet\", \"type\": \"string\", \"mode\": \"required\"}]}";
String table = "<your-dataset>.<your-table>";
List<String> pets = Arrays.asList("Dog", "Cat", "Goldfish");
PCollection<String> inputText = pipeline.apply(Create.of(pets)).setCoder(StringUtf8Coder.of());
PCollection<TableRow> rows = inputText.apply(ParDo.of(new DoFn<String, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
String text = c.element();
if (text.startsWith("X")) { // change to (D)og and works fine
TableRow row = new TableRow();
row.set("pet", text);
c.output(row);
}
}
}));
rows.apply(BigQueryIO.writeTableRows().to(table).withJsonSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
pipeline.run().waitUntilFinish();
}
}
[direct-runner-worker] INFO org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://<your-bucket>/temp/BigQueryWriteTemp/05c7a7c0786a4656abad97f11ef23d8e/2675e1c7-f4d7-4f78-a85f-a38095b57e6b.
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
at EmptyPCollection.main(EmptyPCollection.java:54)
Caused by: java.lang.NullPointerException
at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.processElement(WriteTables.java:97)
看起來是一樣的錯誤,我打了一段時間後:https://stackoverflow.com/questions/30204862/how-to-use-flatten-correctly-in-dataflow。可能值得在這裏提高它:https://issuetracker.google.com/issues?q=componentid:187168%20status:open –
當然看起來像是一樣的東西。非常感謝 - 至少這不是我瘋了!將着眼於提出一個問題並讓這個問題得到更多的關注,從長遠來看,這可能會導致工作匆忙。 – Chris