2017-07-06 25 views
2

我從星火遷移流來流結構和我現在所面臨的問題與下面的代碼:爲什麼foreachPartition出錯流數據集?

def processDataSet(inputDataset: Dataset[MyMessage], foobar: FooBar) = { 
    inputDataset.foreachPartition { partitionIterator => 
     val filteredIterator = partitionIterator.filter(foobar.filter) 
     ... 
     ... 
    } 
}  
val streamingQuery = inputDataset 
    .writeStream 
    .trigger(ProcessingTime("5 seconds")) 
    .outputMode("append") 
    .format("console") 
    .start 

它出現了錯誤有以下AnalysisException

產生的原因:org.apache.spark .sql.AnalysisException:使用流源的查詢必須使用writeStream.start();;執行。

是否foreachPartition不支持流式查詢?在這種情況下,writeStream.foreach是唯一實現foreachPartition的方法嗎?

我想避免發送每個事件,而是積累所有行,形成一個巨大的POST請求體並將其發送到HTTP端點。因此,如果批處理和5個分區中的1000個事件與每個請求主體中的200個事件並行生成5個請求。

回答

2

TL; DR是的。 foreachPartition操作不受支持,您應該使用ForeachWriter代替。

引用的foreachPartition的scaladoc:

foreachPartition(F:(迭代[T])⇒單位):單位應用一個函數f到此數據集的每個分區。

正如您現在可能已經發現的那樣,foreach是一個操作,因此會觸發Spark執行。

由於您使用流式數據集,因此不允許使用「傳統」方法(如foreach)觸發它們的執行。

引用結構化數據流的Unsupported Operations

此外,還有一些數據集的方法,不會在流媒體數據集工作。它們是會立即運行查詢並返回結果的操作,這對流式數據集沒有意義。相反,這些功能可以通過明確開始流式查詢來完成(參見下一節)。

其中流替代方案是foreach算子(又名水槽)。這就是結構化數據流中的foreachPartition

引用Using Foreach

在foreach操作允許對輸出數據計算任意的操作。

要使用此功能,您必須實現接口ForeachWriter,該接口具有在觸發器之後每當產生一行作爲輸出的行序列時都會調用的方法。


我想避免將每個事件,因爲它來的,而是積累的所有行,形成一個巨大的POST請求體,並將其發送到HTTP端點。因此,如果批處理和5個分區中的1000個事件與每個請求主體中的200個事件並行生成5個請求。

這似乎是在將數據集寫入接收器之前的聚合,不是嗎?使用groupBy運算符和collect_list函數可將行分組,因此當您使用writeStream時,您將擁有任意數量的組。

我寧願避免處理RDD這種稱爲分區的低級功能,以優化寫入,除非沒有其他方法。

+0

我有一個要求做到這一點:foreachPartition(迭代器 - >一次性從迭代器收集該分區的所有內容 - >用它做一些事情。是否有辦法收集該分區的所有事件? –

+0

我不認爲foreach接收器允許這個 - > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink。斯卡拉#L49-L64 –