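Spark Java Accumulator not incrementing
I am just taking baby steps with Spark in Java. Below is a word count program that uses a stop word list to skip the words in that list. I have 2 accumulators to count the skipped and un-skipped words.
However, the Sysout at the end of the program always shows the accumulator values as 0.
Please point out where I am going wrong.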
public static void main(String[] args) throws FileNotFoundException {
    SparkConf conf = new SparkConf();
    conf.setAppName("Third App - Word Count WITH BroadCast and Accumulator");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    JavaRDD<String> fileRDD = jsc.textFile("hello.txt");
    JavaRDD<String> words = fileRDD.flatMap(new FlatMapFunction<String, String>() {
        public Iterable<String> call(String aLine) throws Exception {
            return Arrays.asList(aLine.split(" "));
        }
    });
    String[] stopWordArray = getStopWordArray();
    final Accumulator<Integer> skipAccumulator = jsc.accumulator(0);
    final Accumulator<Integer> unSkipAccumulator = jsc.accumulator(0);
    final Broadcast<String[]> stopWordBroadCast = jsc.broadcast(stopWordArray);
    JavaRDD<String> filteredWords = words.filter(new Function<String, Boolean>() {
        public Boolean call(String inString) throws Exception {
            boolean filterCondition = !Arrays.asList(stopWordBroadCast.getValue()).contains(inString);
            if (!filterCondition) {
                System.out.println("Filtered a stop word ");
                skipAccumulator.add(1);
            } else {
                unSkipAccumulator.add(1);
            }
            return filterCondition;
        }
    });
    System.out.println("$$$$$$$$$$$$$$$Filtered Count " + skipAccumulator.value());
    System.out.println("$$$$$$$$$$$$$$$ UN Filtered Count " + unSkipAccumulator.value());
    /* rest of code - works fine */
    jsc.stop();
    jsc.close();
}
I am building a runnable JAR and submitting it with
spark-submit jarname
------------ EDIT: job submitted on Hortonworks Sandbox 2.4 ----------------
The rest of the code, which sits in the commented section:
JavaPairRDD<String, Integer> wordOccurrence = filteredWords.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String inWord) throws Exception {
        return new Tuple2<String, Integer>(inWord, 1);
    }
});
JavaPairRDD<String, Integer> summed = wordOccurrence.reduceByKey(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer a, Integer b) throws Exception {
        return a + b;
    }
});
summed.saveAsTextFile("hello-out");
Both accumulators are 0, and since the stop words occur 5 times in the text, "Filtered a stop word" is printed 5 times.
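For what it's worth, the symptoms described here (both values 0, yet the println inside the filter fires 5 times) are consistent with lazy evaluation. In Spark, filter is a transformation, and transformations only run once an action such as saveAsTextFile is called, so accumulator values read before the action are still at their initial value. The sketch below is not Spark code. It uses plain java.util.stream, whose intermediate operations are lazy in a similar way, as a stand-in that runs without a cluster. The class name LazyCounterSketch, the method skippedBeforeAndAfter, and the sample words are all made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in: java.util.stream instead of Spark RDDs.
public class LazyCounterSketch {

    // Returns {skipped count read BEFORE the terminal operation,
    //          skipped count read AFTER the terminal operation}.
    public static int[] skippedBeforeAndAfter(List<String> words, List<String> stopWords) {
        AtomicInteger skipped = new AtomicInteger(0);

        // Intermediate operation: only describes the work, much like an
        // RDD transformation that has not yet been followed by an action.
        Stream<String> filtered = words.stream().filter(w -> {
            boolean keep = !stopWords.contains(w);
            if (!keep) {
                skipped.incrementAndGet();
            }
            return keep;
        });

        // The lambda has not been invoked yet, so the counter is untouched.
        int before = skipped.get();

        // Terminal operation: this is what actually runs the filter.
        filtered.collect(Collectors.toList());

        int after = skipped.get();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("the", "quick", "fox", "and", "the", "dog");
        List<String> stopWords = Arrays.asList("the", "and");
        int[] counts = skippedBeforeAndAfter(words, stopWords);
        System.out.println("Skipped, read before terminal op: " + counts[0]);
        System.out.println("Skipped, read after terminal op: " + counts[1]);
    }
}
```

In the program above, the analogous change would presumably be to move the two System.out.println calls that read skipAccumulator.value() and unSkipAccumulator.value() so that they come after summed.saveAsTextFile("hello-out"), which is the action that triggers the filter.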