2016-04-22 22 views
0

全部。我有兩個關於Spark-streaming應用程序的問題。如何將JavaMapwithStateDstream的內容輸出到textFile?

第一個是如何輸出JavaMapwithStateDstream的內容轉換成文本文件,我通過API文檔去了,發現它的Dstreamlike interface.So我使用下面的代碼,試圖輸出內容:

Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc = 
      new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() { 
       @Override 
       public Tuple2<String, Integer> call(String word, Optional<Integer> one, 
        State<Integer> state) { 
       int sum = one.or(0) + (state.exists() ? state.get() : 0); 
       Tuple2<String, Integer> output = new Tuple2<>(word, sum); 
       state.update(sum); 
       return output; 
       } 
      }; 

    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream = 
      adCounts.mapWithState(StateSpec.function(mappingFunc)); 

    stateDstream.print(); 
    stateDstream.foreachRDD(new Function<JavaRDD<Tuple2<String,Integer>>, Void>() { 
     @Override 
     public Void call(JavaRDD<Tuple2<String, Integer>> rdd) throws Exception { 
      rdd.saveAsTextFile("/path/to/hdfs"); 
      return null; 
     } 
    }); 

然而,沒有任何輸出到HDFS path.But我可以從控制檯看到打印結果

enter image description here

請告訴我發生什麼事了?我怎麼能輸出JavaMapw的內容ithStateDstream?

第二個問題:

我想更新實時結果每次持續時間,即使沒有其他新的流入,我怎麼能實現它?

感謝。

回答

0

我發現JavaMapwithStateDstream可以打印出一些東西但不保存textFile的原因,因爲它在每個持續時間更新/初始化,流入的新數據將在下一次初始化時被覆蓋,因此沒有任何東西可以保存到文本文件。

解決方法是聲明一個新變量來複制stateDstream的值, 我在這裏使用Dstream,我認爲JavaPairDstream應該也可以。

DStream<Tuple2<String, Integer>> fin_Counts = stateDstream.dstream(); 
fin_Counts.print(); 

fin_Counts可以更新和保存。