2013-07-28 71 views

How to execute multiple SQL queries in parallel into concurrent pandas dataframes

Hi all. I'm looking for a way to run some SQL in parallel with Python, returning several pandas dataframes. I have code similar to the below that runs 4 SQL queries serially against a MS SQL Server database. Two of the queries take much longer to execute than their IO (network) time, so I figure parallelizing would make the code run roughly twice as fast. Is there a simple way to execute the queries in parallel?

Ideally, I'd like to read all the *.sql files in a project subdirectory, then kick off the queries to run in parallel and get the four dataframes back in an easy-to-use form (a list?) for further manipulation (indexing, joins, aggregation).

Thanks in advance, Randall

# imports 
import ceODBC 
import numpy as np 
import pandas as pd 
import pandas.io.sql as psql 
from ConfigParser import ConfigParser 
import os 
import glob 

# db connection string 
cnxn = 'DRIVER={SQL Server Native Client 11.0}; SERVER=<servername>; DATABASE=<dname>; Trusted_Connection=Yes' 

# directories (also should be moved to config) 
dataDir = os.getcwd() + '\\data\\' 
sqlDir = os.getcwd() + '\\sql\\' 

# read sql from external .sql files. Possible to read all *.sql files in a sql dir into a list (or other structure...)? 
with open(sqlDir + 'q1.sql', 'r') as f: q1sql = f.read() 
with open(sqlDir + 'q2.sql', 'r') as f: q2sql = f.read() 
with open(sqlDir + 'q3.sql', 'r') as f: q3sql = f.read() 
with open(sqlDir + 'q4.sql', 'r') as f: q4sql = f.read() 

# Connect to db (reusing the name cnxn for the open connection). 
# psql.frame_query takes the connection directly, so no cursor is needed. 
cnxn = ceODBC.connect(cnxn) 

# execute the queries and close the connection. Parallelize? 
df1 = psql.frame_query(q1sql, cnxn) 
df2 = psql.frame_query(q2sql, cnxn) 
df3 = psql.frame_query(q3sql, cnxn) 
df4 = psql.frame_query(q4sql, cnxn) 

# close connection 
cnxn.close() 
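One straightforward way to parallelize the four frame_query calls is a thread pool: the work is IO-bound (waiting on the server), so threads are enough despite the GIL. Below is a sketch using concurrent.futures (stdlib in Python 3; available on Python 2 as the `futures` backport). `run_query` is a stand-in for "open a connection and return psql.frame_query(sql, cnxn)":

```python
from concurrent.futures import ThreadPoolExecutor

def run_parallel(queries, run_query, max_workers=4):
    """Run run_query(sql) for each named query in worker threads.

    queries   -- dict mapping a name to an SQL string
    run_query -- callable taking the SQL text and returning a dataframe
    Returns {name: result} once every query has finished.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {name: pool.submit(run_query, sql)
                   for name, sql in queries.items()}
        # .result() blocks until that query is done (and re-raises its errors)
        return {name: fut.result() for name, fut in futures.items()}
```

In real use, `run_query` would open its own ceODBC connection (DB connections generally must not be shared across threads), call psql.frame_query, and close the connection before returning.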

Answer


Use one connection per thread (N connections for N threads). Then join the threads and collect the results.

# imports 
import ceODBC 
import numpy as np 
import pandas as pd 
import pandas.io.sql as psql 
from ConfigParser import ConfigParser 
import os 
import glob 
import threading 


# db connection string 
cnxn_string = 'DRIVER={SQL Server Native Client 11.0}; SERVER=<servername>; DATABASE=<dname>; Trusted_Connection=Yes' 

# directories (also should be moved to config) 
dataDir = os.getcwd() + '\\data\\' 
sqlDir = os.getcwd() + '\\sql\\' 

#variable to store results 
responses={} 
responses_lock=threading.Lock() 

maxconnections = 8 
pool_sema = threading.BoundedSemaphore(value=maxconnections) 


def task(fname): 

    with open(fname, 'r') as f: sql = f.read() 

    # Connect to db, run SQL, assign result into dataframe, close connection. 
    # the semaphore limits the number of concurrent DB connections 
    pool_sema.acquire() 
    cnxn = ceODBC.connect(cnxn_string) 
    # execute the query; psql.frame_query takes the connection directly 
    df = psql.frame_query(sql, cnxn) 
    # close connection 
    cnxn.close() 
    pool_sema.release() 

    # ensure that only one thread at a time modifies the shared dict 
    responses_lock.acquire() 
    responses[fname] = df 
    responses_lock.release() 


pool = [] 

# find sql files and spawn threads 
for fname in glob.glob(os.path.join(sqlDir, '*.sql')): 
    # create a new thread running task 
    thread = threading.Thread(target=task, args=(fname,)) 
    thread.daemon = True 
    # store thread in pool 
    pool.append(thread) 
    # start the thread 
    thread.start() 

# wait for all threads to finish 
for thread in pool: 
    thread.join() 

# results of each execution stored in responses dict 

Each file is executed in a separate thread. The results are stored in one variable (the responses dict).
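The semaphore/lock pattern above can be exercised without a database. This stand-alone sketch caps concurrency at 2 and records each worker's result under the lock (the squaring step is a placeholder for connect-and-query):

```python
import threading

max_concurrent = 2
sema = threading.BoundedSemaphore(value=max_concurrent)
lock = threading.Lock()
results = {}

def worker(n):
    with sema:          # at most max_concurrent workers run this section at once
        value = n * n   # placeholder for connect + query
    with lock:          # one writer at a time to the shared dict
        results[n] = value

threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# after join, results holds one entry per worker
```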

The same function, equivalently written with with statements:

def task(fname): 

    with open(fname, 'r') as f: sql = f.read() 

    # Connect to db, run SQL, assign result into dataframe, close connection. 
    # the semaphore limits the number of concurrent DB connections 
    with pool_sema: 
        cnxn = ceODBC.connect(cnxn_string) 
        # execute the query; psql.frame_query takes the connection directly 
        df = psql.frame_query(sql, cnxn) 
        # close connection 
        cnxn.close() 

    # ensure that only one thread at a time modifies the shared dict 
    with responses_lock: 
        responses[fname] = df 

multiprocessing.Pool is convenient for distributing heavy workloads, but it performs extra IO of its own (results have to be serialized and passed between processes).
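For IO-bound queries like these, multiprocessing.dummy.Pool offers the same Pool API backed by threads instead of processes, avoiding that serialization overhead. A minimal sketch, where `fetch` is a placeholder for the real connect-and-query call:

```python
from multiprocessing.dummy import Pool  # same API as multiprocessing.Pool, but thread-backed

def fetch(item):
    name, sql = item
    # real version: cnxn = ceODBC.connect(cnxn_string);
    #               df = psql.frame_query(sql, cnxn); cnxn.close(); return name, df
    return name, sql.strip()

queries = {'q1': 'SELECT 1 ', 'q2': 'SELECT 2 '}
pool = Pool(4)  # at most 4 queries in flight at once
results = dict(pool.map(fetch, queries.items()))
pool.close()
pool.join()
```

pool.map blocks until every query has returned, so `results` is complete once it finishes.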


Any example code to get started with this? I've just started using Python and pandas and haven't used parallel connections or threading before. Thanks. – user2537610