2014-09-25 19 views
2

我正在處理一個Java jar。累加器累加流值。問題是,我想在每次增加或在特定的時間間隔內在UI中顯示值。如何顯示DStream中更新的當前累加器值?

但是,由於累加器值只能從驅動程序中獲取,所以我無法訪問該值直到該過程完成其執行。關於如何定期訪問此值的任何想法?

我的代碼如下

package com.spark; 

import java.util.HashMap; 
import java.util.Map; 

import org.apache.spark.Accumulator; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 

import scala.Tuple2; 

public class KafkaSpark { 

    /** 
    * @param args 
    */ 
    public static void main(String[] args) { 
     SparkConf conf = new SparkConf().setAppName("Simple Application"); 
     conf.setMaster("local"); 
     JavaStreamingContext jssc = new JavaStreamingContext(conf, 
       new Duration(5000)); 
     final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0); 
     Map<String, Integer> topicMap = new HashMap<String, Integer>(); 
     topicMap.put("test", 1); 
     JavaPairDStream<String, String> lines = KafkaUtils.createStream(jssc, 
       "localhost:2181", "group1", topicMap); 

     JavaDStream<Integer> map = lines 
       .map(new Function<Tuple2<String, String>, Integer>() { 

        public Integer call(Tuple2<String, String> v1) 
          throws Exception { 
         if (v1._2.contains("the")) { 
          accum.add(1); 
          return 1; 
         } 
         return 0; 
        } 
       }); 

     map.print(); 
     jssc.start(); 
     jssc.awaitTermination(); 
     System.out.println("*************" + accum.value()); 
     System.out.println("done"); 
    } 
} 

我使用卡夫卡流數據給出。

+0

爲什麼您認爲累加器非常適合用例?我寧願考慮一些外部數據存儲以反映此聚合的當前價值,即另一個Kafka話題,或許客戶用它來通知有關更改。 – 2016-04-22 06:54:00

回答

1

只有在調用jssc.star()時纔會觸發實際代碼開始執行。現在,控制器開始運行循環,所有system.out.println將只被調用一次。並且不會每次都用循環執行。

對於出把操作檢查documentation

您可以使用

的print() forEachRDD() 保存爲對象文本或Hadoop的文件

希望這有助於

0
jssc.start(); 
while(true) { 
    System.out.println("current:" + accum.value()); 
    Thread.sleep(1000); 
}