2016-11-28 69 views
1

以下是在嘗試將作業分派給執行程序時導致java.io.NotSerializableException的代碼。在映射JavaRDD時獲取java.io.NotSerializableException

JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD(); 
    JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() { 

     /** 
     * Serial Version Id 
     */ 
     private static final long serialVersionUID = 6766320395808127072L; 

     @Override 
     public String call(Row row) throws Exception { 
      return row.mkString(dataFormat.getDelimiter()); 
     } 
    }); 

然而,當我這樣做,任務成功連載:

JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD(); 
List<String> dataList = rddToWrite.collect().stream().parallel() 
          .map(row -> row.mkString(dataFormat.getDelimiter())) 
          .collect(Collectors.<String>toList()); 
JavaSparkContext javaSparkContext = new JavaSparkContext(sessionContext.getSparkContext()); 
JavaRDD<String> stringRDD = javaSparkContext.parallelize(dataList); 

任何人都可以請幫我指出我在做什麼錯在這裏?

編輯: dataFormat是包含此代碼的函數寫入的類中的私有成員字段。它是DataFormat類的一個對象,它定義了兩個字段,即spark數據格式(例如「com.databricks.spark.csv」)和分隔符(例如「\ t」)。

+0

什麼'dataFormat'? –

+0

'dataFormat'是一個局部變量還是包含類的字段? –

回答

3

通過new Function ...創建的匿名類需要包封實例的引用,和序列化的功能需要串行化外圍實例,包括dataFormat所有其他字段。如果該類未標記爲Serializable,或者具有任何不可序列化的非transient字段,則該字段不起作用。即使它確實如此,它卻默默無聞地表現得更糟。

不幸的是,要完全解決這個,你需要創建一個名爲靜態內部類(或只是一個單獨的類),它甚至可以不是本地的(因爲無論匿名還是local classes in Java可以是靜態的):

static class MyFunction extends Function<Row, String> { 
    private String delimiter; 
    private static final long serialVersionUID = 6766320395808127072L; 

    MyFunction(String delimiter) { 
     this.delimiter = delimiter; 
    } 

    @Override 
    public String call(Row row) throws Exception { 
     return row.mkString(delimiter); 
    } 
} 

然後

JavaRDD<String> stringRdd = rddToWrite.map(new MyFunction(dataFormat.getDelimiter())); 
+0

謝謝!這解決了它:) – ologn13

3

當您訪問dataFormat,這意味着this.dataFormat。 所以火花會嘗試序列化this並遇到NotSerializableException

設法讓像一個本地副本:

DataFormat dataformat = this.dataformat; 
JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD(); 
JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() ... 

欲瞭解更多信息,請參閱 http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark

+1

至少在一個快速測試中,即使是一個匿名類,它也不會訪問封裝實例的任何方法或字段,但仍然會引用它,所以會嘗試序列化它。也許我做錯了,雖然... –

+0

Alexey是對的!這仍然是序列化封閉的實例。造成同樣的問題。 – ologn13

+0

是的,他是對的。我對java中的匿名函數有一些誤解。 –