HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
String topics = "test4";
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(" ")));
JavaDStream<String> stream1 = KafkaUtils.createDirectStream(jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet)
    .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
        @Override
        public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
            rdd.saveAsTextFile("output");
            return rdd;
        }
    }).map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> kv) {
            return kv._2();
        }
    });
stream1.print();
jssc.start();
jssc.awaitTermination();
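I cross-checked that the topic "test4" contains valid data. There is still no output when using Spark Streaming.
I expect the strings streamed from the Kafka cluster to be printed to the console. No exception is printed, and there is no output either. Is there anything I am missing here?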
Did you call '.start()' and '.awaitTermination()' on the StreamingContext? https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java#L109 – ccheneson
Yes, I had actually missed it. I am calling it now, but there is still no output data. –