2017-05-14 37 views
5

我正在編寫一個測試應用程序,它使用來自Kafka的topcis的消息,然後將數據推入S3並進入RDBMS表(流程類似於此處所示:https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html)。所以,我讀卡夫卡數據,然後:結構化流如何執行單獨的流式查詢(並行或順序)?

  • 每個消息要保存到S3
  • 一些消息保存到表A中在外部數據庫(基於過濾條件)
  • 一些其它消息保存到表乙在外部數據庫(其它過濾條件)

因此,我已某物喜歡:

Dataset<Row> df = spark 
.readStream() 
.format("kafka") 
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") 
.option("subscribe", "topic1,topic2,topic3") 
.option("startingOffsets", "earliest") 
.load() 
.select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value")) 

(請注意,我正在閱讀多個卡夫卡主題)。 接下來,我定義所需的數據集:

Dataset<Row> allMessages = df.select(.....) 
Dataset<Row> messagesOfType1 = df.select() //some unique conditions applied on JSON elements 
Dataset<Row> messagesOfType2 = df.select() //some other unique conditions 

現在對於我創建查詢開始處理每個數據集:

StreamingQuery s3Query = allMessages 
.writeStream() 
.format("parquet") 
.option("startingOffsets", "latest") 
.option("path", "s3_location") 
.start() 

StreamingQuery firstQuery = messagesOfType1 
.writeStream() 
.foreach(new CustomForEachWiriterType1()) // class that extends ForeachWriter[T] and save data into external RDBMS table 
.start(); 

StreamingQuery secondQuery = messagesOfType2 
.writeStream() 
.foreach(new CustomForEachWiriterType2()) // class that extends ForeachWriter[T] and save data into external RDBMS table (may be even another database than before) 
.start(); 

現在我想知道:

將並行執行這些查詢(或者先進先出順序,我應該將這些查詢分配給獨立的調度程序池)?

回答

4

公司將在平行

是執行這些查詢。這些查詢將並行執行(每個trigger,您沒有指定,因此將盡可能快地運行它們)。


在內部,當你在一個DataStreamWriter執行start,您創建一個StreamExecution這反過來又創造馬上所謂的守護microBatchThread(從下面the Spark source code引用):

val microBatchThread = 
    new StreamExecutionThread(s"stream execution thread for $prettyIdString") { 
     override def run(): Unit = { 
     // To fix call site like "run at <unknown>:0", we bridge the call site from the caller 
     // thread to this micro batch thread 
     sparkSession.sparkContext.setCallSite(callSite) 
     runBatches() 
     } 
    } 

你可以看到每一個查詢在它自己的線程中使用名稱:

stream execution thread for [prettyIdString] 

您可以使用檢查單獨的線程或jconsole