Python multithreading and database requests?

I'm building a script that uses multiprocessing.Pool to process several items at once, and I'm getting a strange result.
To start, here is my (simplified) code, which is enough to test the behavior (or the failure :P):
```python
#!/usr/bin/env python
import MySQLdb
import time
import multiprocessing
import sys
import logging

TIMEOUT = 2500  # ms, 2.5 seconds
MAX_TIMEOUT = 120000  # ms, 2 minutes

DB_PARAMS = {
    'user': 'root',
    'passwd': '******',
    'db': 'my_db'
}


class CheckState:
    def __call__(self, row):
        print "Calling for %s" % (row[0],)
        # Retrieving n-1 instance
        entry_id = row[0]
        print entry_id
        last_log = database.find_unique("SELECT status FROM entry_logs WHERE entry_id = %s ORDER BY occured DESC LIMIT 1;", (entry_id,))
        print entry_id, last_log


class DatabaseBridge:
    def __init__(self, *args, **kwargs):
        self.cnx = MySQLdb.connect(**kwargs)
        self.cnx.autocommit(True)
        self.cursor = self.cnx.cursor()

    def query_all(self, query, *args):
        self.cursor.execute(query, *args)
        return self.cursor.fetchall()

    def find_unique(self, query, *args):
        rows = self.query_all(query, *args)
        if len(rows) == 1:
            return rows[0]
        return None

    def execute(self, query, params):
        self.cursor.execute(query, params)
        return self.cursor.rowcount

    def close(self):
        self.cursor.close()
        self.cnx.close()

database = DatabaseBridge(**DB_PARAMS)


def main():
    start_time = time.time()
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    try:
        logging.info("===================================")
        rows = database.query_all("SELECT id FROM entries WHERE is_disabled = 0 AND removed IS NULL AND IFNULL(address, '') != '';")
        if len(rows) > 0:
            pool_timeout = len(rows) * TIMEOUT
            if pool_timeout > MAX_TIMEOUT:
                pool_timeout = MAX_TIMEOUT
            result = pool.map_async(CheckState(), rows)
            pool.close()
            pool.join()
            logging.info("Running for %d seconds max", float(pool_timeout)/1000)
            result.get(timeout=float(pool_timeout)/1000)  # Maximum Timeout allowed for security reasons !
        end_time = time.time() - start_time
        logging.info("Took approx %.2f seconds to run.", end_time)
        database.close()
        pool.terminate()
        return 0
    except Exception, err:
        end_time = time.time() - start_time
        print "An error occured with the script"
        print "Took approx %.2f seconds to run." % (end_time,)
        logging.error("Took approx %.2f seconds to run.", end_time)
        logging.exception("Script failed")
        pool.terminate()
        database.close()
        return 1

if __name__ == "__main__":
    sys.exit(main())
```
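For reference, the timeout logic in main() is just a capped product converted to seconds. The helper below (pool_timeout_seconds is a hypothetical name, not part of the script, written for Python 3) computes the same value: with 10 rows that is 10 × 2500 ms = 25 s, well under the 2-minute cap.

```python
TIMEOUT = 2500        # ms per row, same constant as the script
MAX_TIMEOUT = 120000  # ms, upper bound, same constant as the script


def pool_timeout_seconds(n_rows):
    """Hypothetical helper: same capping as main(), converted to seconds."""
    return min(n_rows * TIMEOUT, MAX_TIMEOUT) / 1000.0


print(pool_timeout_seconds(10))   # 25.0 (10 rows -> 25000 ms)
print(pool_timeout_seconds(100))  # 120.0 (250000 ms, capped at 120000 ms)
```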
In my database, the table entries has 10 rows, and the table entry_logs has n records for each of those rows.
The status in entry_logs can be OK, DOWN or INVALID.
In my database, only entry_id 5 and 10 have DOWN or INVALID; the other 8 always have status OK.
Now, when I run my script, I get this:
```
Calling for 1
1
Calling for 2
2
Calling for 3
3
Calling for 4
4
1 ('OK',)
Calling for 6
6
Calling for 7
7
Calling for 5
5
6 ('OK',)
Calling for 8
8
8 ('OK',)
Calling for 9
9
9 ('OK',)
Calling for 10
10
10 ('OK',)
5 ('OK',)
4 ('INVALID',)
3 ('OK',)
7 ('OK',)
```
The key lines here are:
```
5 ('OK',)
10 ('OK',)
4 ('INVALID',)
```
This is impossible: 5 and 10 should return INVALID, and 4 should return OK.
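So I guess my workers are messing up the DatabaseBridge class: the returned data gets crossed between them and does not come back to the worker that asked for it. But how do I fix this?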
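A likely explanation: database is created at import time, so on Unix, where Pool forks its workers by default, every worker inherits a copy of the same MySQL connection (the same socket), and a reply can be read by whichever worker reads next. Assuming that is the cause, the usual workaround is to open one connection per worker through Pool's initializer argument. The sketch below (Python 3) uses the standard-library sqlite3 module and a throwaway database file as a stand-in for MySQLdb so it runs on its own; init_worker, check_state, build_demo_db and run are hypothetical names, and the table only mirrors the entry_logs query above.

```python
import multiprocessing
import os
import sqlite3
import tempfile

_conn = None  # one connection per worker process, set by init_worker


def init_worker(db_path):
    # Runs once inside each worker: open a private connection there,
    # instead of inheriting one created in the parent before the fork.
    global _conn
    _conn = sqlite3.connect(db_path)


def check_state(entry_id):
    # Uses only this worker's own connection.
    cur = _conn.execute(
        "SELECT status FROM entry_logs WHERE entry_id = ? "
        "ORDER BY occured DESC LIMIT 1",
        (entry_id,),
    )
    row = cur.fetchone()
    return entry_id, row[0] if row else None


def build_demo_db(path):
    # Stand-in data: entries 5 and 10 are INVALID, the rest are OK.
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE entry_logs (entry_id INTEGER, status TEXT, occured INTEGER)")
    for entry_id in range(1, 11):
        status = "INVALID" if entry_id in (5, 10) else "OK"
        conn.execute("INSERT INTO entry_logs VALUES (?, ?, ?)", (entry_id, status, 1))
    conn.commit()
    conn.close()  # the parent holds no open connection when the pool starts


def run(db_path, ids):
    with multiprocessing.Pool(4, initializer=init_worker, initargs=(db_path,)) as pool:
        async_result = pool.map_async(check_state, ids)
        return dict(async_result.get(timeout=30))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        build_demo_db(path)
        print(run(path, list(range(1, 11))))
```

With a private connection per worker, each entry_id should map to its own status (5 and 10 INVALID, the others OK) regardless of scheduling order.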
Thanks for your help.
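Bonus point: sometimes (not every time), my script just hangs and never returns. Even though I enforce a timeout on the get() method, I have to kill the main process, and also one child process (probably the one that hangs). Any idea?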
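One thing that stands out for the hang: main() calls pool.join() before result.get(timeout=...). Pool.join() takes no timeout, so if one worker gets stuck, join() waits forever and the get() deadline is never reached. A minimal sketch of the reordering (Python 3), with a hypothetical slow_task standing in for the query: call get() with the timeout first, and terminate the pool if it expires.

```python
import multiprocessing
import time


def slow_task(seconds):
    # Hypothetical stand-in for a query that may hang.
    time.sleep(seconds)
    return seconds


def run_with_timeout(durations, timeout):
    pool = multiprocessing.Pool(2)
    try:
        result = pool.map_async(slow_task, durations)
        # get() enforces the deadline; join() alone would wait indefinitely.
        values = result.get(timeout=timeout)
        pool.close()
        return values
    except multiprocessing.TimeoutError:
        return None  # deadline passed; stuck workers are killed below
    finally:
        pool.terminate()  # kill any worker still running
        pool.join()       # returns promptly once workers are terminated


if __name__ == "__main__":
    print(run_with_timeout([0.1, 0.2], timeout=5))
    print(run_with_timeout([0.1, 30], timeout=1))
```

The first call should return both results; the second should give up after about one second and print None instead of blocking for 30 seconds.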