2017-06-16 101 views
2

我有一個與Spark JavaStreamingContext一起使用的程序。我已經瞭解到,使用DStreams時只有幾個輸出操作,如print()。 這是一段代碼在JavaSparkStreamingContext中執行查詢

private static void analyzeHashtags() throws InterruptedException { 
    JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics); 
    JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc); 
    lines.print(); 
    jssc.start(); 
    jssc.awaitTermination(); 

} 

現在我想查詢操作添加到這個代碼,如下圖所示:

private static void analyzeHashtags() throws InterruptedException, SQLException { 
    JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics); 
    JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc); 
    lines.print(); 
    String hashtag = "#dummy"; int frequencies = 59; 
    String cql = " CREATE (n:Hashtag {name:'"+hashtag+"', freq:"+frequencies+"})"; 
    st.executeUpdate(cql); 
    jssc.start(); 
    jssc.awaitTermination(); 
} 

但這代碼只是執行查詢一次。我希望它在每次循環時執行它。 怎麼可能做到這一點?提前致謝。

回答

2

要對DStream執行任意操作,我們使用foreachRDD。它在每個批處理間隔提供對數據的訪問,由基礎rdd表示。

的Java/Scala的僞(混合)代碼:

JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new 
Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc); 
lines.foreachRDD{ rdd => 
    .. do something with the RDD here... 
} 

通常,do something操作上的RDD的數據。 我們可以通過使用RDD函數(例如foreachPartition)以分佈式方式對該數據進行操作。

但是,考慮到您在本地使用本地neo4j連接,並且如果每個流式傳輸間隔的數據不是很大,我們可以將數據收集到驅動程序並在本地執行操作。看來,這將是在這種情況下一個合適的,因爲數據已經通過已經分佈式簡化階段(reduceBykey

因此,foreachRDD部分將變成:

lines.foreachRDD{ rdd => 
    val localDataCollection = rdd.collect 
    localDataCollection.foreach{ keywordFreqPair => 
     val cql = "CREATE (n:Hashtag {name:'"+keywordFreqPair._1+"', freq:"+keywordFreqPair._2+"})" 
     st.executeUpdate(cql) 
} 
+0

感謝完整和有用的答案。 我只是不知道如何在Java(idk Scala)中實現'foreachRDD'部分。使用lambda表達式,我應該寫'lines.foreachRDD(rdd - >(...'用函數代替點嗎? – sirdan

+1

我個人推薦使用Scala和Spark Streaming。對於'foreachRDD' lambda的Java轉換,我想你可以在Spark Streaming示例包中找到一個例子,例如:https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/ JavaSqlNetworkWordCount.java – maasg

+0

非常感謝,這有助於很多 – sirdan