2017-06-22 62 views

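I just found a serialization bug for a class in Spark. How can I unit test that a class is serializable for Spark?

=> Now I want to write a unit test for it, but I don't see how.

Notes:

  • The failure happens in a (de)serialized object that has been broadcast.
  • I want to test what Spark will actually do, to assert that it will work once deployed.
  • The class to serialize is a standard class (not a case class) that extends Serializable.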

Answer

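Looking into the Spark broadcast code, I found a way. It relies on private Spark code, so it may stop working if Spark's internals change. For now, though, it works.

Add a test class in a package whose name starts with org.apache.spark (the internal classes used here are private[spark], so they are only accessible from inside that package), such as: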

package org.apache.spark.my_company_tests 

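// Imports as of Spark 2.x; the org.apache.spark.util classes are internal and may move between versions
import java.io.SequenceInputStream
import java.nio.ByteBuffer

import scala.collection.JavaConverters._

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.{ByteBufferInputStream, Utils}
import org.apache.spark.util.io.ChunkedByteBufferOutputStream
import org.scalatest.{FlatSpec, Matchers}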

/**
 * Tests data that needs to be broadcast in Spark (using Kryo).
 */
class BroadcastSerializationTests extends FlatSpec with Matchers { 

    it should "serialize a transient val, which should be lazy" in {

        val data = new MyClass(42) // data to test
        val conf = new SparkConf()

        // Serialization
        // Code taken from TorrentBroadcast.(un)blockifyObject, which is used by TorrentBroadcastFactory
        val blockSize = 4 * 1024 * 1024 // 4 MiB
        val out = new ChunkedByteBufferOutputStream(blockSize, ByteBuffer.allocate)
        // This test uses KryoSerializer; JavaSerializer can be used the same way
        val ser = new KryoSerializer(conf).newInstance()
        val serOut = ser.serializeStream(out)

        Utils.tryWithSafeFinally { serOut.writeObject(data) } { serOut.close() }

        // Deserialization
        val blocks = out.toChunkedByteBuffer.getChunks()
        val in = new SequenceInputStream(blocks.iterator.map(new ByteBufferInputStream(_)).asJavaEnumeration)
        val serIn = ser.deserializeStream(in)

        val data2 = Utils.tryWithSafeFinally { serIn.readObject[MyClass]() } { serIn.close() }

        // Run the assertions on the deserialized copy
        data2.yo shouldBe data.yo
    }
} 

class MyClass(i: Int) extends Serializable {
    // Add `lazy` to make the test pass: a non-lazy transient val is not recomputed after deserialization
    @transient val yo = 1 to i
}
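
As a complement to the test above, here is a minimal sketch of a round trip that uses only the JDK's Java serialization and no Spark internals. It does not exercise Kryo or the broadcast chunking, so treat it as a first filter rather than a replacement. The names JavaRoundTrip, EagerRange and LazyRange are hypothetical and only illustrate the transient val versus transient lazy val difference.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper: writes a value with plain Java serialization and reads a copy back.
object JavaRoundTrip {
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

// Hypothetical variant of MyClass: the transient val is skipped on write
// and is not recomputed on read, because the constructor does not run again.
class EagerRange(i: Int) extends Serializable {
  @transient val yo = 1 to i
}

// Hypothetical variant of MyClass: `i` is kept as a field and `yo` is
// recomputed lazily on first access after deserialization.
class LazyRange(i: Int) extends Serializable {
  @transient lazy val yo = 1 to i
}
```

In a test, `JavaRoundTrip.roundTrip(new LazyRange(3)).yo` can be compared with `1 to 3`, while the same call on `EagerRange` should come back with `yo` unset (null). Passing this check does not guarantee that the Kryo path used in the Spark-based test behaves the same way.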