2016-11-10 72 views
0

我試圖創建一個可以運行多個進程的異步函數,並將發送響應。由於multiprocessing.Process()不返回響應,我想創建一個函數爲:運行多個異步函數並獲取每個函數的返回值

from multiprocessing import Process 

def async_call(func_list): 
    """ 
    Runs the list of function asynchronously. 

    :param func_list: Expects list of lists to be of format 
     [[func1, args1, kwargs1], [func2, args2, kwargs2], ...] 
    :return: List of output of the functions 
     [output1, output2, ...] 
    """ 
    response_list = [] 
    def worker(function, f_args, f_kwargs, response_list): 
     """ 
     Runs the function and appends the output to list 
     """ 
     response = function(*f_args, **f_kwargs) 
     response_list.append(response) 

    processes = [Process(target=worker, args=(func, args, kwargs, response_list)) \ 
        for func, args, kwargs in func_list] 

    for process in processes: 
     process.start() 
    for process in processes: 
     process.join() 
    return response_list 

在這個函數中,我稱之爲worker異步它接受額外的參數作爲list。因爲列表是作爲參考傳遞的,所以我想我可以在列表中附加實際函數的響應。並且async_call將返回所有函數的響應。

但這不是我期望的行爲方式。將值附加到的worker()內,但工作人員response_list列表保持空白。

任何想法我做錯了什麼?而且,還有什麼替代方案可以實現我的目標?

回答

1

您不能直接跨進程共享對象。您需要使用專門用於傳遞值的類Queue and Pipe;見the documentation

+0

明白了。我只能對'queue.put()'執行的''queue.get()'做'queue.put()'。對於更多的訪問項目,它會凍結。所以我正在迭代'func_list'的'len'。有沒有更好的方法來做到這一點? –

+0

有沒有辦法將該值與函數映射?作爲一個黑客,我可以想到用一個鍵作爲函數的索引返回字典對象,並返回鍵排序。有沒有其他更pythonic的方式來實現這一目標? –

0

Daniel's Answer所述,我們不能直接在進程之間共享對象。在這裏我使用multiprocessing.Queue()和更新功能:

def async_call(func_list): 
    """ 
    Runs the list of function asynchronously. 

    :param func_list: Expects list of lists to be of format 
     [[func1, args1, kwargs1], [func2, args2, kwargs2], ...] 
    :return: List of output of the functions 
     [output1, output2, ...] 
    """ 
    def worker(function, f_args, f_kwargs, queue, index): 
     """ 
     Runs the function and appends the output to list, and the Exception in the case of error 
     """ 
     response = { 
      'index': index, # For tracking the index of each function in actual list. 
          # Since, this function is called asynchronously, order in 
          # queue may differ 
      'data': None, 
      'error': None 
     } 

     # Handle error in the function call 
     try: 
      response['data'] = function(*f_args, **f_kwargs) 
     except Exception as e: 
      response['error'] = e # send back the exception along with the queue 

     queue.put(response) 
    queue = Queue() 
    processes = [Process(target=worker, args=(func, args, kwargs, queue, i)) \ 
        for i, (func, args, kwargs) in enumerate(func_list)] 

    for process in processes: 
     process.start() 

    response_list = [] 
    for process in processes: 
     # Wait for process to finish 
     process.join() 

     # Get back the response from the queue 
     response = queue.get() 
     if response['error']: 
      raise response['error'] # Raise exception if the function call failed 
     response_list.append(response) 

    return [content['data'] for content in sorted(response_list, key=lambda x: x['index'])] 

採樣運行:

def my_sum(x, y): 
    return x + y 

def your_mul(x, y): 
    return x*y 

my_func_list = [[my_sum, [1], {'y': 2}], [your_mul, [], {'x':1, 'y':2}]] 

async_call(my_func_list) 
# Value returned: [3, 2]