Best way to use Pyspark with a SQL database

My SQL database has millions of records; some of its tables have over ten million rows. My main SELECT is about 4,000 lines of code, but its structure is like this:
SELECT A.seq field1, field2, field3, field4,
(select field from tableX X... where A.seq = X.seq ...) field5,
(select field from tableY Y... where A.seq = Y.seq ...) field6,
(select field from tableN Z... where A.seq = Z.seq ...) field7,
field8, field9
FROM tableA A, tableB B, tableN N
WHERE A.seq = B.seq
AND A.req_seq = N.req_seq;
My idea is to do something like this:
# load the tables into the cluster separately
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

conf = SparkConf().setAppName("MyApp")
sc = SparkContext(master="local[*]", conf=conf)
sql = HiveContext(sc)

dataframeA = sql.read.format("jdbc") \
    .option("url", "db_url") \
    .option("driver", "myDriver") \
    .option("dbtable", "tableA") \
    .option("user", "myuser") \
    .option("password", "mypass").load()

dataframeB = sql.read.format("jdbc") \
    .option("url", "db_url") \
    .option("driver", "myDriver") \
    .option("dbtable", "tableB") \
    .option("user", "myuser") \
    .option("password", "mypass").load()

dataframeC = sql.read.format("jdbc") \
    .option("url", "db_url") \
    .option("driver", "myDriver") \
    .option("dbtable", "tableN") \
    .option("user", "myuser") \
    .option("password", "mypass").load()
# then do the needed joins
df_aux = dataframeA.join(dataframeB, dataframeA.seq == dataframeB.seq)
df_res_aux = df_aux.join(dataframeC, df_aux.req_seq == dataframeC.req_seq)
# then with that dataframe calculate the subselect fields
def calculate_field5(seq):
    # load the table in the cluster as with the main tables
    # and query the dataframe
    # or make the query to the DB and return the field
    # (a join-based version of this is sketched below)
    return field

df_res = df_res_aux.withColumn('field5', calculate_field5(df_res_aux.seq))
# the same for the rest of the fields
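For the subselect columns, the first option in the pseudocode above (load tableX into the cluster like the main tables and query the DataFrame) could also be written as a left join instead of a per-row lookup. A minimal sketch for field5, assuming the extra conditions hidden behind the "..." can be folded into the join and that each seq matches at most one row in tableX:

# hypothetical sketch: load tableX like the main tables and join it in,
# keeping only the column needed for field5
dataframeX = sql.read.format("jdbc") \
    .option("url", "db_url") \
    .option("driver", "myDriver") \
    .option("dbtable", "tableX") \
    .option("user", "myuser") \
    .option("password", "mypass").load()

df_res = df_res_aux.join(
    dataframeX.select(dataframeX.seq, dataframeX.field.alias('field5')),
    on='seq',
    how='left')

If a seq can match several rows in tableX, the subselect semantics would need an aggregation or a dropDuplicates on dataframeX before the join.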
Is this a good way to do it? Should I take a different approach?
Any advice would be really, really appreciated.
Thanks for your answer! I will take a look at those techniques. So it is best to retrieve all the needed tables into the file system or into memory, and then apply the filters.
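A rough illustration of that idea, pulling a large table into the cluster with Spark's JDBC partitioning options so the read is spread across executors (the bounds and partition count below are placeholders, and assume seq is a numeric column):

# hypothetical sketch: read a big table in parallel partitions and cache it,
# so later joins and filters run inside Spark rather than per-row against the DB
dataframeA = sql.read.format("jdbc") \
    .option("url", "db_url") \
    .option("driver", "myDriver") \
    .option("dbtable", "tableA") \
    .option("user", "myuser") \
    .option("password", "mypass") \
    .option("partitionColumn", "seq") \
    .option("lowerBound", "1") \
    .option("upperBound", "20000000") \
    .option("numPartitions", "16") \
    .load()

dataframeA.cache()
# the joins and filters shown above then operate on data already in the cluster

If only a subset of a table is needed, the dbtable option can also be given a subquery such as "(select ... from tableA where ...) A", so the database does part of the filtering before the data is transferred.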