我有一個腳本,它從MySQL表中檢索活動「作業」列表,然後使用多處理庫爲每個活動作業實例化我的主腳本一次。我的多處理腳本有一個函數,用於檢查給定作業是否被另一個線程聲明。它通過檢查數據庫表中的特定列是否爲/不是NULL來執行此操作。該數據庫查詢返回單個項目的元組:python 27 - 多處理時布爾檢查失敗
def check_if_job_claimed():
#...
job_claimed = cursor.fetchone() #Returns (claim_id,) for claimed jobs, and (None,) for unclaimed jobs
if job_claimed:
print "This job has already been claimed by another thread."
return
else:
do_stuff_to_claim_the_job
當我運行這個功能沒有多處理部分,該要求檢查工作得很好。但是,當我嘗試並行運行作業時,索賠檢查將所有(無)元組讀取爲有價值並因此具有真實性,因此函數假定作業已被聲明。
我試着調整多處理器使用的併發進程的數量,但聲明檢查仍然無法工作......即使當我將進程數量設置爲1.我也嘗試過使用if語句看看我是否可以這樣工作:
if job_claimed == True
if job_claimed == (None,)
# etc.
雖然沒有運氣。
是否有人知道多處理庫中的某些內容會阻止我的聲明檢查函數正確解釋job_claimed元組?也許我的代碼有問題嗎?
編輯
我已經運行在調試模式下job_claimed變量一些感實性測試。下面是這些測試的結果:
(pdb) job_claimed
(None,)
(pdb) len(job_claimed)
1
(pdb) job_claimed == True
False
(pdb) job_claimed == False
False
(pdb) job_claimed[0]
None
(pdb) job_claimed[0] == True
False
(pdb) job_claimed[0] == False
False
(pdb) any(job_claimed)
False
(pdb) all(job_claimed)
False
(pdb) job_claimed is not True
True
(pdb) job_claimed is not False
True
編輯
按照要求:
with open('Resource_File.txt', 'r') as f:
creds = eval(f.read())
connection = mysql.connector.connect(user=creds["mysql_user"],password=creds["mysql_pw"],host=creds["mysql_host"],database=creds["mysql_db"],use_pure=False,buffered=True)
def check_if_job_claimed(job_id):
cursor = connection.cursor()
thread_id_query = "SELECT Thread_Id FROM jobs WHERE Job_ID=\'{}\';".format(job_id)
cursor.execute(thread_id_query)
job_claimed = cursor.fetchone()
job_claimed = job_claimed[0]
if job_claimed:
print "This job has already been claimed by another thread. Moving on to next job..."
cursor.close()
return False
else:
thread_id = socket.gethostname()+':'+str(random.randint(0,1000))
claim_job = "UPDATE jobs SET Thread_Id = \'{}\' WHERE Job_ID = \'{}\';".format(job_id)
cursor.execute(claim_job)
connection.commit()
print "Job is now claimed"
cursor.close()
return True
def call_the_queen(dict_of_job_attributes):
if check_if_job_claimed(dict_of_job_attributes['job_id']):
instance = OM(dict_of_job_attributes) #<-- Create instance of my target class
instance.queen_bee()
#multiprocessing code
import multiprocessing as mp
if __name__ == '__main__':
active_jobs = get_active_jobs()
pool = mp.Pool(processes = 4)
pool.map(call_the_queen,active_jobs)
pool.close()
pool.join()
這是怎麼回事 - 不是做這個複雜的龐然大物,而是把所有的工作ID放到一個隊列中(例如Redis中的一個列表),然後只是簡單地'彈出'一個工作ID。這是一個原子操作,所以當工作人員檢索作業ID時,其他進程無法竊取它。 – yedpodtrzitko
您是否可以包含多處理代碼以及創建遊標的代碼。我想象你在進程中重複使用遊標對象,並且只有1個項目 –
是的,那些真實性測試沒有用處,這是每個python程序的預期結果。 –