3
我有一個SQLite3數據庫。我需要解析10000個文件。我從每個文件中讀取一些數據,然後用這些數據查詢數據庫以獲得結果。我的代碼在單個進程環境中工作正常。但是在嘗試使用多處理池時出現錯誤。Python:使用sqlite3與多處理
My approach without multiprocessing (works OK):
1. Open DB connection object
2. for f in files:
foo(f, x1=x1, x2=x2, ..., db=DB)
3. Close DB
My approach with multiprocessing (does NOT work):
1. Open DB
2. pool = multiprocessing.Pool(processes=4)
3. pool.map(functools.partial(foo, x1=x1, x2=x2, ..., db=DB), [files])
4. pool.close()
5. Close DB
我得到以下錯誤:sqlite3.ProgrammingError:基本光標.__ init__不叫。
我的DB類實現如下:
def open_db(sqlite_file):
"""Open SQLite database connection.
Args:
sqlite_file -- File path
Return:
Connection
"""
log.info('Open SQLite database %s', sqlite_file)
try:
conn = sqlite3.connect(sqlite_file)
except sqlite3.Error, e:
log.error('Unable to open SQLite database %s', e.args[0])
sys.exit(1)
return conn
def close_db(conn, sqlite_file):
"""Close SQLite database connection.
Args:
conn -- Connection
"""
if conn:
log.info('Close SQLite database %s', sqlite_file)
conn.close()
class MapDB:
def __init__(self, sqlite_file):
"""Initialize.
Args:
sqlite_file -- File path
"""
# 1. Open database.
# 2. Setup to receive data as dict().
# 3. Get cursor to execute queries.
self._sqlite_file = sqlite_file
self._conn = open_db(sqlite_file)
self._conn.row_factory = sqlite3.Row
self._cursor = self._conn.cursor()
def close(self):
"""Close DB connection."""
if self._cursor:
self._cursor.close()
close_db(self._conn, self._sqlite_file)
def check(self):
...
def get_driver_net(self, net):
...
def get_cell_id(self, net):
...
函數foo()看起來是這樣的:
def foo(f, x1, x2, db):
extract some data from file f
r1 = db.get_driver_net(...)
r2 = db.get_cell_id(...)
整體工作不落實如下:
mapdb = MapDB(sqlite_file)
log.info('Create NetInfo objects')
pool = multiprocessing.Pool(processes=4)
files = [get list of files to process]
pool.map(functools.partial(foo, x1=x1, x2=x2, db=mapdb), files)
pool.close()
mapdb.close()
爲了解決這個問題,我想我需要在每個池工作者內部創建MapDB()對象(所以有4個並行/獨立的連接)。但我不知道如何做到這一點。有人能告訴我一個如何用Pool完成這個任務的例子嗎?
但是,如果我正在處理10000個文件,那麼我不會最終打開/關閉DB句柄10000次?我不確定是否會有明顯的表現,但我會介紹它。 – user4979733
以下是數據:串行運行:約18分鐘處理所有內容,約56000個數據庫查詢。 4池與您的建議運行:~3分28秒。這應該對我的需求非常好!謝謝。 – user4979733
我剛用另一種方法更新了答案。 :-) – ErikR