
Non-threaded script unexpectedly runs faster than the threaded version

I have a Python script that validates data fetched from certain rows in a database and then logs the errors in a different table in the same database. The script validates each row and marks it as validated and has error = True/False depending on the result of the validation. This process is repeated for each row. With that, I figured I would add some steroids by creating threads, so that the validation of each row is done by an independent thread, thereby reducing the time it takes to validate a batch of rows.

To my surprise, I found that the threaded script takes slightly longer than the non-threaded one. On average, validating 1502 rows of data takes 1.5 seconds with the non-threaded script and 2.27 seconds with the threaded one. That might not sound like much, but ideally I will be running 2 million records at a go, so the time overhead will be significant. Plus, I would have assumed a threaded app would finish faster! :-)

The two scripts clock about the same, roughly 0.01 seconds, up until the threads are created. By this point the SQLAlchemy session has been created and all the data to be validated, along with its relations (foreign keys etc.), has been fetched. From there onwards, though, the non-threaded script finishes faster. Below is my code.

1.0 Non-threaded script

#A lot of code goes above this to fetch the data that is passed on to the validator function. 
#However, the two scripts are the same up to this point in regards to time taken, so I didn't see the need to post it. 
for lf_detail_id in load_file_detail_id: 
    params = lf_detail_id, load_file_id, entry_number[lf_detail_counter], \ 
      data[lf_detail_counter], template_version[lf_counter], \ 
      load_file_detail, error, dt_file, dt_columns 
    data_list.append(params) 
    lf_detail_counter += 1 
    no_of_records += 1 

validator = Validate() 
validator.validator(no_of_records, data_list) 
record_counter += lf_detail_counter 
data_list = None 
no_of_records = 0 
print("Validated '%s': seconds %s" %(filename[lf_counter], time.time()-file_start_time))  #print time it took to run' 

#Mark the load file as validated 
is_done = load_file.set_validation(load_file_id, True) 
if is_done == False: 
    raise Exception ("Can't update load_file's is_validated parameter: ", lf_detail_id) 

#Reset counters 
lf_detail_counter = 0 
lf_counter += 1 

#Commit The Entire Transaction. 
session.commit() 
print("NoThread:Finished validating %s file(s) with %s record(s) in %s seconds\n" %(lf_counter, record_counter, time.time()- process_start_time)) 

1.1 Validation function for the non-threaded script

class Validate(): 
    has_error = None 

    def validator(self, loop_length, job): 
        '''Validate data''' 
        for row_counter in range(loop_length): 
            load_file_detail_id, load_file_id, entry_number, data, \ 
                template_version, load_file_detail, error, dt_file, dt_columns = job[row_counter] 
            error_detail = ErrorLogDetail() 
            if data.strip() == "": 
                error_detail.errorlog = error 
                error_detail.load_file_detail_id = load_file_detail_id 
                error_detail.pos_row = entry_number 
                error_detail.pos_col = None 
                error_detail.column_name = None 
                error_detail.value_provided = None 
                error_detail.description = "error message 1" 
                session.add(error_detail) 
                error_detail = ErrorLogDetail() 
                self.has_error = True 
                self.set_validation(load_file_detail, load_file_detail_id, True, False) 
                continue 
            elif len(data) != int(dt_file.data_length): 
                error_detail.errorlog = error 
                error_detail.load_file_detail_id = load_file_detail_id 
                error_detail.pos_row = entry_number 
                error_detail.pos_col = None 
                error_detail.column_name = None 
                error_detail.value_provided = None 
                error_detail.description = "error message 2" 
                session.add(error_detail) 
                error_detail = ErrorLogDetail() 
                self.has_error = True 
                self.set_validation(load_file_detail, load_file_detail_id, True, False) 
                continue 
            else: 
                pass  #Continue with extra validation checks here 

            #If the record passes all validation then mark it as has_error = False 
            if self.has_error == False: 
                self.set_validation(load_file_detail, load_file_detail_id, False, True) 
            else: 
                self.has_error = False 
            #In the threaded script jobs.task_done() is called at this point; it does not appear in this non-threaded version. 

2.0 Threaded script

#A lot of code goes above this to fetch the data that is passed on to the validator function. 
#However, the two scripts are the same up to this point in regards to time taken, so I didn't see the need to post it. 
for lf_detail_id in load_file_detail_id: 
    params = lf_detail_id, load_file_id, entry_number[lf_detail_counter], \ 
      data[lf_detail_counter], template_version[lf_counter], \ 
      load_file_detail, error, dt_file, dt_columns 
    data_list.append(params) 
    lf_detail_counter += 1 
    queue_size += 1 
    if queue_size == THREAD_LIMIT: 
        myqueuing(queue_size, data_list) 
        queue_size = 0 

#spawn a pool of threads, and pass them queue instance 
if queue_size > 0: 
    myqueuing(queue_size, data_list) 

#Keep record of rows processed 
record_counter += lf_detail_counter 
print("Validated '%s': seconds- %s " %(filename[lf_counter], time.time()-file_start_time))  #print time it took to run' 

#Mark the load file as validated 
is_done = load_file.set_validation(load_file_id, True) 
if is_done == False: 
    raise Exception ("Can't update load_file's is_validated parameter: ", lf_detail_id) 

#Commit The Entire Transaction. 
session.commit() 
#Reset counters 
lf_detail_counter = 0 
lf_counter += 1 
data_list = None 
queue_size = 0    
print("HasThread:Finished loading %s file(s) with %s record(s) in %s seconds\n" %(lf_counter, record_counter, time.time()-process_start_time))  #print time it took to run' 

2.1 Threaded validation function

THREAD_LIMIT = 50              # This is how many threads we want 
jobs = queue.Queue()           # This sets up the queue object (unbounded) 
singlelock = threading.Lock()  # This is a lock so threads don't print through each other (and other reasons) 

def myqueuing(queuesize, data): 
    '''Put the fetched data in a queue and instantiate threads to 
    process the queue''' 
    # Spawn the threads 
    is_valid_date("20131212", True) #Calling this here to avoid a bug in time.strptime() when threading 
    for x in range(queuesize): 
        # This is the thread class that we instantiate. 
        workerbee().start() 

    # Put stuff in the queue 
    for i in range(queuesize): 
        # Block if the queue is full and wait up to 2 seconds, then raise queue.Full. 
        try: 
            jobs.put(data[i], block=True, timeout=2) 
        except queue.Full: 
            singlelock.acquire() 
            print ("The queue is full !") 
            singlelock.release() 

    # Wait for the threads to finish 
    singlelock.acquire()  # Acquire the lock so we can print 
    print ("Waiting for threads to finish.") 
    singlelock.release()  # Release the lock 
    jobs.join()           # This command waits for all threads to finish. 


class workerbee(threading.Thread): 

    def __init__(self): 
        threading.Thread.__init__(self) 
        self.lock = threading.Lock() 
        self.has_error = False 

    def run(self): 
        '''Validates the data.''' 
        job = jobs.get(True, 1) 
        load_file_detail_id, load_file_id, entry_number, data, \ 
            template_version, load_file_detail, error, dt_file, dt_columns = job 
        error_detail = ErrorLogDetail() 
        #Again, note that this part is identical for both the non-threaded and the threaded script. 
        #After each pass on a record, the record is marked as validated and whether has_error = True/False 
        if data.strip() == "": 
            error_detail.errorlog = error 
            error_detail.load_file_detail_id = load_file_detail_id 
            error_detail.pos_row = entry_number 
            error_detail.pos_col = None 
            error_detail.column_name = None 
            error_detail.value_provided = None 
            error_detail.description = "error message 1" 
            session.add(error_detail) 
            error_detail = ErrorLogDetail() 
            self.has_error = True 
            self.set_validation(load_file_detail, load_file_detail_id, True, True) 
        elif len(data) != int(dt_file.data_length): 
            error_detail.errorlog = error 
            error_detail.load_file_detail_id = load_file_detail_id 
            error_detail.pos_row = entry_number 
            error_detail.pos_col = None 
            error_detail.column_name = None 
            error_detail.value_provided = None 
            error_detail.description = "error message 2" 
            session.add(error_detail) 
            error_detail = ErrorLogDetail() 
            self.has_error = True 
            self.set_validation(load_file_detail, load_file_detail_id, True, True) 
        else: 
            pass  #Continue with further validation - about 5 other validation checks 

        #If the record passes all validation then mark it as has_error = False 
        if self.has_error == False: 
            self.set_validation(load_file_detail, load_file_detail_id, False, True) 
        else: 
            self.has_error = False 
        jobs.task_done()  #Mark the job as done (this call only appears in the threaded script) 

3.0 Common function, used by both the threaded and non-threaded scripts, that sets the validation status

def set_validation(self, load_file_detail, load_file_detail_id, has_error, can_be_loaded): 
    '''Mark the record as having been validated and whether has_error = True or False''' 
    #print("haserror and canbeloaded ", has_error, can_be_loaded) 
    is_done = load_file_detail.set_validation_and_error(load_file_detail_id, True, has_error, can_be_loaded) 
    if is_done == False: 
        raise Exception ("Can't update load_file_detail's is_validated parameter: ", load_file_detail_id) 

3.1 Saves the validation status of the data being validated to the DB

def set_validation_and_error(self, load_file_detail_id, is_validated, has_error, can_be_loaded): 
    result = session.execute('UPDATE load_file_detail SET is_validated=%s, has_error=%s, can_be_loaded=%s WHERE id=%s' \ 
        %(is_validated, has_error, can_be_loaded, load_file_detail_id)) 

The actual SQLAlchemy session is the same, and both scripts take the same amount of time up to that point. The validation process is identical in both scripts, and so is the saving to the DB, i.e. sections 3.0 and 3.1 are shared by both scripts. The only difference is validating with multiple threads. So I'm wondering: is there something about having multiple threads and SQLAlchemy that makes the app slower in threaded mode? Have I implemented the threaded function in the correct way? Is it one of those, or is threading simply not suitable in this situation? Suggestions welcome.


The underscore-heavy variable names are hard to follow. How are you retrieving the data? – eri

Answer


You should create a queue for logging and add a "logger" thread; that way you can remove the lock code, which should make it faster.
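A minimal sketch of that idea, assuming one dedicated logger thread (the names log_queue and logger_worker are placeholders, not from your code): the workers only put messages on a queue, and a single consumer thread does all the printing, so no print lock is needed.

import queue 
import threading 

log_queue = queue.Queue()   # hypothetical queue that collects messages from the workers 

def logger_worker(): 
    '''Single consumer: drains the queue so no other thread needs a print lock.''' 
    while True: 
        message = log_queue.get() 
        if message is None:       # sentinel value tells the logger to shut down 
            log_queue.task_done() 
            break 
        print(message)            # or write it to the error table / a log file 
        log_queue.task_done() 

logger = threading.Thread(target=logger_worker, daemon=True) 
logger.start() 

# Workers simply enqueue instead of acquiring a lock and printing: 
log_queue.put("The queue is full !") 

# At shutdown: 
log_queue.put(None) 
log_queue.join() 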

Also create a database connection inside each thread, so that data can be fetched in parallel.
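With SQLAlchemy that could look roughly like the sketch below (the engine URL is a placeholder and the body is only an outline of your workerbee, not a drop-in replacement): build one engine and sessionmaker at module level, and let every worker thread open and close its own session instead of sharing the global session.

import queue 
import threading 
from sqlalchemy import create_engine 
from sqlalchemy.orm import sessionmaker 

engine = create_engine("postgresql://user:password@localhost/mydb")  # placeholder URL 
Session = sessionmaker(bind=engine)   # thread-safe factory; each call returns a new session 
jobs = queue.Queue()                  # stands in for the jobs queue from your script 

class workerbee(threading.Thread): 
    def run(self): 
        session = Session()           # this thread's own session/connection 
        try: 
            job = jobs.get(True, 1) 
            # ... unpack the job and validate exactly as before, 
            # but call session.add(...) on this local session ... 
            session.commit() 
        except Exception: 
            session.rollback() 
            raise 
        finally: 
            session.close() 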

Because of the GIL, threading only parallelizes C library calls (and other work that releases the GIL, such as waiting on I/O).

To parallelize Python code you have to use multiprocessing.

I wrote a test for you that shows how to handle the iteration:

def produce_data(amount=100000, invalid=1, valid=10): 
    # produce_data = sql('request').getall() 
    import random 
    id = 0 
    data = [True]*valid + [False]*invalid 
    while id < amount: 
        id += 1 
        yield (id, random.choice(data)) 


def validate(row): 
    if row[1]: 
        time.sleep(0.001) #set valid sql request emulation. 
        return True 
    else: 
        time.sleep(0.001) #set invalid sql request emulation. 
        return False 



def single(): 
    for row in produce_data(): 
        validate(row) 

def targeted(): 
    import threading 
    for row in produce_data(): 
        threading.Thread(target=validate, args=(row,)).start() 

Uley = 50 

class Bee(object): 
    error = False 
    running = True 

    def __init__(self, queue, *args, **kwargs): 
        self.queue = queue  #dont use any global variable! 
        # every bee must have unique db connection and session. 
        #self.session = db.connection().session() 
        # initialize it there. 
        return super(Bee, self).__init__(*args, **kwargs) 

    def run(self): 
        while self.running: 
            data = self.queue.get() 
            if data: 
                self.error = validate(data)  # refactor it to self.validate(data) to be able to get cursor from self.session. 
                self.queue.task_done() 
            else: 
                self.queue.task_done() 
                break 

        #self.session.commit() 


def treaded(): 
    import threading, Queue 

    class TreadedBee(Bee, threading.Thread): pass 

    q = Queue.Queue() 

    for i in range(Uley): #bees started before data was provided. 
        bee = TreadedBee(q) 
        bee.daemon = True 
        bee.start() 

    for row in produce_data(): #you dont need to get all data to begin processing, at this place must be cursor of response. 
        q.put(row) 

    q.join() 
    for i in range(Uley): 
        q.put(None) 


def forked(): 
    from multiprocessing import Process, JoinableQueue 

    class ForkedBee(Bee, Process): pass 

    q = JoinableQueue() 
    for i in range(Uley): 
        bee = ForkedBee(q) 
        bee.start() 

    for row in produce_data(): 
        q.put(row) 

    q.join() 
    #at this point you need to kill the zomBee :-) 
    for i in range(Uley): 
        q.put(None) 
    q.close() 

def pool(): 
    from multiprocessing import Pool 
    pool = Pool(processes=Uley) 
    pool.map(validate,produce_data()) 

if __name__ == "__main__": 
    import time 
    s=time.time() 
    single() 
    print(time.time()-s) #109 
    s=time.time() 
    targeted() 
    print(time.time()-s) #6 
    s=time.time() 
    treaded() 
    print(time.time()-s) #12 
    s=time.time() 
    forked() 
    print(time.time()-s) #6 
    s=time.time() 
    pool() 
    print(time.time()-s) #4 

Test results:

$ python2 tgreads.py 
109.779700994 
5.84457302094 
12.3814198971 
5.97618508339 
3.69856286049 

targeted will flood the CPU and memory, and you cannot give each thread a single DB connection; sharing one connection is not safe. If you want to go that way you need an output queue and a collector that talks to the DB. pool is the shortest code and the fastest, but it is not very friendly to starting a connection per worker.
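If Pool is still attractive despite that, here is a hedged sketch of one way around the per-worker connection issue, reusing Uley, validate and produce_data from the test above (db.connect() is a purely hypothetical connection factory; replace it with whatever engine or sessionmaker you actually use): open the connection in the Pool initializer, so each worker process creates it exactly once.

from multiprocessing import Pool 

_conn = None   # per-process connection, created once per worker 

def init_worker(): 
    '''Runs once in every worker process.''' 
    global _conn 
    _conn = db.connect()   # hypothetical connection factory - replace with your own 

def validate_row(row): 
    # use the process-local _conn here instead of a shared global session 
    return validate(row) 

if __name__ == "__main__": 
    pool = Pool(processes=Uley, initializer=init_worker) 
    results = pool.map(validate_row, produce_data()) 
    pool.close() 
    pool.join() 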


Hmm. Newbie alert! Mind giving examples of basically everything you've stated here? I don't fully understand your first point. The second point sounds familiar, but in my case, where would you start the SQLAlchemy session? The question arises because the session, i.e. the connection to the DB, already exists before validation starts. Mind giving an example based on my code? On the third and fourth points, I've seen multiprocessing mentioned, but no one gives concrete examples; the only examples I find are for queue.Queue, not multiprocessing.Queue. I hope I'm not asking for too much. – lukik


Sorry for my bad English :) I can't provide an example right now; maybe later, or someone else can edit my answer. If the script is written the right way you can increase its speed. There is a queue implementation in the multiprocessing module. – eri


Your English is good enough to communicate with! I pray someone will offer some guidance because, as you mentioned, I believe that if I can get this right I can speed this script up using threads. – lukik