foreachPartition
是最適合
示例代碼段將它
val dstream = ...
dstream.foreachRDD { rdd =>
//loop through each parttion in rdd
rdd.foreachPartition { partitionOfRecords =>
//1. Create Connection object/pool for Codis, HBase
// Use it if you want record level control in rdd or partion
partitionOfRecords.foreach { record =>
// 2. Write each record to external client
}
// 3. Batch insert if connector supports from an RDD to external source
}
//Use 2 or 3 to write data as per your requirement
}
Another SO Answer for similar use case
檢查:Design Patterns for using foreachRDD
DB連接不能與狀態(瞬態)序列。因此,建議在執行者級別創建/維護連接(或池)。通常每個執行器一個連接都不錯,其他數據庫需要並行執行** executor連接數**。所有的連接都是獨立的,因爲執行者是獨立的。所以,我不確定集中化的想法是非常好的。 – mrsrinivas
但我想維護每個JVM的連接池以降低成本。我的困惑是如何集中它們。 – wttttt
每個執行程序都是JVM進程。正如代碼中所提到的,您可以在每個JVM的示例代碼中的**第1點**處創建數據庫連接池。 – mrsrinivas