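Spark Streaming from a socket does not work with a reduce operation

I am trying to run a simple Spark Streaming example on my local machine.
I have a thread that writes A's/B's/C's to a socket: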
serverSocket = new ServerSocket(Constants.PORT);
s1 = serverSocket.accept();
while(true) {
    Thread.sleep(random.nextInt(100));
    String character = alphabet.get(random.nextInt(alphabet.size())) ;
    PrintWriter out = new PrintWriter(s1.getOutputStream());
    out.println(character);
    out.flush();
}
My main program, in which I try to count the number of A's/B's/C's, looks like this (without the reduce step):
public static void main(String[] args) {
    // start socket writer thread
    System.setProperty("spark.cleaner.ttl", "10000");
    JavaSparkContext sc = new JavaSparkContext(
        "local",
        "Test",
        Constants.SPARK_HOME,
        new String[]{"target/spark-standalone-0.0.1-SNAPSHOT.jar"});
    Duration batchDuration = new Duration(TIME_WINDOW_MS);
    JavaStreamingContext streamingContext = new JavaStreamingContext(sc, batchDuration);
    JavaDStream<String> stream = streamingContext.socketTextStream("localhost", Constants.PORT);
    stream.print();
    // map every incoming character to a (key, 1) pair
    JavaPairDStream<String, Long> texts = stream.map(new PairFunction<String, String, Long>() {
        @Override
        public Tuple2<String, Long> call(String t) throws Exception {
            return new Tuple2<String, Long>("batchCount" + t, 1L);
        }
    });
    texts.print();
    streamingContext.checkpoint("checkPointDir");
    streamingContext.start();
}
In this case everything works fine (sample output for one batch):
Time: 1372413296000 ms
-------------------------------------------
B
A
B
C
C
C
A
B
C
C
...
-------------------------------------------
Time: 1372413296000 ms
-------------------------------------------
(batchCountB,1)
(batchCountA,1)
(batchCountB,1)
(batchCountC,1)
(batchCountC,1)
(batchCountC,1)
(batchCountA,1)
(batchCountB,1)
(batchCountC,1)
(batchCountC,1)
...
However, if I add a reduce step after the map, it no longer works. This code goes after texts.print():
JavaPairDStream<String, Long> reduced = texts.reduceByKeyAndWindow(new Function2<Long, Long, Long>() {
    @Override
    public Long call(Long t1, Long t2) throws Exception {
        return t1 + t2;
    }
}, new Duration(TIME_WINDOW_MS));
reduced.print();
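In this case I only get output for the first "stream" variable and the "texts" variable, but nothing for the reduce. Nothing happens after the first batch has been processed either. I also set the Spark log level to DEBUG, but did not see any exceptions or anything else unusual.
What is happening here? Why am I stuck?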
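
To make clear what I expect from the reduce step, here is a minimal plain-Java sketch (no Spark involved) of the per-key sum over a single window. The class WindowReduceSketch and the helper reduceByKey are hypothetical names made up for this illustration, and the input is only the ten records visible in the sample batch above, so the counts are illustrative rather than real Spark output.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class WindowReduceSketch {

    // Hypothetical helper, not part of Spark: maps each record to
    // ("batchCount" + record, 1L) and merges the values per key with `reducer`,
    // mirroring the map step followed by the (t1, t2) -> t1 + t2 reduce function.
    static Map<String, Long> reduceByKey(List<String> records, BinaryOperator<Long> reducer) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String record : records) {
            counts.merge("batchCount" + record, 1L, reducer);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assumption: the ten records shown in the sample batch output.
        List<String> batch = Arrays.asList("B", "A", "B", "C", "C", "C", "A", "B", "C", "C");
        Map<String, Long> reduced = reduceByKey(batch, Long::sum);
        for (Map.Entry<String, Long> e : reduced.entrySet()) {
            System.out.println("(" + e.getKey() + "," + e.getValue() + ")");
        }
        // prints:
        // (batchCountB,3)
        // (batchCountA,2)
        // (batchCountC,5)
    }
}
```

Output of this shape, one pair per key for each window, is what I would expect reduced.print() to show.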