How does Structured Streaming execute separate streaming queries (in parallel or sequentially)? I am writing a test application that consumes messages from Kafka topics and then pushes the data into S3 and into RDBMS tables (the flow is similar to the one shown here: https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html). So, after reading the Kafka data, I want to:
- save every message to S3
- save some messages to table A in an external database (based on a filter condition)
- save some other messages to table B in an external database (a different filter condition)
So I have something like:
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2,topic3")
.option("startingOffsets", "earliest")
.load()
.select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"));
(Note that I am reading from multiple Kafka topics.) Next, I define the datasets I need:
Dataset<Row> allMessages = df.select(.....);
Dataset<Row> messagesOfType1 = df.select(); // some unique conditions applied to JSON elements
Dataset<Row> messagesOfType2 = df.select(); // some other unique conditions
Now I create a query for each dataset to start processing it:
StreamingQuery s3Query = allMessages
.writeStream()
.format("parquet")
.option("checkpointLocation", "checkpoint_location") // the file sink requires a checkpoint; "startingOffsets" is a read-side option and is already set on readStream above
.option("path", "s3_location")
.start();
StreamingQuery firstQuery = messagesOfType1
.writeStream()
.foreach(new CustomForEachWriterType1()) // class that extends ForeachWriter[T] and saves data into an external RDBMS table
.start();
StreamingQuery secondQuery = messagesOfType2
.writeStream()
.foreach(new CustomForEachWriterType2()) // class that extends ForeachWriter[T] and saves data into an external RDBMS table (possibly even a different database than before)
.start();
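For context, my custom writers follow Spark's ForeachWriter contract: open() is called once per partition and epoch, process() once per row, and close() at the end. A minimal sketch of that lifecycle, with the Spark interface mimicked by a local stand-in and the JDBC write replaced by an in-memory list (all names here are illustrative, not my actual code):

```java
import java.util.ArrayList;
import java.util.List;

public class WriterSketch {
    // Local stand-in for org.apache.spark.sql.ForeachWriter<T>:
    // open() once per partition/epoch, process() per row, close() at the end.
    abstract static class ForeachWriterLike<T> {
        abstract boolean open(long partitionId, long epochId);
        abstract void process(T value);
        abstract void close(Throwable errorOrNull);
    }

    // A real writer would open a JDBC connection in open(),
    // run INSERTs in process(), and commit/release in close().
    static class CollectingWriter extends ForeachWriterLike<String> {
        final List<String> sink = new ArrayList<>();

        @Override
        boolean open(long partitionId, long epochId) {
            return true; // returning false skips processing for this partition
        }

        @Override
        void process(String value) {
            sink.add(value); // stand-in for the actual INSERT
        }

        @Override
        void close(Throwable errorOrNull) {
            // commit and release resources here
        }
    }

    public static void main(String[] args) {
        CollectingWriter w = new CollectingWriter();
        if (w.open(0, 0)) {
            w.process("row-1");
            w.process("row-2");
        }
        w.close(null);
        System.out.println(w.sink); // prints [row-1, row-2]
    }
}
```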
Now I am wondering: will these queries be executed in parallel, or sequentially in FIFO order (and should I assign them to separate scheduler pools)?