1
什麼是「推薦」方式來處理每條消息,因爲它通過結構化流式傳輸管道(我在spark 2.1.1上,源代碼爲kafka 0.10.2.1)?結構化流式傳輸 - 消費每個消息
到目前爲止,我在看dataframe.mapPartitions
(因爲我需要連接到hbase,其客戶端連接類不是serizalable,因此mapPartitions
)。
想法?
什麼是「推薦」方式來處理每條消息,因爲它通過結構化流式傳輸管道(我在spark 2.1.1上,源代碼爲kafka 0.10.2.1)?結構化流式傳輸 - 消費每個消息
到目前爲止,我在看dataframe.mapPartitions
(因爲我需要連接到hbase,其客戶端連接類不是serizalable,因此mapPartitions
)。
想法?
您應該能夠使用foreach
輸出水槽:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks和https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach
即使客戶端不是序列化的,你沒有在你的ForeachWriter
構造函數將其打開。只需將它保留爲None/null,並將其初始化爲open
方法,該方法在序列化後稱爲,但每個任務只能執行一次。
在排序的僞碼:
class HBaseForeachWriter extends ForeachWriter[MyType] {
var client: Option[HBaseClient] = None
def open(partitionId: Long, version: Long): Boolean = {
client = Some(... open a client ...)
}
def process(record: MyType) = {
client match {
case None => throw Exception("shouldn't happen")
case Some(cl) => {
... use cl to write record ...
}
}
}
def close(errorOrNull: Throwable): Unit = {
client.foreach(cl => cl.close())
}
}