2017-02-20 105 views
0

當我申請ParDo.of(new ParDoFn())PCollection名爲textInput時,程序會拋出此異常。但是當我刪除.apply(ParDo.of(new ParDoFn()))時,程序正常。AssertionError:聲明失敗:copyAndReset必須返回一個零值副本

// SparkRunner

private static void testHadoop(Pipeline pipeline){ 
    Class<? extends FileInputFormat<LongWritable, Text>> inputFormatClass = 
      (Class<? extends FileInputFormat<LongWritable, Text>>) 
        (Class<?>) TextInputFormat.class; 
    @SuppressWarnings("unchecked") //hdfs://localhost:9000 
      HadoopIO.Read.Bound<LongWritable, Text> readPTransfom_1 = HadoopIO.Read.from("hdfs://localhost:9000/tmp/kinglear.txt", 
      inputFormatClass, 
      LongWritable.class, 
      Text.class); 
    PCollection<KV<LongWritable, Text>> textInput = pipeline.apply(readPTransfom_1) 
      .setCoder(KvCoder.of(WritableCoder.of(LongWritable.class), WritableCoder.of(Text.class))); 

    //OutputFormat 
    @SuppressWarnings("unchecked") 
    Class<? extends FileOutputFormat<LongWritable, Text>> outputFormatClass = 
      (Class<? extends FileOutputFormat<LongWritable, Text>>) 
        (Class<?>) TemplatedTextOutputFormat.class; 

    @SuppressWarnings("unchecked") 
    HadoopIO.Write.Bound<LongWritable, Text> writePTransform = HadoopIO.Write.to("hdfs://localhost:9000/tmp/output", outputFormatClass, LongWritable.class, Text.class); 

    textInput.apply(ParDo.of(new ParDoFn())).apply(writePTransform.withoutSharding()); 

    pipeline.run().waitUntilFinish(); 

} 
+0

你能否在你的問題中包含完整的異常堆棧跟蹤?這有助於縮小問題的範圍。另外,您可能想嘗試遵循Apache Beam示例中的樣式 - 您構建的變換僅使用一次;你可能想要內聯他們,你的代碼將更具可讀性。 –

回答

3

哪個版本的Spark你跑在最前面?根據我的經驗,你得到的錯誤是由Spark 2.x AccumulatorV2引發的,Spark runner目前支持Spark 1.6。

+0

你是對的! – zifanpan

+0

我已經解決了Spark 1.6的問題。 – zifanpan

+0

@zifanpan你能解釋一下你是如何解決這個問題的。我有你所建議的依賴版本,即1.6.3,我無法解決這個問題。請建議 – Abhishek

1

我在創建自定義累加器時遇到了類似的問題,該累加器延伸至org.apache.spark.util.AccumulatorV2。原因是override def isZero: Boolean方法中的邏輯不正確。所以基本上當你copyAndReset方法默認調用時,它調用copy()然後reset()你的isZero()應該返回true。 如果你看一下AccumulatorV2來源,是其中一個檢查:

// Called by Java when serializing an object 
final protected def writeReplace(): Any = { 
if (atDriverSide) { 
    if (!isRegistered) { 
    throw new UnsupportedOperationException(
     "Accumulator must be registered before send to executor") 
    } 
    val copyAcc = copyAndReset() 
    assert(copyAcc.isZero, "copyAndReset must return a zero value copy") 
    copyAcc.metadata = metadata 
    copyAcc 
} else { 
    this 
} 
} 

明確這部分

val copyAcc = copyAndReset() 
assert(copyAcc.isZero, "copyAndReset must return a zero value copy") 

希望它能幫助。快樂的火花!