1
當我使用spark處理數據時,java.io.NotSerializableException會給我帶來很多麻煩。使用Spark Streaming將rdd保存到Hbase時的java.io.NotSerializableException
val hbase_conf = HBaseConfiguration.create()
hbase_conf.set("hbase.zookeeper.property.clientPort", "2181")
hbase_conf.set("hbase.zookeeper.quorum", "hadoop-zk0.s.qima-inc.com,hadoop-zk1.s.qima-inc.com,hadoop-zk2.s.qima-inc.com")
val newAPIJobConfiguration = Job.getInstance(hbase_conf);
newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table");
newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp")
mydata.foreachRDD(rdd => {
val json_rdd = rdd.map(Json.parse _).map(_.validate[Scan])
.map(Scan.transformScanRestult _)
.filter(_.nonEmpty)
.map(_.get)
.map(Scan.convertForHbase _)
json_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration)
})
但是它沒有java.io.NotSerializableException的原因和所遵循的是錯誤信息
17/10/16 18:56:50 ERROR Utils: Exception encountered
java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
所以我改變我的代碼如下
object mytest_config{
val hbase_conf = HBaseConfiguration.create()
hbase_conf.set("hbase.zookeeper.property.clientPort", "2181")
hbase_conf.set("hbase.zookeeper.quorum", "zk1,zk2")
val newAPIJobConfiguration = Job.getInstance(hbase_conf);
newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table");
newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp")
}
mydata.foreachRDD(rdd => {
val json_rdd = rdd.map(Json.parse _)
.map(_.validate[Scan])
.map(Scan.transformScanRestult _)
.filter(_.nonEmpty)
.map(_.get)
.map(Scan.convertForHbase _)
json_rdd.saveAsNewAPIHadoopDataset(mytest_config.newAPIJobConfiguration.getConfiguration)
})
而且這可能是工作! 有人有任何想法爲什麼這項工作,以及正式推薦的方式是什麼?
也許我沒有描述我的問題很清楚。我想知道爲什麼把所有的配置放在一個scala對象中,例如在這種情況下mytest-config可以工作。對象是否在驅動程序中啓動? –
'mytest_config'是'serializable',而另一個不是 – mrsrinivas
我看到你的意思是,對象可以將無序列化的對象從驅動程序轉換爲工作。萬分感謝。還有一個問題是在什麼時候應該使用懶惰的val。你能舉個例子嗎? –