2016-11-14 15 views
1

我寫了一個輸出Set的DoFn<String, Set<SectionBodyRecord>>。 當我執行我的管道我得到的異常在DoFn中的com.google.cloud.dataflow.sdk.util.IllegalMutationException

Caused by: com.google.cloud.dataflow.sdk.util.IllegalMutationException: DoFn UnmarshalGcsPath mutated value 
    [SectionBodyRecord{txId='3UR93528NX413902J'}, 
    SectionBodyRecord{txId='15N97640P5806660M'}, 
    SectionBodyRecord{txId='7TG473112Y9407154'}, 
    SectionBodyRecord{txId='9A1906561E887050P'}, 
    SectionBodyRecord{txId='4FP63718R4365381L'}] 
    after it was output (new value was 
    [SectionBodyRecord{txId='9A1906561E887050P'}, 
    SectionBodyRecord{txId='7TG473112Y9407154'}, 
    SectionBodyRecord{txId='3UR93528NX413902J'}, 
    SectionBodyRecord{txId='4FP63718R4365381L'}, 
    SectionBodyRecord{txId='15N97640P5806660M'}]). 
    Values must not be mutated in any way after being output. 
     at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.verifyOutputUnmodified(ParDo.java:1344) 
     at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.output(ParDo.java:1306) 
     at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288) 
     at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:450) 
     at my.tests.pipelines.CsvToDatastore$UnmarshalGcsPath.processElement(CsvToDatastore.java:185) 

我不明白問題出在哪裏可能。此外,我已嘗試與

ImmutableSet<SectionBodyRecord> irecs = ImmutableSet.copyOf(records); 
c.output(irecs); 

但問題依然存在。

有什麼建議嗎?

由於提前, 邁克爾

+0

如果沒有看到UnmarshalGcsPath的代碼很難說。你可以發佈嗎? – jkff

+0

正在使用什麼樣的'Set'表示?如何爲'Set'和'SectionBodyRecord'定義相等性?根據錯誤,看起來這些集合是相同的(儘管它們有不同),這仍然會導致集合被視爲相等。如果'SectionBodyRecord'具有相同的'txId'值,則它們被視爲相等。 –

回答

0

我猜你在後臺使用HashSet,而你沒有(重新)實現的hashCode()功能SectionBodyRecord - 至少這是我的問題。

您可以定義hashCode(),如int hashCode() { return txId.hashCode(); },或者使用TreeSet代替並實現Comparable<SectionBodyRecord>接口。

您出現錯誤的原因在於,集合中包含的數據每次創建集合時都會更改位置,因爲沒有確定性的方式來確定訂單(在TreeSet的情況下)或位置(在HashSet)您的SectionBodyRecord元素。