2016-08-19 62 views
5

我正在嘗試通過Spark SQL將非常大的MySQL表的內容批量遷移到parquet文件。但是當這樣做的時候,我很快就會耗盡內存,即使設置驅動程序的內存限制更高(我在本地模式下使用spark)。示例代碼:通過Spark SQL進行批量數據遷移

Dataset<Row> ds = spark.read() 
    .format("jdbc") 
    .option("url", url) 
    .option("driver", "com.mysql.jdbc.Driver") 
    .option("dbtable", "bigdatatable") 
    .option("user", "root") 
    .option("password", "foobar") 
    .load(); 

ds.write().mode(SaveMode.Append).parquet("data/bigdatatable"); 

好像星火嘗試讀取整個表的內容到內存中,這是不會工作非常好。那麼,通過Spark SQL進行批量數據遷移的最佳方法是什麼?

+0

您是得到OOM不是因爲spark配置錯誤,你可能應該啓用驅動流: http://stackoverflow.com/a/2448019/2439539 – r90t

回答

3

在您的解決方案中,Spark將在開始寫入之前將整個表內容讀取到一個分區中。你能夠避免的一種方法是劃分閱讀部分,但它需要在源數據的數字順序列:

Dataset<Row> ds = spark.read() 
    .format("jdbc") 
    .option("url", url) 
    .option("driver", "com.mysql.jdbc.Driver") 
    .option("dbtable", "bigdatatable") 
    .option("user", "root") 
    .option("password", "foobar") 
    .option("partitionColumn", "NUMERIC_COL") 
    .option("lowerBound", "1") 
    .option("upperBound", "10000") 
    .option("numPartitions", "64") 
    .load(); 

在上面的例子中,列「NUMERIC_COL」必須存在於數據和其理想情況下,應該從1到10000統一變化。當然,這是很多需求,像這樣的列可能不存在,所以您應該在數據庫中創建一個類似於此列的視圖,或者添加它在查詢中(請注意,我使用了通用的SQL語法,您將不得不爲您的DBMS修改):

String query = "(select mod(row_number(), 64) as NUMERIC_COL, * from bigdatatable) as foo" 

Dataset<Row> ds = spark.read() 
    .format("jdbc") 
    .option("url", url) 
    .option("driver", "com.mysql.jdbc.Driver") 
    .option("dbtable", query) 
    .option("user", "root") 
    .option("password", "foobar") 
    .option("partitionColumn", "NUMERIC_COL") 
    .option("lowerBound", "0") 
    .option("upperBound", "63") 
    .option("numPartitions", "64") 
    .load(); 
+0

這看起來不錯。謝謝@丹尼爾 –