2017-08-10 62 views
6

我正在嘗試將兩個不同窗口的流聚合並將其打印到控制檯中。但是,只有第一個流式查詢正在打印。 tenSecsQ未打印到控制檯中。在火花結構化流式傳輸中執行單獨的流式查詢

SparkSession spark = SparkSession 
    .builder() 
    .appName("JavaStructuredNetworkWordCountWindowed") 
    .config("spark.master", "local[*]") 
    .getOrCreate(); 

Dataset<Row> lines = spark 
    .readStream() 
    .format("socket") 
    .option("host", host) 
    .option("port", port) 
    .option("includeTimestamp", true) 
    .load(); 

Dataset<Row> words = lines 
    .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())) 
    .toDF("word", "timestamp"); 

// 5 second window 
Dataset<Row> fiveSecs = words 
    .groupBy(
     functions.window(words.col("timestamp"), "5 seconds"), 
     words.col("word") 
    ).count().orderBy("window"); 

// 10 second window 
Dataset<Row> tenSecs = words 
    .groupBy(
      functions.window(words.col("timestamp"), "10 seconds"), 
      words.col("word") 
    ).count().orderBy("window"); 

針對5s和10s聚合流的觸發流查詢。 10s數據流的輸出不會打印。只有5s印在控制檯上

// Start writeStream() for 5s window 
StreamingQuery fiveSecQ = fiveSecs.writeStream() 
    .queryName("5_secs") 
    .outputMode("complete") 
    .format("console") 
    .option("truncate", "false") 
    .start(); 

// Start writeStream() for 10s window 
StreamingQuery tenSecsQ = tenSecs.writeStream() 
    .queryName("10_secs") 
    .outputMode("complete") 
    .format("console") 
    .option("truncate", "false") 
    .start(); 

tenSecsQ.awaitTermination(); 
+0

實際上,我不知道套接字流是如何工作的,但對我來說似乎你的第一個Spark流從套接字流中讀取所有數據,而第二個沒有任何數據。 –

回答

5

我一直在調查這個問題。

摘要:結構化數據流中的每個查詢消耗source數據。套接字源爲每個定義的查詢創建一個新的連接。在這種情況下看到的行爲是因爲nc僅將輸入數據傳遞到第一個連接。

因此,除非我們可以確保連接的套接字源向每個打開的連接傳遞相同的數據,否則無法通過套接字連接定義多個聚合。


我在Spark郵件列表中討論了這個問題。 Databricks開發人員朱世雄回答:

Spark爲每個查詢創建一個連接。你觀察到的行爲是因爲「nc -lk」是如何工作的。如果您使用netstat來檢查tcp連接,則會在啓動兩個查詢時看到兩個連接。但是,「nc」僅將輸入轉發給一個連接。

我定義了一個小實驗驗證了這種行爲: 首先,我創建了一個SimpleTCPWordServer,提供隨機的話給每個連接開放和聲明兩個查詢一個基本的結構化數據流的工作。它們之間唯一的區別是,第二次查詢定義了一個額外的常數列來區分其輸出:

val lines = spark 
    .readStream 
    .format("socket") 
    .option("host", "localhost") 
    .option("port", "9999") 
    .option("includeTimestamp", true) 
    .load() 

val q1 = lines.writeStream 
    .outputMode("append") 
    .format("console") 
    .trigger(Trigger.ProcessingTime("5 seconds")) 
    .start() 

val q2 = lines.withColumn("foo", lit("foo")).writeStream 
    .outputMode("append") 
    .format("console") 
    .trigger(Trigger.ProcessingTime("7 seconds")) 
    .start() 

如果StructuredStreaming將消耗只有一個流,那麼我們應該看到這兩個查詢發表了相同的話。在每個查詢消耗單獨的流的情況下,那麼我們將由每個查詢報告不同的單詞。

這是觀測的輸出:

------------------------------------------- 
Batch: 0 
------------------------------------------- 
+--------+-------------------+ 
| value|   timestamp| 
+--------+-------------------+ 
|champion|2017-08-14 13:54:51| 
+--------+-------------------+ 

+------+-------------------+---+ 
| value|   timestamp|foo| 
+------+-------------------+---+ 
|belong|2017-08-14 13:54:51|foo| 
+------+-------------------+---+ 

------------------------------------------- 
Batch: 1 
------------------------------------------- 
+-------+-------------------+---+ 
| value|   timestamp|foo| 
+-------+-------------------+---+ 
| agenda|2017-08-14 13:54:52|foo| 
|ceiling|2017-08-14 13:54:52|foo| 
| bear|2017-08-14 13:54:53|foo| 
+-------+-------------------+---+ 

------------------------------------------- 
Batch: 1 
------------------------------------------- 
+----------+-------------------+ 
|  value|   timestamp| 
+----------+-------------------+ 
| breath|2017-08-14 13:54:52| 
|anticipate|2017-08-14 13:54:52| 
| amazing|2017-08-14 13:54:52| 
| bottle|2017-08-14 13:54:53| 
| calculate|2017-08-14 13:54:53| 
|  asset|2017-08-14 13:54:54| 
|  cell|2017-08-14 13:54:54| 
+----------+-------------------+ 

我們可以清楚地看到,每個查詢的數據流是不同的。除非我們可以保證TCP後端服務器向每個打開的連接提供完全相同的數據,否則看起來不可能在socket source提供的數據上定義多個聚合。

相關問題