2016-11-28 101 views
3

我在Spark中有一個ETL工作,它也連接到MySQL以獲取一些數據。從歷史上看,我已經做了如下:Spark ETL作業只執行一次mysql

hiveContext.read().jdbc(
    dbProperties.getProperty("myDbInfo"), 
    "(SELECT id, name FROM users) r", 
    new Properties()).registerTempTable("tmp_users"); 

Row[] res = hiveContext.sql("SELECT " 
    + " u.name, " 
    + " SUM(s.revenue) AS revenue " 
    + "FROM " 
    + " stats s " 
    + " INNER JOIN tmp_users u " 
    + "  ON u.id = s.user_id 
    + "GROUP BY " 
    + " u.name " 
    + "ORDER BY " 
    + " revenue DESC 
    + "LIMIT 10").collect(); 

String ids = ""; 
// now grab me some info for users that are in tmp_user_stats 
for (i = 0; i < res.length; i++) { 
    s += (!s.equals("") ? "," : "") + res[i](0); 
} 

hiveContext.jdbc(
dbProperties.getProperty("myDbInfo"), 
"(SELECT name, surname, home_address FROM users WHERE id IN ("+ids+")) r", 
new Properties()).registerTempTable("tmp_users_prises"); 

然而,這個比例到多個工作節點,每當我用tmp_users表時,它運行的查詢,並使它運行(至少)一次,每個節點,這歸結於我們的db管理員用刀在辦公室跑來跑去。

處理這個問題的最佳方法是什麼?我可以像3臺機器一樣運行作業,將其限制爲3個查詢,然後將數據寫入Hadoop以供其他節點使用它或什麼?

本質上 - 正如評論中所建議的那樣 - 我可以在ETL作業之外運行一個查詢,它可以從MySQL端準備數據並將其導入Hadoop。但是,可能會有後續查詢,建議通過Spark和JDBC連接設置更多的解決方案。

我會接受Sqoop解決方案,因爲它至少提供了一個更簡化的解決方案,儘管我仍然不確定它會完成這項工作。如果我找到了一些東西,我會再次編輯這個問題。

回答

1

您可以緩存數據:

val initialDF = hiveContext.read().jdbc(
    dbProperties.getProperty("myDbInfo"), 
    "(SELECT id, name FROM users) r", 
    new Properties()) 
initialDF.cache(); 
initialDF.registerTempTable("tmp_users"); 

第一次讀取後,數據將被緩存在內存

替代(即不傷DBA;))是使用Sqoop與參數--num-mappers=3然後將結果文件導入到Spark