2016-09-28 18 views
0

我有一個腳本,它從MySQL表中檢索活動「作業」列表,然後使用多處理庫爲每個活動作業實例化我的主腳本一次。我的多處理腳本有一個函數,用於檢查給定作業是否被另一個線程聲明。它通過檢查數據庫表中的特定列是否爲/不是NULL來執行此操作。該數據庫查詢返回單個項目的元組:python 27 - 多處理時布爾檢查失敗

def check_if_job_claimed(): 
    #... 
    job_claimed = cursor.fetchone() #Returns (claim_id,) for claimed jobs, and (None,) for unclaimed jobs 
    if job_claimed: 
     print "This job has already been claimed by another thread." 
     return 
    else: 
     do_stuff_to_claim_the_job 

當我運行這個功能沒有多處理部分,該要求檢查工作得很好。但是,當我嘗試並行運行作業時,索賠檢查將所有(無)元組讀取爲有價值並因此具有真實性,因此函數假定作業已被聲明。

我試着調整多處理器使用的併發進程的數量,但聲明檢查仍然無法工作......即使當我將進程數量設置爲1.我也嘗試過使用if語句看看我是否可以這樣工作:

if job_claimed == True 
if job_claimed == (None,) 
# etc. 

雖然沒有運氣。

是否有人知道多處理庫中的某些內容會阻止我的聲明檢查函數正確解釋job_claimed元組?也許我的代碼有問題嗎?

編輯

我已經運行在調試模式下job_claimed變量一些感實性測試。下面是這些測試的結果:

(pdb) job_claimed 
    (None,) 
(pdb) len(job_claimed) 
    1 
(pdb) job_claimed == True 
    False 
(pdb) job_claimed == False 
    False 
(pdb) job_claimed[0] 
    None 
(pdb) job_claimed[0] == True 
    False 
(pdb) job_claimed[0] == False 
    False 
(pdb) any(job_claimed) 
    False 
(pdb) all(job_claimed) 
    False 
(pdb) job_claimed is not True 
    True 
(pdb) job_claimed is not False 
    True 

編輯

按照要求:

with open('Resource_File.txt', 'r') as f: 
    creds = eval(f.read()) 
connection = mysql.connector.connect(user=creds["mysql_user"],password=creds["mysql_pw"],host=creds["mysql_host"],database=creds["mysql_db"],use_pure=False,buffered=True) 

def check_if_job_claimed(job_id): 
    cursor = connection.cursor() 
    thread_id_query = "SELECT Thread_Id FROM jobs WHERE Job_ID=\'{}\';".format(job_id) 
    cursor.execute(thread_id_query) 
    job_claimed = cursor.fetchone() 
    job_claimed = job_claimed[0] 
    if job_claimed: 
     print "This job has already been claimed by another thread. Moving on to next job..." 
     cursor.close() 
     return False 
    else: 
     thread_id = socket.gethostname()+':'+str(random.randint(0,1000)) 
     claim_job = "UPDATE jobs SET Thread_Id = \'{}\' WHERE Job_ID = \'{}\';".format(job_id) 
     cursor.execute(claim_job) 
     connection.commit() 
     print "Job is now claimed" 
     cursor.close() 
     return True 

def call_the_queen(dict_of_job_attributes): 
    if check_if_job_claimed(dict_of_job_attributes['job_id']): 
     instance = OM(dict_of_job_attributes) #<-- Create instance of my target class 
     instance.queen_bee() 

#multiprocessing code 
import multiprocessing as mp 
if __name__ == '__main__': 
    active_jobs = get_active_jobs() 
    pool = mp.Pool(processes = 4) 
    pool.map(call_the_queen,active_jobs) 
    pool.close() 
    pool.join() 
+0

這是怎麼回事 - 不是做這個複雜的龐然大物,而是把所有的工作ID放到一個隊列中(例如Redis中的一個列表),然後只是簡單地'彈出'一個工作ID。這是一個原子操作,所以當工作人員檢索作業ID時,其他進程無法竊取它。 – yedpodtrzitko

+0

您是否可以包含多處理代碼以及創建遊標的代碼。我想象你在進程中重複使用遊標對象,並且只有1個項目 –

+0

是的,那些真實性測試沒有用處,這是每個python程序的預期結果。 –

回答

1

任何非空的元組(或列表,串,迭代等)將評估到True。迭代的內容是否爲真並不重要。要測試它,可以使用any(iterable)all(iterable)來測試迭代器中的任何或所有項目是否評估爲True。

但是,根據您的編輯,您的問題可能是由跨多個進程使用全局連接對象引起的。

相反,每個進程應該創建它自己的連接。

def check_if_job_claimed(job_id): 
    connection = mysql.connector.connect(user=creds["mysql_user"],password=creds["mysql_pw"],host=creds["mysql_host"],database=creds["mysql_db"],use_pure=False,buffered=True) 

您也可以嘗試使用connection pooling,但我不知道是否會在工藝工作,並有可能需要你切換到線程來代替。

此外,我會將if __name__ == '__main__':下的所有代碼轉換爲函數。您通常希望避免在使用多處理時污染全局名稱空間,因爲當python創建新進程時,它會嘗試將全局狀態複製到新進程。這可能會導致一些奇怪的錯誤,因爲全局變量不再共享狀態(因爲它們處於單獨的進程中),或者在新進程中重建時某個對象無法序列化或丟失一些信息。

+0

感謝您回覆Brendan。我很感激。我最初考慮過同樣的事情,但後來我開始對job_claimed變量進行真實性測試(包括使用any()和all())。我應該在我的OP中包含這些測試結果。我現在會更新它。總之,我得到了一些令人困惑的結果。我承認,這種困惑可能源於我缺乏理解。然而,這些都不能解釋爲什麼當多處理器沒有調用完全相同的函數時,它的工作原理是完美的,而且當它失敗時會失敗。 –

+0

@NickMiller你在每個進程中使用了相同的'cursor'嗎?還是你爲每個過程創建一個新的?當你耗盡物品清單時,'fetchone'返回'無'。 –

+0

這是一個非常好的問題。我會很快研究並回復你。 –