2016-03-16 80 views
0

在Python中,如果數據庫非常大,一個簡單的select查詢會花費很多時間。我有一張有4,700,000條記錄的表格,如果我使用SELECT * FROM MY_TABLE來獲取表格中的所有數據,則需要18分鐘。通過設置chunk_size並執行並行查詢,它將節省時間。Python,如何實現並行處理

所以,我的代碼是:

import os 
import time 
import multiprocessing 
import pandas as pd 
import MySQLdb as mysql 

if __name__ == '__main__': 
    conn = mysql.connect(host='192.168.0.114', 
         user='root', 
         passwd='fit123456', 
         db='A_stock_day', 
         charset='utf8' 
         ) 
    limit = 100000 
    offset = 0 
    dfs = [] 
    print 'start.....' 
    _s = time.time() 
    while True: 
     _query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' %\ 
       (limit, offset) 
     dfs.append(pd.read_sql(_query, conn)) 
     offset += limit 
     if len(dfs[-1]) < limit: 
      break 
    _e = time.time() 
    print 'Time: ', _e - _s 
    full_df = pd.concat(dfs) 

但是,它仍然需要大約10分鐘。如何並行化它,讓許多線程同時運行,並使執行時間縮短到一個線程的執行時間?我有多重代碼在這裏:

def select(info): 
    """""" 
    limit, offset, conn = info[0], info[1], info[2] 
    _query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' %\ 
      (limit, offset) 
    s = time.time() 
    info[3].append(pd.read_sql(_query, conn)) 
    e = time.time() 
    print 'time: ', e - s, ' pid: ', os.getpid() 

if __name__ == '__main__': 
    conn = mysql.connect(host='192.168.0.114', 
         user='root', 
         passwd='fit123456', 
         db='A_stock_day', 
         charset='utf8' 
         ) 
    dfs, p, pool= [], [], multiprocessing.Pool(7) 
    info = [(1000000, 0, conn, dfs), 
      (1000000, 1000000, conn, dfs), 
      (1000000, 2000000, conn, dfs), 
      (1000000, 3000000, conn, dfs), 
      (1000000, 4000000, conn, dfs), 
      (1000000, 5000000, conn, dfs), 
      (1000000, 6000000, conn, dfs), 
      ] 
    for _i, _v in enumerate(info): 
     print 'start....', _i 
     _p = multiprocessing.Process(target=select, args=(_v,)) 
     _p.start() 
     _p.join() 
    print 'The End' 

正如你所看到的,儘管它推出的多,只有一個進程在同一時間讀取數據庫。所以,這只是多處理,而不是並行處理。

如何實現並行多處理以節省時間?謝謝。

+0

'_p.join()'使當前進程等待終止子進程。所以你的應用程序在N個進程中一個一個地啓動。 嘗試啓動所有進程,然後加入所有進程。像: 過程= [] 爲_i,_v在枚舉(信息): 打印 '開始....',_i _p = multiprocessing.Process(目標=選擇,ARGS =(_ V,)) _p 。開始() processes.append(_p) #_p.join() #循環 爲_p在後工序: _p.join() –

回答

2

在你的循環

for _i, _v in enumerate(info): 
    print 'start....', _i 
    _p = multiprocessing.Process(target=select, args=(_v,)) 
    _p.start() 
    _p.join() 

你開始進程,然後他們立即加入。這意味着你的主進程永遠不會啓動多於一個額外的子進程(因爲只要它啓動一個子進程,它就會等待這個子進程繼續)。

最直接的辦法來解決,這將是這樣的:

processes = [] 
for _i, _v in enumerate(info): 
    print 'start....', _i 
    _p = multiprocessing.Process(target=select, args=(_v,)) 
    _p.start() 
    processes.append(_p) 
for _p in processes: 
    _p.join() 

然而,更好的辦法是使用你已經創建了pool對象。爲此,代碼應該是這個樣子

pool.apply(select, info) 

不過,我想你會更開心決策select返回它得到(而不是將其追加到一個數組),並調用pool.map代替pool.apply數據。這應該有助於避免一些競爭條件和共享內存問題,我想你會遇到這種問題。

您可以在https://docs.python.org/2/library/multiprocessing.html瞭解更多關於這些功能的信息,但我希望您已經去過那裏了。

+0

謝謝。但是,mysql的連接將在func中失去連接。我認爲這是共享內存問題。 –