2017-03-09 81 views
0

閱讀習慣序列文件我已經在Hadoop中的自定義寫類,這是保存爲sequencefile如下在PySpark

public class ABC implements Writable{ 
    private byte[] myId; 
    private byte[] myType; 

    //Constructor and other methods 
    @Override 
    public void write(DataOutput out) throws IOException { 
     myId.write(out); 
     myType.write(out); 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     myId.readFields(in); 
     myType.readFields(in); 
    } 
} 

而且我想用PySpark閱讀sequencefile和獲取數據。我曾嘗試以下三種方式:

  1. 直接讀取:

sc.sequenceFile( 「文件:///Test.seq」,keyClass = 「ABC」,valueClass = 「ABC」)

但得到

object not serializable (class: ABC, value: [email protected]) 
  • 書寫轉換器:
  • 官方教程http://spark.apache.org/docs/latest/programming-guide.html#external-datasets,它說

    如果有自定義序列化的二進制數據(如 卡桑德拉/ HBase的加載數據)之後,那麼你首先需要轉變對 斯卡拉數據/爪哇方面的東西,可以由Pyrolite的 pickler處理。爲此提供了轉換器特性。只需擴展此 特徵並在convert方法中實現您的轉換代碼即可。

    因此我實現一個轉換器,如下所示:

    import test.ABC 
    import java.io.DataInput 
    import org.apache.spark.api.python.Converter 
    
    /** 
    * Implementation of [[org.apache.spark.api.python.Converter]] that converts data 
    * to ABC 
    */ 
    class DataToABCConverter extends Converter[Any, ABC] { 
        override def convert(obj: Any): ABC = { 
        if (obj == null) { 
         return null 
        } 
        val in = obj.asInstanceOf[DataInput] 
        val abc = new ABC() 
        abc.readFields(in) 
        abc 
        } 
    } 
    

    而在PySpark我使用下面的代碼

    sc.sequenceFile("file:///Test.seq", keyClass = "ABC", valueClass ="ABC", keyConverter="DataToABCConverter", valueConverter="DataToABCConverter") 
    

    但得到以下錯誤

    java.lang.ClassCastException: ABC cannot be cast to java.io.DataInput 
    

    似乎像轉換器的輸入是我的ABC類不是ja va.io.DataInput,這樣我就不能應用readFields方法來獲取數據。

  • 使用BytesWritable:
  • 我添加geID()方法來獲得byets和更改轉換器,如下所示:

    class DataToChunkConverter extends Converter[Any, BytesWritable] { 
        override def convert(obj: Any): BytesWritable = { 
        if (obj == null) { 
         return null 
        } 
        val abc = obj.asInstanceOf[ABC] 
        val idd = abc.getID() 
        new BytesWritable(idd) 
        } 
    } 
    

    比我運行pyspark使用

    pyspark --master=local[8] --conf "spark.kryo.classesToRegister=org.apache.hadoop.io.BytesWritable" --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" 
    

    但得到以下錯誤

    Failed to pickle Java object as value: BytesWritable, falling back 
    to 'toString'. Error: couldn't pickle object of type class org.apache.hadoop.io.BytesWritable 
    

    所以我的問題是什麼是正確的方式來讀取PySpark中的自定義序列文件?什麼類型可以通過PySpark序列化?任何建議表示讚賞!

    +0

    您已經標記了pyspark(這意味着Python的),Java和斯卡拉。所有這些標籤一次都沒有意義。 –

    回答

    0

    經過一些實驗(遵循第三種方法)後,事實證明,如果scala或Java中的本機類型被用作轉換器的返回類型,它就可以工作。

    例如,使用Array[Byte]作爲返回類型,該Pyspark可以順利拿到數據:

    class DataToChunkConverter extends Converter[Any, Array[Byte]] { 
        override def convert(obj: Any): Array[Byte] = { 
        if (obj == null) { 
         return null 
        } 
        val abc = obj.asInstanceOf[ABC] 
        val idd = abc.getID() 
        idd 
        } 
    }