2015-01-12 94 views
6

我有一個主要的Python腳本連接到MySQL數據庫,並從中剔除少量記錄。基於返回的結果,它將啓動儘可能多的線程(類實例)和抓取許多記錄。每個線程應該返回到數據庫並通過將一個狀態標誌設置爲不同的狀態(「進程已啓動」)來更新另一個表。如何使用Python多線程處理MySQL連接(s)

要做到這一點我想:

1)通過了數據庫連接的所有主題 2)打開從每個線程

一個新的數據庫連接,但他們都不是工作。

我可以運行我的更新沒有任何問題在這兩種情況下通過使用try/except,但MySQL表尚未更新,並且沒有生成錯誤。我在這兩種情況下都使用了commit。

我的問題是如何在這種情況下處理MySQL連接?

更新基於第一幾點意見:

MAIN SCRIPT 
----------- 

#Connecting to DB 
db = MySQLdb.connect(host = db_host, 
         db = db_db, 
         port = db_port, 
         user = db_user, 
         passwd = db_password, 
         charset='utf8') 

# Initiating database cursor 
cur = db.cursor() 

# Fetching records for which I need to initiate a class instance 

cur.execute('SELECT ...') 

for row in cur.fetchall() : 
    # Initiating new instance, appending it to a list and 
    # starting all of them 



CLASS WHICH IS INSTANTIATED 
--------------------------- 

# Connecting to DB again. I also tried to pass connection 
# which has been opened in the main script but it did not 
# work either. 

db = MySQLdb.connect(host = db_host, 
         db = db_db, 
         port = db_port, 
         user = db_user, 
         passwd = db_password, 
         charset='utf8') 

# Initiating database cursor 
cur_class = db.cursor() 
cur.execute('UPDATE ...') 
db.commit() 
+0

很難說,不知道如何連接到你的數據庫,你如何實現更新任何東西。 – Ashalynd

+0

我不完全理解你的問題。是最簡單的情況下工作,像單線程python連接到MySQL和更新表? – qqibrow

+0

@Ashalynd感謝您花時間!抱歉,我是AFK。我用我的主代碼和類代碼中的代碼snipet更新了我的問題。這是我如何啓動實例並打開數據庫連接的方式。我試圖捕捉錯誤,當打開連接和執行查詢從嘗試/除了沒有運氣的情況下。 – g0m3z

回答

3

看來我的代碼沒有問題,但與我的MySQL版本。我正在使用MySQL標準社區版,並根據發現的官方文檔here

線程池插件是一個商業功能。它不包含在MySQL社區發行版中。

我即將升級到MariaDB來解決此問題。

9

下面是使用Python中的多線程處理MySQL的一個例子,我不知道你 表和數據,因此,只需更改代碼可能幫助:

import threading 
import time 
import MySQLdb 

Num_Of_threads = 5 

class myThread(threading.Thread): 

    def __init__(self, conn, cur, data_to_deal): 
     threading.Thread.__init__(self) 
     self.threadID = threadID 
     self.conn = conn 
     self.cur = cur 
     self.data_to_deal 

    def run(self): 

     # add your sql 
     sql = 'insert into table id values ({0});' 
     for i in self.data_to_deal: 
      self.cur.execute(sql.format(i)) 
      self.conn.commit() 

threads = [] 
data_list = [1,2,3,4,5] 

for i in range(Num_Of_threads): 
    conn = MySQLdb.connect(host='localhost',user='root',passwd='',db='') 
    cur = conn.cursor() 
    new_thread = myThread(conn, cur, data_list[i]) 

for th in threads: 
    th.start() 

for t in threads: 
    t.join() 
+0

對不起,我遲到的回覆。參考你上面的例子,我有兩個不同的文件,我的課程和我的主要腳本。我猜這應該不是問題。另一個我做不同的事情是我不把我的data_list傳遞給我的線程,因爲我需要我的線程即時從我的數據庫查詢數據。所以我做的是:1)打開一個數據庫連接(主腳本)2)查詢記錄(主腳本)3)啓動儘可能多的類實例和我擁有的許多記錄(主腳本)4。)嘗試從每個實例(類實例)更新數據庫中的表記錄 – g0m3z

1

看起來像mysql 5.7確實支持多線程。

正如你以前試過的 - 絕對確保在def worker()中傳遞連接。全局定義的連接是我的錯

下面是通過5個線程打印10個記錄示例代碼,5次

import MySQLdb 
import threading 


def write_good_proxies():  
    local_db = MySQLdb.connect("localhost","username","PassW","DB", port=3306) 
    local_cursor = local_db.cursor (MySQLdb.cursors.DictCursor) 
    sql_select = 'select http from zproxies where update_time is null order by rand() limit 10' 
    local_cursor.execute(sql_select) 
    records = local_cursor.fetchall() 
    id_list = [f['http'] for f in records] 
    print id_list 
def worker(): 
    x=0 
    while x< 5: 
     x = x+1 
     write_good_proxies() 

threads = [] 


for i in range(5): 
    print i 
    t = threading.Thread(target=worker) 
    threads.append(t) 
    t.start()