2016-08-11 22 views
0

我正在使用Spark Kafka連接器從Kafka集羣中獲取數據。從它,我得到的數據爲JavaDStream<String>。我如何獲得數據爲JavaDStream<EventLog>,其中EventLog是一個Java bean?如何獲取Spark Kafka Connector中對象的JavaDStream?

public static JavaDStream<EventLog> fetchAndValidateData(String zkQuorum, String group, Map<String, Integer> topicMap) { 
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount"); 
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 
    JavaPairReceiverInputDStream<String, String> messages = 
      KafkaUtils.createStream(jssc, zkQuorum, group, topicMap); 
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
     @Override 
     public String call(Tuple2<String, String> tuple2) { 
      return tuple2._2(); 
     } 
    }); 
    jssc.start(); 
    jssc.awaitTermination(); 
    return lines; 
} 

我的目標是將這些數據保存到卡桑德拉,因爲具有相同規格EventLog表。 Spark Cassandra連接器在插入語句中接受JavaRDD<EventLog>,如下所示:javaFunctions(rdd).writerBuilder("ks", "event", mapToRow(EventLog.class)).saveToCassandra();。我想從Kafka得到這些JavaRDD<EventLog>

+0

你的意思是你還希望有消息作爲一對字符串並將其轉換?或者你想使用JavaReceiverInputDStream ?您希望在哪裏引入EventLog類型?你試過定義一個接受EventLog類型並從中構建JavaDStream的接收器嗎? – Sunny

+0

@孫尼我的目標是將數據寫入卡桑德拉。 Spark Cassandra連接器在插入語句中接受'JavaRDD ',如下所示:'javaFunctions(rdd).writerBuilder(「ks」,「event」,mapToRow(EventLog.class))。saveToCassandra();'。我想從Kafka那裏得到這些'JavaRDD '。 – khateeb

+0

你也可以訪問將這些EventLogs寫入kafka的代碼嗎?是否實現了自定義序列化器,並將事件日誌序列化並作爲事件日誌寫入Kafka? – Sunny

回答

0

使用超載的createStream方法,您可以傳遞鍵/值類型和解碼器類。

例子:

createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class, 
     kafkaParams, topicsMap, StorageLevel.MEMORY_AND_DISK_SER_2()); 

上面應該給你JavaPairDStream<String, EventLog>

JavaDStream<EventLog> lines = messages.map(new Function<Tuple2<String, EventLog>, EventLog>() { 
    @Override 
    public EventLog call(Tuple2<String, EventLog> tuple2) { 
    return tuple2._2(); 
    } 
}); 

的EventLogDecoder應實現kafka.serializer.Decoder。下面是json解碼器的例子。

public class EventLogDecoder implements Decoder<EventLog> { 

public EventLogDecoder(VerifiableProperties verifiableProperties) { 
} 

@Override 
public EventLog fromBytes(byte[] bytes) { 
    ObjectMapper objectMapper = new ObjectMapper(); 
    try { 
    return objectMapper.readValue(bytes, EventLog.class); 
    } catch (IOException e) { 
    //do something 
    } 
    return null; 
} 
} 
+0

你能告訴我'StringDecoder'的整個包嗎? EventLogDecoder包含哪些函數? – khateeb

+0

'Function'中的第三個參數應該是'EventLog',就像'Function ,EventLog>()'。 – khateeb

+0

是更新的第三個參數。而StringDecoder的包是kafka.serializer.StringDecoder。我已經更新了包含示例解碼器的答案。 –

相關問題