我正在使用Spark Kafka連接器從Kafka集羣中獲取數據。從它,我得到的數據爲JavaDStream<String>
。我如何獲得數據爲JavaDStream<EventLog>
,其中EventLog
是一個Java bean?如何獲取Spark Kafka Connector中對象的JavaDStream?
public static JavaDStream<EventLog> fetchAndValidateData(String zkQuorum, String group, Map<String, Integer> topicMap) {
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
jssc.start();
jssc.awaitTermination();
return lines;
}
我的目標是將這些數據保存到卡桑德拉,因爲具有相同規格EventLog
表。 Spark Cassandra連接器在插入語句中接受JavaRDD<EventLog>
,如下所示:javaFunctions(rdd).writerBuilder("ks", "event", mapToRow(EventLog.class)).saveToCassandra();
。我想從Kafka得到這些JavaRDD<EventLog>
。
你的意思是你還希望有消息作爲一對字符串並將其轉換?或者你想使用JavaReceiverInputDStream?您希望在哪裏引入EventLog類型?你試過定義一個接受EventLog類型並從中構建JavaDStream的接收器嗎? –
Sunny
@孫尼我的目標是將數據寫入卡桑德拉。 Spark Cassandra連接器在插入語句中接受'JavaRDD',如下所示:'javaFunctions(rdd).writerBuilder(「ks」,「event」,mapToRow(EventLog.class))。saveToCassandra();'。我想從Kafka那裏得到這些'JavaRDD '。 –
khateeb
你也可以訪問將這些EventLogs寫入kafka的代碼嗎?是否實現了自定義序列化器,並將事件日誌序列化並作爲事件日誌寫入Kafka? – Sunny