2017-07-31 39 views
0

我使用Akka Kafka(Scala)並希望發送自定義對象。Akka Kafka自定義串行器

class TweetsSerializer extends Serializer[Seq[MyCustomType]] { 

override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ??? 

override def serialize(topic: String, data: Seq[MyCustomType]): Array[Byte] = ??? 

override def close(): Unit = ??? 

} 

我該如何正確編寫我自己的序列化程序?而且,我應該如何處理字段config

+0

只是谷歌的例子嗎? –

+0

@BranislavLazic我無法找到任何有用的例子在谷歌:( – Evgeniy

回答

0

我會使用StringSerializer,我的意思是,在生成它們之前,我會將所有類型轉換爲字符串。然而,這工作:

case class MyCustomType(a: Int) 

    class TweetsSerializer extends Serializer[Seq[MyCustomType]] { 

    private var encoding = "UTF8" 

    override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = { 
     val propertyName = if (isKey) "key.serializer.encoding" 
     else "value.serializer.encoding" 
     var encodingValue = configs.get(propertyName) 
     if (encodingValue == null) encodingValue = configs.get("serializer.encoding") 
     if (encodingValue != null && encodingValue.isInstanceOf[String]) encoding = encodingValue.asInstanceOf[String] 
    } 

    override def serialize(topic: String, data: Seq[MyCustomType]): Array[Byte] = 
     try 
      if (data == null) return null 
      else return { 
      data.map { v => 
       v.a.toString 
      } 
      .mkString("").getBytes("UTF8") 
      } 
     catch { 
     case e: UnsupportedEncodingException => 
      throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding) 
     } 

    override def close(): Unit = Unit 

    } 

} 

object testCustomKafkaSerializer extends App { 


    implicit val producerConfig = { 
    val props = new Properties() 
    props.setProperty("bootstrap.servers", "localhost:9092") 
    props.setProperty("key.serializer", classOf[StringSerializer].getName) 
    props.setProperty("value.serializer", classOf[TweetsSerializer].getName) 
    props 
    } 

    lazy val kafkaProducer = new KafkaProducer[String, Seq[MyCustomType]](producerConfig) 

    // Create scala future from Java 
    private def publishToKafka(id: String, data: Seq[MyCustomType]) = { 
     kafkaProducer 
     .send(new ProducerRecord("outTopic", id, data)) 
     .get() 
    } 

    val input = MyCustomType(1) 

    publishToKafka("customSerializerTopic", Seq(input)) 


}