2016-12-19 61 views
0

我需要從Kafka主題中讀取加密的消息。我當前的代碼從主題讀取字符串看起來是這樣的:從Kafka讀取二進制數據Spark作業中的主題

JavaInputDStream<ConsumerRecord<String, String>> istream1 = KafkaUtils.createDirectStream(
        jssc, 
        LocationStrategies.PreferConsistent(), 
        ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topics), kafkaParams) 
       ); 

我應該做的這段代碼有什麼變化來讀取卡夫卡隊列字節數組來確保,加密數據不會被破壞的過程中類型轉換。 雖然我從星火編程指南上面的代碼,即時通訊無法找到這個API在KafkaUtils: http://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html

回答

0

你可以看到卡夫卡連接here的一個很好的例子。

但是你想要的是從你的卡夫卡主題採取ByteArray

這個連接你需要創建一個JavaPairInputDStream這樣的:

import org.apache.kafka.common.serialization.ByteArrayDeserializer 
JavaPairInputDStream<String, Array[Byte]> messages = KafkaUtils.createDirectStream(
    jssc, 
    String.class, 
    String.class, 
    StringDecoder.class, 
    ByteArrayDeserializer.class, 
    kafkaParams, 
    topicsSet 
); 
+0

感謝您的答覆。我會試試這個。但是,你能告訴我爲什麼我們需要完全使用不同的API,爲什麼我們需要創建一個JavaPairInputStream.Just,所以我明白我在做什麼。 –

+0

我無法用上面的代碼片段編譯代碼.Eclipse無法識別您列出的createDirectStream版本。它只能看到我用過的版本。我正在使用kafka010和Spark 2.0.1。我已經從Maven倉庫下載了spark kafka streaming jar。 –

+0

我想也許這可能是不同的。但你需要設置字節反序列化 –

相關問題