
Python: using sqlite3 with multiprocessing

I have an SQLite3 database and need to parse 10,000 files. I read some data from each file and then query the database with that data to get a result. My code works fine in a single-process environment, but I get an error when trying to use a multiprocessing pool.

My approach without multiprocessing (works OK): 
1. Open DB connection object 
2. for f in files: 
    foo(f, x1=x1, x2=x2, ..., db=DB) 
3. Close DB 

My approach with multiprocessing (does NOT work): 
1. Open DB 
2. pool = multiprocessing.Pool(processes=4) 
3. pool.map(functools.partial(foo, x1=x1, x2=x2, ..., db=DB), [files]) 
4. pool.close() 
5. Close DB 

I get the following error: sqlite3.ProgrammingError: Base Cursor.__init__ not called.
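(Likely the root of the problem: pool.map has to pickle every argument it ships to a worker process, and sqlite3 connection and cursor objects cannot be pickled, so a live MapDB instance cannot be sent across process boundaries intact. A quick, illustrative check:)

import pickle
import sqlite3

conn = sqlite3.connect(":memory:")
try:
    pickle.dumps(conn)
except TypeError as e:
    # sqlite3 connections refuse to pickle, so they cannot travel
    # to pool workers as part of a task's arguments.
    print(e)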

My DB class is implemented as follows:

def open_db(sqlite_file):
    """Open SQLite database connection.

    Args:
    sqlite_file -- File path

    Return:
    Connection
    """

    log.info('Open SQLite database %s', sqlite_file)
    try:
        conn = sqlite3.connect(sqlite_file)
    except sqlite3.Error as e:
        log.error('Unable to open SQLite database %s', e.args[0])
        sys.exit(1)

    return conn

def close_db(conn, sqlite_file):
    """Close SQLite database connection.

    Args:
    conn -- Connection
    sqlite_file -- File path
    """

    if conn:
        log.info('Close SQLite database %s', sqlite_file)
        conn.close()

class MapDB:

    def __init__(self, sqlite_file):
        """Initialize.

        Args:
        sqlite_file -- File path
        """

        # 1. Open database.
        # 2. Setup to receive data as dict().
        # 3. Get cursor to execute queries.
        self._sqlite_file = sqlite_file
        self._conn = open_db(sqlite_file)
        self._conn.row_factory = sqlite3.Row
        self._cursor = self._conn.cursor()

    def close(self):
        """Close DB connection."""

        if self._cursor:
            self._cursor.close()
        close_db(self._conn, self._sqlite_file)

    def check(self):
        ...

    def get_driver_net(self, net):
        ...

    def get_cell_id(self, net):
        ...

The function foo() looks like this:

def foo(f, x1, x2, db):

    # ... extract some data from file f ...
    r1 = db.get_driver_net(...)
    r2 = db.get_cell_id(...)

And the overall flow is implemented as follows:

mapdb = MapDB(sqlite_file) 

log.info('Create NetInfo objects') 
pool = multiprocessing.Pool(processes=4) 
files = [get list of files to process]     
pool.map(functools.partial(foo, x1=x1, x2=x2, db=mapdb), files)  
pool.close() 
mapdb.close() 

To fix this, I think I need to create the MapDB() object inside each pool worker (so there would be 4 parallel/independent connections), but I don't know how to do that. Can someone show me an example of how to accomplish this with Pool?
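(For reference, one shape this idea can take is Pool's initializer/initargs hooks: each worker process builds its own MapDB once at startup, and every task it runs reuses that connection. The sketch below is illustrative only; init_worker, _worker_db, and extract_net are invented names, while MapDB, sqlite_file, x1, x2, and files come from the code above.)

import functools
import multiprocessing

_worker_db = None # one MapDB (and sqlite3 connection) per worker process

def init_worker(sqlite_file):
    """Runs once inside each worker process when the pool starts."""
    global _worker_db
    _worker_db = MapDB(sqlite_file)

def foo(f, x1, x2):
    # db is no longer an argument, so nothing sqlite-related gets
    # pickled when tasks are shipped to the workers.
    net = extract_net(f) # hypothetical file-parsing helper
    r1 = _worker_db.get_driver_net(net)
    r2 = _worker_db.get_cell_id(net)
    return r1, r2

pool = multiprocessing.Pool(processes=4,
                            initializer=init_worker,
                            initargs=(sqlite_file,))
results = pool.map(functools.partial(foo, x1=x1, x2=x2), files)
pool.close()
pool.join()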

Answer

How about defining foo like this:

def foo(f, x1, x2, db_path):
    mapdb = MapDB(db_path) # opens its own connection in the worker
    ... process data ...
    mapdb.close()

and then changing your pool.map call to:

pool.map(functools.partial(foo, x1=x1, x2=x2, db_path="path-to-sqlite3-db"), files)  
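This works because the only db-related thing pool.map has to pickle and send to the workers is the db_path string. Spelled out a little more defensively (a sketch only; process_file is a hypothetical stand-in for the parsing step, and the try/finally makes sure the per-call connection gets closed even if processing raises):

def foo(f, x1, x2, db_path):
    mapdb = MapDB(db_path)
    try:
        net = process_file(f, x1, x2) # hypothetical parsing helper
        r1 = mapdb.get_driver_net(net)
        r2 = mapdb.get_cell_id(net)
        return r1, r2
    finally:
        # Close the per-call connection even on error.
        mapdb.close()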

Update

Another option is to manage the worker threads yourself and distribute the work to them through a Queue:

from Queue import Queue
from threading import Thread

q = Queue()

def worker():
    mapdb = ...open the sqlite database...
    while True:
        item = q.get()
        if item[0] == "file":
            path = item[1]
            ... process file ...
            q.task_done()
        else:
            q.task_done()
            break
    ...close sqlite connection...

# Start up the workers

nworkers = 4

for i in range(nworkers):
    t = Thread(target=worker) # don't rebind the name "worker" here
    t.daemon = True
    t.start()

# Place work on the Queue

for x in ...list of files...:
    q.put(("file", x))

# Place termination tokens onto the Queue

for i in range(nworkers):
    q.put(("end",))

# Wait for all work to be done.

q.join()

The termination tokens are used to ensure the sqlite connections get closed - just in case. This pattern also keeps sqlite3's default same-thread check happy, since each connection is opened, used, and closed entirely within one worker thread.

But if I'm processing 10,000 files, won't I end up opening/closing the DB handle 10,000 times? I'm not sure whether there will be a noticeable performance hit, but I will profile it. – user4979733

Here are the numbers. Serial run: ~18 minutes to process everything, with ~56,000 database queries. Pool of 4 with your suggestion: ~3 min 28 s. That should work very well for my needs! Thanks. – user4979733

I just updated the answer with another approach. :-) – ErikR