2017-05-25 50 views
1

什麼是「推薦」方式來處理每條消息,因爲它通過結構化流式傳輸管道(我在spark 2.1.1上,源代碼爲kafka 0.10.2.1)?結構化流式傳輸 - 消費每個消息

到目前爲止,我在看dataframe.mapPartitions(因爲我需要連接到hbase,其客戶端連接類不是serizalable,因此mapPartitions)。

想法?

回答

1

您應該能夠使用foreach輸出水槽:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinkshttps://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach

即使客戶端不是序列化的,你沒有在你的ForeachWriter構造函數將其打開。只需將它保留爲None/null,並將其初始化爲open方法,該方法在序列化後稱爲,但每個任務只能執行一次。

在排序的僞碼:

class HBaseForeachWriter extends ForeachWriter[MyType] { 
    var client: Option[HBaseClient] = None 
    def open(partitionId: Long, version: Long): Boolean = { 
    client = Some(... open a client ...) 
    } 
    def process(record: MyType) = { 
    client match { 
     case None => throw Exception("shouldn't happen") 
     case Some(cl) => { 
     ... use cl to write record ... 
     } 
    } 
    } 
    def close(errorOrNull: Throwable): Unit = { 
    client.foreach(cl => cl.close()) 
    } 
}