閱讀習慣序列文件我已經在Hadoop中的自定義寫類,這是保存爲sequencefile如下在PySpark
public class ABC implements Writable{
private byte[] myId;
private byte[] myType;
//Constructor and other methods
@Override
public void write(DataOutput out) throws IOException {
myId.write(out);
myType.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
myId.readFields(in);
myType.readFields(in);
}
}
而且我想用PySpark閱讀sequencefile和獲取數據。我曾嘗試以下三種方式:
- 直接讀取:
sc.sequenceFile( 「文件:///Test.seq」,keyClass = 「ABC」,valueClass = 「ABC」)
但得到
object not serializable (class: ABC, value: [email protected])
- 書寫轉換器:
- 使用BytesWritable:
官方教程http://spark.apache.org/docs/latest/programming-guide.html#external-datasets,它說
如果有自定義序列化的二進制數據(如 卡桑德拉/ HBase的加載數據)之後,那麼你首先需要轉變對 斯卡拉數據/爪哇方面的東西,可以由Pyrolite的 pickler處理。爲此提供了轉換器特性。只需擴展此 特徵並在convert方法中實現您的轉換代碼即可。
因此我實現一個轉換器,如下所示:
import test.ABC
import java.io.DataInput
import org.apache.spark.api.python.Converter
/**
* Implementation of [[org.apache.spark.api.python.Converter]] that converts data
* to ABC
*/
class DataToABCConverter extends Converter[Any, ABC] {
override def convert(obj: Any): ABC = {
if (obj == null) {
return null
}
val in = obj.asInstanceOf[DataInput]
val abc = new ABC()
abc.readFields(in)
abc
}
}
而在PySpark我使用下面的代碼
sc.sequenceFile("file:///Test.seq", keyClass = "ABC", valueClass ="ABC", keyConverter="DataToABCConverter", valueConverter="DataToABCConverter")
但得到以下錯誤
java.lang.ClassCastException: ABC cannot be cast to java.io.DataInput
似乎像轉換器的輸入是我的ABC類不是ja va.io.DataInput,這樣我就不能應用readFields方法來獲取數據。
我添加geID()
方法來獲得byets和更改轉換器,如下所示:
class DataToChunkConverter extends Converter[Any, BytesWritable] {
override def convert(obj: Any): BytesWritable = {
if (obj == null) {
return null
}
val abc = obj.asInstanceOf[ABC]
val idd = abc.getID()
new BytesWritable(idd)
}
}
比我運行pyspark使用
pyspark --master=local[8] --conf "spark.kryo.classesToRegister=org.apache.hadoop.io.BytesWritable" --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
但得到以下錯誤
Failed to pickle Java object as value: BytesWritable, falling back
to 'toString'. Error: couldn't pickle object of type class org.apache.hadoop.io.BytesWritable
所以我的問題是什麼是正確的方式來讀取PySpark中的自定義序列文件?什麼類型可以通過PySpark序列化?任何建議表示讚賞!
您已經標記了pyspark(這意味着Python的),Java和斯卡拉。所有這些標籤一次都沒有意義。 –