2016-04-11 16 views
2

因此,我想對我的Spark DataFrame執行某些操作,將它們寫入數據庫並在最後創建另一個DataFrame。它看起來像這樣:Spark:如何使用mapPartition並創建/關閉每個分區的連接

import sqlContext.implicits._ 

val newDF = myDF.mapPartitions(
    iterator => { 
    val conn = new DbConnection 
    iterator.map(
     row => { 
     addRowToBatch(row) 
     convertRowToObject(row) 
    }) 
    conn.writeTheBatchToDB() 
    conn.close() 
    }) 
    .toDF() 

這給了我一個錯誤mapPartitions預計返回類型的Iterator[NotInferedR],但在這裏它是Unit。我知道這是可能的forEachPartition,但我想也做映射。單獨做這件事將會是開銷(額外的火花工作)。該怎麼辦?

謝謝!

回答

3

在匿名函數實現的最後一個表達式必須返回值:

import sqlContext.implicits._ 

val newDF = myDF.mapPartitions(
    iterator => { 
    val conn = new DbConnection 
    // using toList to force eager computation - make it happen now when connection is open 
    val result = iterator.map(/* the same... */).toList 
    conn.writeTheBatchToDB() 
    conn.close() 
    result.iterator 
    } 
).toDF() 
+0

如果我必須在'iterator.map()'函數內部使用'conn'會怎麼樣?我不會得到一個連接已經關閉的異常? – void

+0

嗯 - 你是對的 - 因爲iterator.map的懶惰,實際的計算只會在連接關閉後使用'result'迭代器時發生。我會修復答案來反映這一點 - 謝謝 –

+0

謝謝!我已經把它作爲一個單獨的問題。你可以看看它http://stackoverflow.com/questions/37881042/spark-db-connection-per-spark-rdd-partition-and-do-mappartition – void

2

在大多數情況下,急於消費迭代器都將導致對執行失敗,如果不是工作放緩。因此,我所做的是檢查迭代器是否已經是空的,然後執行清理例程。

rdd.mapPartitions(itr => { 
    val conn = new DbConnection 
    itr.map(data => { 
     val yourActualResult = // do something with your data and conn here 
     if(itr.isEmpty) conn.close // close the connection 
     yourActualResult 
    }) 
}) 

認爲這是一個火花的問題,但實際上是一個scala。 http://www.scala-lang.org/api/2.12.0/scala/collection/Iterator.html#isEmpty:Boolean

相關問題