在Python中,如果數據庫非常大,一個簡單的select查詢會花費很多時間。我有一張有4,700,000條記錄的表格,如果我使用SELECT * FROM MY_TABLE
來獲取表格中的所有數據,則需要18分鐘。通過設置chunk_size
並執行並行查詢,它將節省時間。Python,如何實現並行處理
所以,我的代碼是:
import os
import time
import multiprocessing
import pandas as pd
import MySQLdb as mysql
if __name__ == '__main__':
conn = mysql.connect(host='192.168.0.114',
user='root',
passwd='fit123456',
db='A_stock_day',
charset='utf8'
)
limit = 100000
offset = 0
dfs = []
print 'start.....'
_s = time.time()
while True:
_query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' %\
(limit, offset)
dfs.append(pd.read_sql(_query, conn))
offset += limit
if len(dfs[-1]) < limit:
break
_e = time.time()
print 'Time: ', _e - _s
full_df = pd.concat(dfs)
但是,它仍然需要大約10分鐘。如何並行化它,讓許多線程同時運行,並使執行時間縮短到一個線程的執行時間?我有多重代碼在這裏:
def select(info):
""""""
limit, offset, conn = info[0], info[1], info[2]
_query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' %\
(limit, offset)
s = time.time()
info[3].append(pd.read_sql(_query, conn))
e = time.time()
print 'time: ', e - s, ' pid: ', os.getpid()
if __name__ == '__main__':
conn = mysql.connect(host='192.168.0.114',
user='root',
passwd='fit123456',
db='A_stock_day',
charset='utf8'
)
dfs, p, pool= [], [], multiprocessing.Pool(7)
info = [(1000000, 0, conn, dfs),
(1000000, 1000000, conn, dfs),
(1000000, 2000000, conn, dfs),
(1000000, 3000000, conn, dfs),
(1000000, 4000000, conn, dfs),
(1000000, 5000000, conn, dfs),
(1000000, 6000000, conn, dfs),
]
for _i, _v in enumerate(info):
print 'start....', _i
_p = multiprocessing.Process(target=select, args=(_v,))
_p.start()
_p.join()
print 'The End'
正如你所看到的,儘管它推出的多,只有一個進程在同一時間讀取數據庫。所以,這只是多處理,而不是並行處理。
如何實現並行多處理以節省時間?謝謝。
'_p.join()'使當前進程等待終止子進程。所以你的應用程序在N個進程中一個一個地啓動。 嘗試啓動所有進程,然後加入所有進程。像: 過程= [] 爲_i,_v在枚舉(信息): 打印 '開始....',_i _p = multiprocessing.Process(目標=選擇,ARGS =(_ V,)) _p 。開始() processes.append(_p) #_p.join() #循環 爲_p在後工序: _p.join() –