0
全部。我有兩個關於Spark-streaming應用程序的問題。如何將JavaMapwithStateDstream的內容輸出到textFile?
第一個是如何輸出JavaMapwithStateDstream的內容轉換成文本文件,我通過API文檔去了,發現它的Dstreamlike interface.So我使用下面的代碼,試圖輸出內容:
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> call(String word, Optional<Integer> one,
State<Integer> state) {
int sum = one.or(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
state.update(sum);
return output;
}
};
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
adCounts.mapWithState(StateSpec.function(mappingFunc));
stateDstream.print();
stateDstream.foreachRDD(new Function<JavaRDD<Tuple2<String,Integer>>, Void>() {
@Override
public Void call(JavaRDD<Tuple2<String, Integer>> rdd) throws Exception {
rdd.saveAsTextFile("/path/to/hdfs");
return null;
}
});
然而,沒有任何輸出到HDFS path.But我可以從控制檯看到打印結果
請告訴我發生什麼事了?我怎麼能輸出JavaMapw的內容ithStateDstream?
第二個問題:
我想更新實時結果每次持續時間,即使沒有其他新的流入,我怎麼能實現它?
感謝。