2017-03-03 123 views
2

這對我來說是一個有趣的練習,可以藉此熟悉 Kafka 和 Scala。我的目標是創建一個簡單的消息類型,並將其發送到 Kafka 主題(topic)。下面是我嘗試編寫的、使用類型參數 [A] 的通用/可重用序列化器。我該如何告訴 Jackson 的 ObjectMapper 反序列化出泛型類型 A 的實例?

import java.util.{Map => jMap} 
import scala.reflect.runtime.universe._  
import org.apache.kafka.common.serialization.{Deserializer, Serializer} 
import com.fasterxml.jackson.databind.ObjectMapper 
import com.fasterxml.jackson.databind.DeserializationFeature._  
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper 
import com.fasterxml.jackson.module.scala.DefaultScalaModule  

/**
 * JSON codec built on a Jackson `ObjectMapper` that acts as both a Kafka
 * `Serializer` and a Kafka `Deserializer` for values of type `A`.
 *
 * @tparam A the message type. Its `TypeTag` is used to recover the runtime
 *           class that Jackson needs when reading a value back.
 */
class MySerializer[A : TypeTag]() extends Serializer[A] with Deserializer[A] {

    val mapper = new ObjectMapper() with ScalaObjectMapper
    mapper.registerModule(DefaultScalaModule)
    // Ignore JSON fields that have no counterpart on the target type.
    mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false)

    // A type parameter is not a value, so `A.getClass()` cannot compile.
    // The TypeTag carries the static type; its mirror maps that type to the
    // corresponding (erased) runtime class.
    // NOTE(review): because the class is erased, a parameterized A such as
    // List[Foo] loses its element type here — confirm this is acceptable.
    private val targetClass: Class[A] =
        typeTag[A].mirror.runtimeClass(typeOf[A]).asInstanceOf[Class[A]]

    /** Nothing to release. */
    override def close(): Unit = ()

    /** No configuration is consumed. */
    override def configure(configs: jMap[String, _], isKey: Boolean): Unit = ()

    /**
     * Encodes `subject` as JSON bytes.
     *
     * @param topic   topic the record belongs to (unused)
     * @param subject value to encode
     * @return the JSON representation of `subject`
     */
    override def serialize(topic: String, subject: A): Array[Byte] =
        mapper.writeValueAsBytes(subject)

    /**
     * Decodes JSON bytes into an `A`.
     *
     * @param topic topic the record came from (unused)
     * @param bytes JSON payload; may be null (e.g. a tombstone record)
     * @return the decoded value, or null when `bytes` is null
     */
    override def deserialize(topic: String, bytes: Array[Byte]): A =
        // Kafka may hand a null payload to a deserializer; pass it through
        // instead of letting Jackson reject the null source.
        if (bytes == null) null.asInstanceOf[A]
        else mapper.readValue(bytes, targetClass)
}

我在反序列化時遇到的錯誤出在 ObjectMapper.readValue 的第二個參數上。我應該傳入什麼,才能讓它返回泛型類型 A 的實例?

我的 SBT 配置:

name := "scalafunplay"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
    // Use %% so the artifact's Scala suffix follows scalaVersion (2.11).
    // The previous hard-coded "kafka_2.10" pulled in a build for a different
    // Scala binary version than the one this project compiles with.
    "org.apache.kafka" %% "kafka" % "0.10.2.0",
    "com.fasterxml.jackson.core" % "jackson-databind" % "2.8.7",
    "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.8.7"
)

這是我主要的應用程序:

package scalafunplay 

/** Small driver that round-trips one value through `MySerializer`. */
object Mistkafer {

    // Declared at object level rather than inside `main`, so the class has a
    // stable, statically reachable name.
    // NOTE(review): method-local classes are a known trouble spot for
    // TypeTag- and Jackson-based reflection — confirm against your setup.
    final case class Asset(ruid: String)

    /**
     * Serializes a sample `Asset`, deserializes the resulting bytes and
     * prints the value that comes back.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
        val test = Asset("Dan The Man")

        val serializer = new MySerializer[Asset]()

        val sampleSerialized = serializer.serialize("test", test)
        // Feed the serialized bytes back in; passing the original object
        // (as before) does not type-check against Array[Byte].
        val sampleUnserialized = serializer.deserialize("test", sampleSerialized)

        println(s"###### RESULT: $sampleUnserialized")
    }

}

回答

0

我最後決定不使用 Jackson。改用 java.io 的字節數組流和對象輸入/輸出流更為簡單,如下所示:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} 
import java.util.{Map => jMap} 
import scala.reflect.runtime.universe.TypeTag 
import org.apache.kafka.common.serialization.{Deserializer, Serializer} 

/**
 * Kafka `Serializer`/`Deserializer` pair backed by Java object streams
 * (`ObjectOutputStream` / `ObjectInputStream`).
 *
 * SECURITY: native Java deserialization can instantiate arbitrary classes
 * found on the classpath. Only use this with topics whose producers are
 * trusted.
 *
 * @tparam A the message type. The cast on read is unchecked, so a payload of
 *           another type only fails later, where the value is used.
 */
class ObjectSerializer[A : TypeTag]() extends Serializer[A] with Deserializer[A] {

    /** Nothing to release. */
    override def close(): Unit = ()

    /** No configuration is consumed. */
    override def configure(configs: jMap[String, _], isKey: Boolean): Unit = ()

    /**
     * Writes `subject` with Java serialization and returns the bytes.
     *
     * @param topic   topic the record belongs to (unused)
     * @param subject value to encode
     * @return the serialized form of `subject`
     */
    override def serialize(topic: String, subject: A): Array[Byte] = {
        val buffer = new ByteArrayOutputStream()
        val objectOut = new ObjectOutputStream(buffer)
        // close() flushes anything the object stream still buffers, so the
        // byte array is read only after the stream has been closed.
        try objectOut.writeObject(subject)
        finally objectOut.close()
        buffer.toByteArray
    }

    /**
     * Reads one object back from `bytes`.
     *
     * @param topic topic the record came from (unused)
     * @param bytes serialized payload; may be null (e.g. a tombstone record)
     * @return the decoded value, or null when `bytes` is null
     */
    override def deserialize(topic: String, bytes: Array[Byte]): A =
        // A null payload would otherwise raise a NullPointerException when
        // wrapped in a ByteArrayInputStream.
        if (bytes == null) null.asInstanceOf[A]
        else {
            val objectIn = new ObjectInputStream(new ByteArrayInputStream(bytes))
            try objectIn.readObject().asInstanceOf[A]
            finally objectIn.close()
        }
}