我從星火遷移流來流結構和我現在所面臨的問題與下面的代碼:爲什麼foreachPartition出錯流數據集?
def processDataSet(inputDataset: Dataset[MyMessage], foobar: FooBar) = {
inputDataset.foreachPartition { partitionIterator =>
val filteredIterator = partitionIterator.filter(foobar.filter)
...
...
}
}
val streamingQuery = inputDataset
.writeStream
.trigger(ProcessingTime("5 seconds"))
.outputMode("append")
.format("console")
.start
它出現了錯誤有以下AnalysisException
:
產生的原因:org.apache.spark .sql.AnalysisException:使用流源的查詢必須使用writeStream.start();;執行。
是否foreachPartition
不支持流式查詢?在這種情況下,writeStream.foreach
是唯一實現foreachPartition
的方法嗎?
我想避免發送每個事件,而是積累所有行,形成一個巨大的POST請求體並將其發送到HTTP端點。因此,如果批處理和5個分區中的1000個事件與每個請求主體中的200個事件並行生成5個請求。
我有一個要求做到這一點:foreachPartition(迭代器 - >一次性從迭代器收集該分區的所有內容 - >用它做一些事情。是否有辦法收集該分區的所有事件? –
我不認爲foreach接收器允許這個 - > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink。斯卡拉#L49-L64 –