0
我們有這個龐大的遺留數據庫表,我們需要從中提取數據並將其推送到s3。以下是我如何查詢一部分數據和寫入輸出。在實地編寫期間發生Spark ClosedChannelException異常
def writeTableInParts(tableName: String, numIdsPerParquet: Long, numPartitionsAtATime: Int, startFrom : Long = -1, endTo : Long = -1, filePrefix : String = s3Prefix) = {
val minId : Long = if (startFrom > 0) startFrom else findMinCol(tableName, "id")
val maxId : Long = if (endTo > 0) endTo else findMaxCol(tableName, "id")
(minId until maxId by numIdsPerParquet).toList.sliding(numPartitionsAtATime, numPartitionsAtATime).toList.foreach(list => {
list.map(start => {
val end = math.min(start + numIdsPerParquet, maxId)
sqlContext.read.jdbc(mysqlConStr,
s"(SELECT * FROM $tableName WHERE id >= ${start} AND id < ${end}) as tmpTable",
Map[String, String]())
}).reduce((left, right) => {
left.unionAll(right)
})
.write
.parquet(s"${filePrefix}/$tableName/${list.head}-${list.last + numIdsPerParquet}")
})
}
這工作好了很多不同的表,但無論出於何種原因表繼續得到java.nio.channels.ClosedChannelException
不管我多麼減少掃描窗口或大小。
基於this answer我猜我在我的代碼的某處有異常,但我不知道它會在哪裏,因爲它是一個相當簡單的代碼。我怎樣才能進一步調試這個異常?日誌沒有任何非常有用的東西,也沒有顯示原因。