因此,我想對我的Spark DataFrame執行某些操作,將它們寫入數據庫並在最後創建另一個DataFrame。它看起來像這樣:Spark:如何使用mapPartition並創建/關閉每個分區的連接
import sqlContext.implicits._
val newDF = myDF.mapPartitions(
iterator => {
val conn = new DbConnection
iterator.map(
row => {
addRowToBatch(row)
convertRowToObject(row)
})
conn.writeTheBatchToDB()
conn.close()
})
.toDF()
這給了我一個錯誤mapPartitions預計返回類型的Iterator[NotInferedR]
,但在這裏它是Unit
。我知道這是可能的forEachPartition,但我想也做映射。單獨做這件事將會是開銷(額外的火花工作)。該怎麼辦?
謝謝!
如果我必須在'iterator.map()'函數內部使用'conn'會怎麼樣?我不會得到一個連接已經關閉的異常? – void
嗯 - 你是對的 - 因爲iterator.map的懶惰,實際的計算只會在連接關閉後使用'result'迭代器時發生。我會修復答案來反映這一點 - 謝謝 –
謝謝!我已經把它作爲一個單獨的問題。你可以看看它http://stackoverflow.com/questions/37881042/spark-db-connection-per-spark-rdd-partition-and-do-mappartition – void