2015-11-20 36 views
2

我正在從流中解析XML,並將POJO調度到ProcessContext.output。 它正在拋出ClosedChannelException。 任何想法發生了什麼?這DataFlow ProcessContext輸出異常ClosedChannelException

com.google.cloud.dataflow.sdk.util.UserCodeException: java.nio.channels.ClosedChannelException 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:193) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:193) 
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:52) 
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:157) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:329) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:483) 
    at com.myproj.dataflow.MyDocDispatcher.onMyDoc(MyDocDispatcher.java:24) 
+0

請使用格式化代碼格式化異常 – KPrince36

+0

由於數據流的下一步出錯,這是否是異常?我看到元素正在傳遞到下一個數據流轉換步驟,並且正在成功處理。 –

+0

這個例外意味着你關閉了頻道,然後繼續使用它。 – EJP

回答

1

一個可能的原因是您DoFn,做的XML處理,以產生一個POJO居然懶洋洋地產生POJO。當您將該POJO傳遞到ProcessContext#output()時,可能會在優化程序的基礎上,將其直接傳遞到管道後面的其他DoFn

在這種情況下,如果與POJO相互作用下游DoFn具有被接收在POJO一些副作用,它違反了immutability requirements,由於與POJO交互接收到來自ProcessContext#element()修改它。

如果這是問題,最簡單的修復方法是在將其傳遞到output()之前克隆POJO。