2
我有一個與Spark JavaStreamingContext
一起使用的程序。我已經瞭解到,使用DStreams時只有幾個輸出操作,如print()
。 這是一段代碼在JavaSparkStreamingContext中執行查詢
private static void analyzeHashtags() throws InterruptedException {
JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics);
JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc);
lines.print();
jssc.start();
jssc.awaitTermination();
}
現在我想查詢操作添加到這個代碼,如下圖所示:
private static void analyzeHashtags() throws InterruptedException, SQLException {
JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics);
JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc);
lines.print();
String hashtag = "#dummy"; int frequencies = 59;
String cql = " CREATE (n:Hashtag {name:'"+hashtag+"', freq:"+frequencies+"})";
st.executeUpdate(cql);
jssc.start();
jssc.awaitTermination();
}
但這代碼只是執行查詢一次。我希望它在每次循環時執行它。 怎麼可能做到這一點?提前致謝。
感謝完整和有用的答案。 我只是不知道如何在Java(idk Scala)中實現'foreachRDD'部分。使用lambda表達式,我應該寫'lines.foreachRDD(rdd - >(...'用函數代替點嗎? – sirdan
我個人推薦使用Scala和Spark Streaming。對於'foreachRDD' lambda的Java轉換,我想你可以在Spark Streaming示例包中找到一個例子,例如:https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/ JavaSqlNetworkWordCount.java – maasg
非常感謝,這有助於很多 – sirdan