2013-11-15 59 views
0

我正在構建一個腳本,它使用Multithreading.Pool一次調用多個項目,而且我有一個奇怪的結果。Python多線程和數據庫請求?

擔任首發,這是我(簡化)代碼,但測試工作(或失敗的:P):

#!/usr/bin/env python 

import MySQLdb 
import time 
import multiprocessing 
import sys 
import logging 

TIMEOUT = 2500 # ms, 2,5 seconds 
MAX_TIMEOUT = 120000 # ms, 2 minutes 

DB_PARAMS = { 
    'user': 'root', 
    'passwd': '******', 
    'db': 'my_db' 
} 

class CheckState: 
    def __call__(self, row): 
     print "Calling for %s" % (row[0],) 

     # Retrieving n-1 instance 
     entry_id = row[0] 
     print entry_id 
     last_log = database.find_unique("SELECT status FROM entry_logs WHERE entry_id = %s ORDER BY occured DESC LIMIT 1;", (entry_id,)) 
     print entry_id, last_log 


class DatabaseBridge: 
    def __init__(self, *args, **kwargs): 
     self.cnx = MySQLdb.connect (**kwargs) 
     self.cnx.autocommit(True) 
     self.cursor = self.cnx.cursor() 

    def query_all(self, query, *args): 
     self.cursor.execute(query, *args) 
     return self.cursor.fetchall() 

    def find_unique(self, query, *args): 
     rows = self.query_all(query, *args); 
     if len(rows) == 1: 
      return rows[0] 

     return None 

    def execute(self, query, params): 
     self.cursor.execute(query, params) 
     return self.cursor.rowcount 

    def close(self): 
     self.cursor.close() 
     self.cnx.close() 

database = DatabaseBridge(**DB_PARAMS) 


def main(): 
    start_time = time.time() 
    pool = multiprocessing.Pool(multiprocessing.cpu_count()) 

    try: 
     logging.info("===================================") 
     rows = database.query_all("SELECT id FROM entries WHERE is_disabled = 0 AND removed IS NULL AND IFNULL(address, '') != '';") 

     if len(rows) > 0: 
      pool_timeout = len(rows) * TIMEOUT 
      if pool_timeout > MAX_TIMEOUT: 
       pool_timeout = MAX_TIMEOUT 

      result = pool.map_async(CheckState(), rows) 

      pool.close() 
      pool.join() 

      logging.info("Running for %d seconds max", float(pool_timeout)/1000) 
      result.get(timeout=float(pool_timeout)/1000) # Maximum Timeout allowed for security reasons ! 

     end_time = time.time() - start_time 
     logging.info("Took approx %.2f seconds to run.", end_time) 

     database.close() 
     pool.terminate() 

     return 0 
    except Exception, err: 
     end_time = time.time() - start_time 

     print "An error occured with the script" 
     print "Took approx %.2f seconds to run." % (end_time,) 

     logging.error("Took approx %.2f seconds to run.", end_time) 
     logging.exception("Script failed") 

     pool.terminate() 
     database.close() 

     return 1 

if __name__ == "__main__": 
    sys.exit(main()) 

在我的數據庫,我在表條目10行,並在表entry_logs,我有n *條記錄行。

entry_logs中的狀態可以是OK,DOWN或INVALID。

在我的數據庫,只有entry_id 5和10有向下或無效,8人都總是有狀態OK

現在,當我運行我的腳本,我得到這個:

Calling for 1 
1 
Calling for 2 
2 
Calling for 3 
3 
Calling for 4 
4 
1 ('OK',) 
Calling for 6 
6 
Calling for 7 
7 
Calling for 5 
5 
6 ('OK',) 
Calling for 8 
8 
8 ('OK',) 
Calling for 9 
9 
9 ('OK',) 
Calling for 10 
10 
10 ('OK',) 
5 ('OK',) 
4 ('INVALID',) 
3 ('OK',) 
7 ('OK',) 

的關鍵這裏有:

5 ('OK',) 
10 ('OK',) 
4 ('INVALID',) 

這是不可能的,5和10應返回無效,4應該返回OK

所以我猜我的線程正在搞亂DatabaseBridge類,返回的數據在線程之間交叉,並沒有在正確的線程中返回,但是,我該如何解決?

感謝您的幫助。

Bonus point:有時(不是每次),我的腳本只是掛起,並且從不返回。儘管我在get()方法上強制執行timeout,但我必須殺死主進程,並且要殺死一個子進程(可能是掛起的進程)。你有什麼想法嗎?

回答

0

好吧,經過長時間的沉重搜索,我想我找到了原因,但解決方案(或替代方案)將不得不實施。

事實上,該問題大大地解釋here

MySQLdb的用戶指南,該模塊支持級別1(「線程可以共享模塊,但不連接。」)。

簡而言之:我不應該對我的查詢使用相同的連接,因爲它不能保證結果將返回正確的線程。這正是我所經歷的。

現在我有兩個選擇:

1 - 要麼與一個MySQL池 2工作 - 重寫代碼彙集只什麼考慮長來計算,而不是整個工作。我想我會去第二個,更簡單;)