我正在嘗試將兩個不同窗口的流聚合並將其打印到控制檯中。但是,只有第一個流式查詢正在打印。 tenSecsQ
未打印到控制檯中。在火花結構化流式傳輸中執行單獨的流式查詢
SparkSession spark = SparkSession
.builder()
.appName("JavaStructuredNetworkWordCountWindowed")
.config("spark.master", "local[*]")
.getOrCreate();
Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", host)
.option("port", port)
.option("includeTimestamp", true)
.load();
Dataset<Row> words = lines
.as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
.toDF("word", "timestamp");
// 5 second window
Dataset<Row> fiveSecs = words
.groupBy(
functions.window(words.col("timestamp"), "5 seconds"),
words.col("word")
).count().orderBy("window");
// 10 second window
Dataset<Row> tenSecs = words
.groupBy(
functions.window(words.col("timestamp"), "10 seconds"),
words.col("word")
).count().orderBy("window");
針對5s和10s聚合流的觸發流查詢。 10s數據流的輸出不會打印。只有5s印在控制檯上
// Start writeStream() for 5s window
StreamingQuery fiveSecQ = fiveSecs.writeStream()
.queryName("5_secs")
.outputMode("complete")
.format("console")
.option("truncate", "false")
.start();
// Start writeStream() for 10s window
StreamingQuery tenSecsQ = tenSecs.writeStream()
.queryName("10_secs")
.outputMode("complete")
.format("console")
.option("truncate", "false")
.start();
tenSecsQ.awaitTermination();
實際上,我不知道套接字流是如何工作的,但對我來說似乎你的第一個Spark流從套接字流中讀取所有數據,而第二個沒有任何數據。 –