2016-04-19 48 views
0

我們有這個龐大的遺留數據庫表,我們需要從中提取數據並將其推送到s3。以下是我如何查詢一部分數據和寫入輸出。在實地編寫期間發生Spark ClosedChannelException異常

def writeTableInParts(tableName: String, numIdsPerParquet: Long, numPartitionsAtATime: Int, startFrom : Long = -1, endTo : Long = -1, filePrefix : String = s3Prefix) = { 
    val minId : Long = if (startFrom > 0) startFrom else findMinCol(tableName, "id") 
    val maxId : Long = if (endTo > 0) endTo else findMaxCol(tableName, "id") 

    (minId until maxId by numIdsPerParquet).toList.sliding(numPartitionsAtATime, numPartitionsAtATime).toList.foreach(list => { 
     list.map(start => { 
      val end = math.min(start + numIdsPerParquet, maxId) 

      sqlContext.read.jdbc(mysqlConStr, 
      s"(SELECT * FROM $tableName WHERE id >= ${start} AND id < ${end}) as tmpTable", 
      Map[String, String]()) 
     }).reduce((left, right) => { 
      left.unionAll(right) 
     }) 
     .write 
     .parquet(s"${filePrefix}/$tableName/${list.head}-${list.last + numIdsPerParquet}") 
    }) 
    } 

這工作好了很多不同的表,但無論出於何種原因表繼續得到java.nio.channels.ClosedChannelException不管我多麼減少掃描窗口或大小。

基於this answer我猜我在我的代碼的某處有異常,但我不知道它會在哪裏,因爲它是一個相當簡單的代碼。我怎樣才能進一步調試這個異常?日誌沒有任何非常有用的東西,也沒有顯示原因。

回答

相關問題