我正在處理一個Java jar。累加器累加流值。問題是,我想在每次增加或在特定的時間間隔內在UI中顯示值。如何顯示DStream中更新的當前累加器值?
但是,由於累加器值只能從驅動程序中獲取,所以我無法訪問該值直到該過程完成其執行。關於如何定期訪問此值的任何想法?
我的代碼如下
package com.spark;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
public class KafkaSpark {
/**
* @param args
*/
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Simple Application");
conf.setMaster("local");
JavaStreamingContext jssc = new JavaStreamingContext(conf,
new Duration(5000));
final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put("test", 1);
JavaPairDStream<String, String> lines = KafkaUtils.createStream(jssc,
"localhost:2181", "group1", topicMap);
JavaDStream<Integer> map = lines
.map(new Function<Tuple2<String, String>, Integer>() {
public Integer call(Tuple2<String, String> v1)
throws Exception {
if (v1._2.contains("the")) {
accum.add(1);
return 1;
}
return 0;
}
});
map.print();
jssc.start();
jssc.awaitTermination();
System.out.println("*************" + accum.value());
System.out.println("done");
}
}
我使用卡夫卡流數據給出。
爲什麼您認爲累加器非常適合用例?我寧願考慮一些外部數據存儲以反映此聚合的當前價值,即另一個Kafka話題,或許客戶用它來通知有關更改。 – 2016-04-22 06:54:00