接收我是很新的星火流和我實現小型運動如從卡夫卡發送XML數據,需要接收流數據通過火花流。我嘗試了所有可能的方式..但每次我得到空值。獲取空值,而從卡夫卡星火流
有一個在卡夫卡側沒有問題的,唯一的問題是從火花側接收流數據。
這裏是我如何實現代碼:
package com.package;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
public class SparkStringConsumer {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("kafka-sandbox")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
Set<String> topics = Collections.singleton("mytopic");
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
directKafkaStream.foreachRDD(rdd -> {
System.out.println("--- New RDD with " + rdd.partitions().size()
+ " partitions and " + rdd.count() + " records");
rdd.foreach(record -> System.out.println(record._2));
});
ssc.start();
ssc.awaitTermination();
}
}
,我使用以下版本:
**動物園管理員3.4.6
斯卡拉2.11
星火2.0
卡夫卡0.8.2 **
您可以通過使用012ab獲取值@abaghel,感謝您的快速響應。我試着按照你的說法,仍然得到空的消息...這裏是消息: 收到消息[] ---新的RDD與1分區和0記錄 – user6325753
你試過kafka控制檯消費者?你能看到那裏的消息嗎? – abaghel
是在卡夫卡控制檯消費者嘗試....我可以看到那裏的消息... – user6325753