2017-04-10 56 views
0

我的SQL數據庫擁有數以百萬計的記錄他們中的一些表有一千幾百萬,我的主要選擇是約4000行代碼,但結構是這樣的:最佳方式使用Pyspark與SQL數據庫

SELECT A.seq field1, field2, field3, field4, 
     (select field from tableX X... where A.seq = X.seq ...) field5, 
     (select field from tableY Y... where A.seq = Y.seq ...) field6, 
     (select field from tableN Z... where A.seq = Z.seq ...) field7, 
     field8, field9 
    FROM tableA A, tableB B, tableN N 
WHERE A.seq = B.seq 
    AND A.req_seq = N.req_seq; 

我的想法是做這樣的事情:

# load the tables in the cluster separately 

conf = SparkConf().setAppName("MyApp") 
sc = SparkContext(master="local[*]", conf=conf) 
sql = HiveContext(sc)  

dataframeA = sql.read.format("jdbc").option("url", 
            "db_url")\ 
    .option("driver", "myDriver")\ 
    .option("dbtable", tableA)\ 
    .option("user", "myuser")\ 
    .option("password", "mypass").load() 

dataframeB = sql.read.format("jdbc").option("url", 
            "db_url")\ 
    .option("driver", "myDriver")\ 
    .option("dbtable", tableC)\ 
    .option("user", "myuser")\ 
    .option("password", "mypass").load() 

dataframeC = sql.read.format("jdbc").option("url", 
            "db_url")\ 
    .option("driver", "myDriver")\ 
    .option("dbtable", tableC)\ 
    .option("user", "myuser")\ 
    .option("password", "mypass").load() 

# then do the needed joins 

df_aux = dataframeA.join(dataframeB, dataframeA.seq == dataframeB.seq) 

df_res_aux = df_aux.join(dataframeC, df_aux.req_seq == dataframeC.req_seq) 


# then with that dataframe calculate the subselect fields 

def calculate_field5(seq): 
    # load the table in the cluster as with the main tables 
    # and query the datafame 
    # or make the query to DB and return the field 
    return field 

df_res = df_res_aux.withColumn('field5', calculate_field5(df_res_aux.seq)) 
# the same for the rest of fields 

這是一個好辦法嗎? 我應該採用不同的方式嗎?

任何意見將真的,真的很感激

回答

0

好,

如果wanto在執行使用MySQL,這是做到這一點的方式。

但得到一些說明,也許你的執行將花費大量的時間來運行,由於mySql查詢時間。 MySql不是分佈式數據庫,因此您可以花費大量時間從mySql檢索數據。

我建議你。

嘗試將數據檢索到hdfs(如果您有HDFS),請嘗試使用SqoopHere一個例子如何以增量的方式使用它。

嘗試轉換存儲爲Orc的數據。請參閱示例here

這個建議是爲了減少數據庫的執行時間。每次你從你的MySql中直接請求數據。您正在使用MySql的資源將數據發送到Spark。按照我的建議,您可以將您的數據庫複製到HDFS並將這些數據提交給Spark進行處理。這不會導致您的數據庫執行時間。

爲什麼要使用Orc? Orc是將數據轉換爲緊湊和柱狀結構的理想選擇。這會增加您的數據檢索和搜索。

+0

感謝您的回答!我會看看這些技術。 因此,最好將所有需要的表檢索到文件系統或內存中,然後應用過濾器 –

相關問題