2016-11-25 26 views
0

我有很多類的對象說測試我想寫給Kafka並使用Spark Stream App來處理它們。我想使用Kryo序列化。如何在Spark Streaming應用程序中從Kafka接收Java對象

我的應用程序是用Java

JavaDStream<Test> testData = KafkaUtils 
       .createDirectStream(context , keyClass,valueClass ,keyDecoderClass ,valueDecoderClass , props,topics); 

我的問題是我應該把爲keyClass,valueClass,keyDecoderClass,valueDecoderClass?

回答

1

如果您的主題是「String」並且其值爲「Test」,那麼您首先需要通過實施kafka.serializer.Encoderkafka.serializer.Decoder來創建TestEncoder和TestDecoder類。現在,在您的createDirectStream方法,你可以有

JavaPairInputDStream<String, Test> testData = KafkaUtils 
      .createDirectStream(context, String.class,Test.class ,StringDecoder.class,TestDecoder.class,props,topics); 

您可以在https://www.tomsdev.com/blog/2015/storm-kafka-complex-types/

參考KafkaKryoEncoder在你的卡夫卡生產者,你需要註冊您的自定義編碼器類像

Properties properties = new Properties(); 
properties.put("metadata.broker.list", brokerList); 
properties.put("serializer.class", "com.my.TestEncoder"); 
Producer<String, Test> producer = new Producer<String, Test>(new ProducerConfig(properties)); 
Test test = new Test(); 
KeyedMessage<String, Test> data = new KeyedMessage<String, Test>("myTopic", test); 
producer.send(data); 
+0

感謝這有助於。 – Bankelaal

相關問題