
Python multithreading over a recordset

I have a database recordset (around 1000 rows). I am currently looping over it and, for each record, running an additional database query to pull in more data.

Doing this pushes the overall processing time to roughly 100 seconds.

What I would like to do is split the work across 2-4 processes.

I am using Python 2.7 to stay compatible with AWS Lambda.

def handler(event, context):
    try:
        records = connection.get_users()
        mandrill_client = open_mandrill_connection()
        mandrill_messages = get_mandrill_messages()
        mandrill_template = 'POINTS weekly-report-to-user'

        start_time = time.time()

        messages = build_messages(mandrill_messages, records)

        print("OVERALL: %s seconds ---" % (time.time() - start_time))

        send_mandrill_message(mandrill_client, mandrill_template, messages)
        connection.close_database_connection()

        return "Process Completed"
    except Exception as e:
        print(e)

Below is the function that I want to hand off to threads:

def build_messages(messages, records):
    for record in records:
        record = dict(record)
        stream = get_user_stream(record)
        data = compile_loyalty_stream(stream)

        messages['to'].append({
            'email': record['email'],
            'type': 'to'
        })

        messages['merge_vars'].append({
            'rcpt': record['email'],
            'vars': [
                {
                    'name': 'total_points',
                    'content': record['total_points']
                },
                {
                    'name': 'total_week',
                    'content': record['week_points']
                },
                {
                    'name': 'stream_greek',
                    'content': data['el']
                },
                {
                    'name': 'stream_english',
                    'content': data['en']
                }
            ]
        })

    return messages

What I have tried so far is to import the multiprocessing library:

from multiprocessing.pool import ThreadPool 

I created a pool inside the try block and mapped the function over this pool:

pool = ThreadPool(4) 

messages = pool.map(build_messages_in, itertools.izip(itertools.repeat(mandrill_messages), records)) 

def build_messages_in(a_b): 
    build_msg(*a_b) 


def build_msg(a, b): 
    return build_messages(a, b) 

def get_user_stream(record):
    response = []
    i = 0

    for mod, mod_id, act, p, act_created in izip(record['models'], record['model_ids'], record['actions'],
                                                 record['points'], record['action_creation']):
        information = get_reference(mod, mod_id)

        if information:
            response.append({
                'action': act,
                'points': p,
                'created': act_created,
                'info': information
            })

            if (act == 'invite_friend') \
                    or (act == 'donate') \
                    or (act == 'bonus_500_general') \
                    or (act == 'bonus_1000_general') \
                    or (act == 'bonus_500_cancel') \
                    or (act == 'bonus_1000_cancel'):
                response[i]['info']['date_ref'] = act_created
                response[i]['info']['slug'] = 'attiki'

            if (act == 'bonus_500_general') \
                    or (act == 'bonus_1000_general') \
                    or (act == 'bonus_500_cancel') \
                    or (act == 'bonus_1000_cancel'):
                response[i]['info']['title'] = ''

            i += 1

    return response

Finally, I removed the for loop from the build_messages function.

The result I get is: 'NoneType' object is not iterable.

Is this the correct way to do this?
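
One detail worth noting about the error: pool.map collects whatever the mapped function returns, and the build_messages_in wrapper above never returns anything, so the result list is filled with None. A minimal, hypothetical sketch of the same idea that instead maps over chunks of records and merges the partial results might look like this (it assumes build_messages keeps its for loop and that each worker can fill its own empty 'to'/'merge_vars' structure):

from multiprocessing.pool import ThreadPool


def chunked(seq, size):
    # Split the record set into roughly equal slices, one per worker.
    return [seq[i:i + size] for i in range(0, len(seq), size)]


def build_messages_chunk(chunk):
    # Each worker fills its own empty structure instead of mutating a dict
    # shared between threads, and returns it so pool.map can collect it.
    return build_messages({'to': [], 'merge_vars': []}, chunk)


def build_messages_parallel(mandrill_messages, records, workers=4):
    pool = ThreadPool(workers)
    chunks_of_records = chunked(list(records), max(1, len(records) // workers))
    partials = pool.map(build_messages_chunk, chunks_of_records)
    pool.close()
    pool.join()
    # Merge the per-chunk results back into the template dict.
    for part in partials:
        mandrill_messages['to'].extend(part['to'])
        mandrill_messages['merge_vars'].extend(part['merge_vars'])
    return mandrill_messages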


@GhostCat I submitted this question and forgot to write down what I had tried. There is some code above that works, and below it some code that should work but doesn't. Basically, I am trying to multiprocess the build_messages function. – mallix


Great. Now let the experts help you ;-) – GhostCat


You haven't said where you get this error. From what I can gather, this is using the 'mailchimp' API, and I assume the longest wait time is for the API response? – roganjosh

Answer


Your code reaches quite deep, so you cannot be sure that multithreading will bring any performance gain when applied at a high level. It is therefore worth digging in, finding what contributes the largest latency, and thinking about how to address that specific bottleneck. For more discussion on the limitations of threading, see here.
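
For instance, a rough way to see where the 100 seconds actually go is to time the two stages of the loop separately. This is just a minimal sketch, assuming the get_user_stream and compile_loyalty_stream calls from the question (the counter names here are made up):

import time


def build_messages_timed(messages, records):
    # Accumulate time spent in the two candidate bottlenecks separately.
    stream_time = 0.0
    compile_time = 0.0
    for record in records:
        record = dict(record)

        t0 = time.time()
        stream = get_user_stream(record)        # per-record DB queries happen here
        stream_time += time.time() - t0

        t0 = time.time()
        data = compile_loyalty_stream(stream)   # in-process work on the result
        compile_time += time.time() - t0

        # ... append to messages['to'] and messages['merge_vars'] as before ...

    print("stream/DB time: %.2fs, compile time: %.2fs" % (stream_time, compile_time))
    return messages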

If, as we discussed in the comments, you can identify a single task that is taking a long time, then you could try to parallelise it using multiprocessing instead, to take advantage of more CPU power. Here is a generic example that is hopefully simple enough to understand while still mirroring your Postgres queries, without getting into your own code base; I think that would be an unfeasible effort, tbh.

import multiprocessing as mp 
import time 
import random 
import datetime as dt 

MAILCHIMP_RESPONSE = [x for x in range(1000)] 

def chunks(l, n): 
    n = max(1, n) 
    return [l[i:i + n] for i in range(0, len(l), n)] 


def db_query(): 
    ''' Delayed response from database ''' 
    time.sleep(0.01) 
    return random.random() 


def do_queries(query_list): 
    ''' The function that takes all your query ids and executes them 
    sequentially for each id ''' 
    results = [] 
    for item in query_list: 
     query = db_query() 
     # Your super-quick processing of the Postgres response 
     processing_result = query * 2 
     results.append([item, processing_result]) 
    return results 


def single_processing(): 
    ''' As you do now - equivalent to get_reference ''' 
    result_of_process = do_queries(MAILCHIMP_RESPONSE) 
    return result_of_process 


def multi_process(chunked_data, queue): 
    ''' Same as single_processing, except we put our results in queue rather 
    than returning them ''' 
    result_of_process = do_queries(chunked_data) 
    queue.put(result_of_process) 


def multiprocess_handler():
    ''' Divide and conquer on our db requests. We split the mailchimp response
    into a series of chunks and fire our queries simultaneously. Thus, each
    concurrent process has a smaller number of queries to make '''

    num_processes = 4  # depending on cores/resources
    size_chunk = len(MAILCHIMP_RESPONSE) / num_processes
    chunked_queries = chunks(MAILCHIMP_RESPONSE, size_chunk)

    queue = mp.Queue()  # This is going to combine all the results

    processes = [mp.Process(target=multi_process,
                            args=(chunked_queries[x], queue)) for x in range(num_processes)]

    for p in processes: p.start()

    divide_and_conquer_result = []
    for p in processes:
        divide_and_conquer_result.extend(queue.get())

    return divide_and_conquer_result


if __name__ == '__main__': 
    start_single = dt.datetime.now() 

    single_process = single_processing() 

    print "Single process took {}".format(dt.datetime.now() - start_single) 
    print "Number of records processed = {}".format(len(single_process)) 

    start_multi = dt.datetime.now() 

    multi = multiprocess_handler() 

    print "Multi process took {}".format(dt.datetime.now() - start_multi) 
    print "Number of records processed = {}".format(len(multi))