2016-06-01 73 views
1

剛剛開始使用Spark-Java中的嬰兒步驟。以下是一個字數統計程序,其中包含一個可以跳過列表中的單詞的停用詞列表。我有2個累加器來計算跳過的單詞和未跳過的單詞。Spark Java Accumulator不會增加

但是,程序結束時的Sysout始終將累加器值設置爲0

請指出我要出錯的地方。

public static void main(String[] args) throws FileNotFoundException { 

     SparkConf conf = new SparkConf(); 
     conf.setAppName("Third App - Word Count WITH BroadCast and Accumulator"); 
     JavaSparkContext jsc = new JavaSparkContext(conf); 
     JavaRDD<String> fileRDD = jsc.textFile("hello.txt"); 
     JavaRDD<String> words = fileRDD.flatMap(new FlatMapFunction<String, String>() { 

      public Iterable<String> call(String aLine) throws Exception { 
       return Arrays.asList(aLine.split(" ")); 
      } 
     }); 

     String[] stopWordArray = getStopWordArray(); 

     final Accumulator<Integer> skipAccumulator = jsc.accumulator(0); 
     final Accumulator<Integer> unSkipAccumulator = jsc.accumulator(0); 

     final Broadcast<String[]> stopWordBroadCast = jsc.broadcast(stopWordArray); 

     JavaRDD<String> filteredWords = words.filter(new Function<String, Boolean>() { 

      public Boolean call(String inString) throws Exception { 
       boolean filterCondition = !Arrays.asList(stopWordBroadCast.getValue()).contains(inString); 
       if(!filterCondition){ 
        System.out.println("Filtered a stop word "); 
        skipAccumulator.add(1); 
       }else{ 
        unSkipAccumulator.add(1); 
       } 
       return filterCondition; 

      } 
     }); 

     System.out.println("$$$$$$$$$$$$$$$Filtered Count "+skipAccumulator.value()); 
     System.out.println("$$$$$$$$$$$$$$$ UN Filtered Count "+unSkipAccumulator.value()); 

     /* rest of code - works fine */ 
     jsc.stop(); 
     jsc.close(); 
     } 

我正在運行的JAR和使用

spark-submit jarname 

------------編輯在Hortonworks沙盒2.4提交作業------- ---------

REST該雲在註釋部分

JavaPairRDD<String, Integer> wordOccurrence = filteredWords.mapToPair(new PairFunction<String, String, Integer>() { 

      public Tuple2<String, Integer> call(String inWord) throws Exception { 
       return new Tuple2<String, Integer>(inWord, 1); 
      } 
     }); 

     JavaPairRDD<String, Integer> summed = wordOccurrence.reduceByKey(new Function2<Integer, Integer, Integer>() { 

      public Integer call(Integer a, Integer b) throws Exception { 
       return a+b; 
      } 
     }); 

     summed.saveAsTextFile("hello-out"); 
+1

這兩個累加器都是0,並且由於停止詞有5個出現,文本過濾停止詞打印5次。 –

回答

1

優米的代碼的發佈了重要部分/* rest of code - works fine */。我可以保證你在其他代碼中調用一些操作。這會觸發DAG使用累加器執行代碼。嘗試在println之前添加filteredWords.collect,您應該看到輸出。請記住,Spark在轉換上很懶,只能在動作上執行。

+0

編輯了這個問題。 :) –

+1

正確答案 - Spark是懶惰的轉換,並只執行動作 我強制執行第一個()讓它工作。 –