How to track results returned asynchronously from a multiprocessing pool

2013-12-13

I am trying to add multiprocessing to some code whose functions I cannot modify. I want to submit these functions as jobs to a multiprocessing pool asynchronously. I am doing something much like the code shown here. However, I am not sure how to keep track of the results. How can I know which applied function a returned result corresponds to?

An important point to emphasize is that I cannot modify the existing functions (other things depend on them remaining as they are), and that results may be returned from the pool in an order different from the order in which the function jobs were applied to it.

Thanks for any ideas!

EDIT: Some attempted code is as follows:

import multiprocessing 
from multiprocessing import Pool 
import os 
import signal 
import time 
import inspect 

def multiply(multiplicand1=0, multiplicand2=0): 
    return multiplicand1*multiplicand2 

def workFunctionTest(**kwargs): 
    time.sleep(3) 
    return kwargs 

def printHR(object): 
    """ 
    This function prints a specified object in a human readable way. 
    """ 
    # dictionary 
    if isinstance(object, dict): 
     for key, value in sorted(object.items()): 
      print u'{a1}: {a2}'.format(a1=key, a2=value) 
    # list or tuple 
    elif isinstance(object, list) or isinstance(object, tuple): 
     for element in object: 
      print element 
    # other 
    else: 
     print object 

class Job(object): 
    def __init__(
     self, 
     workFunction=workFunctionTest, 
     workFunctionKeywordArguments={'testString': "hello world"}, 
     workFunctionTimeout=1, 
     naturalLanguageString=None, 
     classInstance=None, 
     resultGetter=None, 
     result=None 
     ): 
     self.workFunction=workFunction 
     self.workFunctionKeywordArguments=workFunctionKeywordArguments 
     self.workFunctionTimeout=workFunctionTimeout 
     self.naturalLanguageString=naturalLanguageString 
     self.classInstance=self.__class__.__name__ 
     self.resultGetter=resultGetter 
     self.result=result 
    def description(self): 
     descriptionString="" 
     for key, value in sorted(vars(self).items()): 
      descriptionString+=str("{a1}:{a2} ".format(a1=key, a2=value)) 
     return descriptionString 
    def printout(self): 
     """ 
     This method prints a dictionary of all data attributes. 
     """ 
     printHR(vars(self)) 

class JobGroup(object): 
    """ 
    This class acts as a container for jobs. The data attribute jobs is a list of job objects. 
    """ 
    def __init__(
     self, 
     jobs=None, 
     naturalLanguageString="null", 
     classInstance=None, 
     result=None 
     ): 
     self.jobs=jobs 
     self.naturalLanguageString=naturalLanguageString 
     self.classInstance=self.__class__.__name__ 
     self.result=result 
    def description(self): 
     descriptionString="" 
     for key, value in sorted(vars(self).items()): 
      descriptionString+=str("{a1}:{a2} ".format(a1=key, a2=value)) 
     return descriptionString 
    def printout(self): 
     """ 
     This method prints a dictionary of all data attributes. 
     """ 
     printHR(vars(self)) 

def initialise_processes(): 
    signal.signal(signal.SIGINT, signal.SIG_IGN) 

def execute(
     jobObject=None, 
     numberOfProcesses=multiprocessing.cpu_count() 
     ): 
    # Determine the current function name.
    functionName=str(inspect.stack()[0][3])
    def collateResults(result): 
     """ 
     This is a process pool callback function which collates a list of results returned. 
     """ 
     # Determine the caller function name. 
     functionName=str(inspect.stack()[1][3]) 
     print("{a1}: result: {a2}".format(a1=functionName, a2=result)) 
     results.append(result) 
    def getResults(job): 
     # Determine the current function name. 
     functionName=str(inspect.stack()[0][3]) 
     while True: 
      try: 
       result=job.resultGetter.get(job.workFunctionTimeout) 
       break 
      except multiprocessing.TimeoutError: 
       print("{a1}: subprocess timeout for job {a2}".format(a1=functionName, a2=job.description()))
     #job.result=result 
     return result 
    # Create a process pool. 
    pool1 = multiprocessing.Pool(numberOfProcesses, initialise_processes) 
    print("{a1}: pool {a2} of {a3} processes created".format(a1=functionName, a2=str(pool1), a3=str(numberOfProcesses))) 
    # Unpack the input job object and submit it to the process pool. 
    print("{a1}: unpacking and applying job object {a2} to pool...".format(a1=functionName, a2=jobObject)) 
    if isinstance(jobObject, Job): 
     # If the input job object is a job, apply it to the pool with its associated timeout specification. 
     # Return a list of results. 
     job=jobObject 
     print("{a1}: job submitted to pool: {a2}".format(a1=functionName, a2=job.description())) 
     # Apply the job to the pool, saving the object pool.ApplyResult to the job object. 
     job.resultGetter=pool1.apply_async(
       func=job.workFunction, 
       kwds=job.workFunctionKeywordArguments 
     ) 
     # Get results. 
     # Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result. 
     print("{a1}: getting results for job...".format(a1=functionName)) 
     job.result=getResults(job) 
     print("{a1}: job completed: {a2}".format(a1=functionName, a2=job.description())) 
     print("{a1}: job result: {a2}".format(a1=functionName, a2=job.result)) 
     # Shut down the pool, then return the job result from execute.
     pool1.terminate()
     pool1.join()
     return job.result
    elif isinstance(jobObject, JobGroup): 
     # If the input job object is a job group, cycle through each job and apply it to the pool with its associated timeout specification. 
     for job in jobObject.jobs: 
      print("{a1}: job submitted to pool: {a2}".format(a1=functionName, a2=job.description())) 
      # Apply the job to the pool, saving the object pool.ApplyResult to the job object. 
      job.resultGetter=pool1.apply_async(
        func=job.workFunction, 
        kwds=job.workFunctionKeywordArguments 
      ) 
     # Get results. 
     # Cycle through each job and and append the result for the job to a list of results. 
     results=[] 
     for job in jobObject.jobs: 
      # Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result. 
      print("{a1}: getting results for job...".format(a1=functionName)) 
      job.result=getResults(job) 
      print("{a1}: job completed: {a2}".format(a1=functionName, a2=job.description())) 
      #print("{a1}: job result: {a2}".format(a1=functionName, a2=job.result)) 
      # Collate the results. 
      results.append(job.result) 
     # Apply the list of results to the job group data attribute results. 
     jobObject.results=results 
     print("{a1}: job group results: {a2}".format(a1=functionName, a2=jobObject.results)) 
     # Shut down the pool, then return the job group result list from execute.
     pool1.terminate()
     pool1.join()
     return jobObject.results
    else: 
     # invalid input object 
     print("{a1}: invalid job object {a2}".format(a1=functionName, a2=jobObject)) 

def main(): 
    print('-'*80) 
    print("MULTIPROCESSING SYSTEM DEMONSTRATION\n") 

    # Create a job. 
    print("# creating a job...\n") 
    job1=Job(
      workFunction=workFunctionTest, 
      workFunctionKeywordArguments={'testString': "hello world"}, 
      workFunctionTimeout=4 
    ) 
    print("- printout of new job object:") 
    job1.printout() 
    print("\n- printout of new job object in logging format:") 
    print job1.description() 

    # Create another job. 
    print("\n# creating another job...\n") 
    job2=Job(
      workFunction=multiply, 
      workFunctionKeywordArguments={'multiplicand1': 2, 'multiplicand2': 3}, 
      workFunctionTimeout=6 
    ) 
    print("- printout of new job object:") 
    job2.printout() 
    print("\n- printout of new job object in logging format:") 
    print job2.description() 

    # Create a JobGroup object. 
    print("\n# creating a job group (of jobs 1 and 2)...\n") 
    jobGroup1=JobGroup(
      jobs=[job1, job2], 
    ) 
    print("- printout of new job group object:") 
    jobGroup1.printout() 
    print("\n- printout of new job group object in logging format:") 
    print jobGroup1.description() 

    # Submit the job group. 
    print("\nready to submit job group") 
    response=raw_input("\nPress Enter to continue...\n") 
    execute(jobGroup1) 

    response=raw_input("\nNote the results printed above. Press Enter to continue the demonstration.\n") 

    # Demonstrate timeout. 
    print("\n # creating a new job in order to demonstrate timeout functionality...\n") 
    job3=Job(
      workFunction=workFunctionTest, 
      workFunctionKeywordArguments={'testString': "hello world"}, 
      workFunctionTimeout=1 
    ) 
    print("- printout of new job object:") 
    job3.printout() 
    print("\n- printout of new job object in logging format:") 
    print job3.description() 
    print("\nNote the timeout specification of only 1 second.") 

    # Submit the job. 
    print("\nready to submit job") 
    response=raw_input("\nPress Enter to continue...\n") 
    execute(job3) 

    response=raw_input("\nNote the recognition of timeouts printed above. This concludes the demonstration.") 
    print('-'*80) 

if __name__ == '__main__': 
    main() 

EDIT: This question has been placed [on hold] for the following stated reason:

"Questions asking for code must demonstrate a minimal understanding of the problem being solved. Include attempted solutions, why they didn't work, and the expected results. See also: Stack Overflow question checklist"

This question was not requesting code; it was requesting ideas and general guidance. A minimal understanding of the problem under consideration is demonstrated (note the correct use of the terms "multiprocessing", "pool" and "asynchronous", and note the reference to prior code). Regarding attempted solutions, I acknowledge that showing attempted solutions would be beneficial, and I have now added such code above. I hope that I have addressed the concerns that led to the [on hold] status.

Answer


Without seeing actual code, I can only answer your question in general terms. But there are two general solutions.

First, instead of using a callback and ignoring the AsyncResults, store them in some kind of collection. Then you can just use that collection. For example, if you want to be able to look up the result for a function, using that function as a key, just create a dict keyed by the functions:

import multiprocessing as mp

def in_parallel(funcs): 
    results = {} 
    pool = mp.Pool() 
    for func in funcs: 
     results[func] = pool.apply_async(func) 
    pool.close() 
    pool.join() 
    return {func: result.get() for func, result in results.items()} 

Alternatively, you can change the callback function to store the results in the collection, keyed appropriately. For example:

def in_parallel(funcs): 
    results = {} 
    pool = mp.Pool() 
    for func in funcs: 
     def callback(result, func=func): 
      results[func] = result 
     pool.apply_async(func, callback=callback) 
    pool.close() 
    pool.join() 
    return results 

Here I used the functions themselves as keys. If you want to key by index instead, that is just as easy; any value you have can serve as a key.
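For instance, keying by index could look like the following sketch. The name `in_parallel_by_index` is mine, not from the original; `multiprocessing.dummy.Pool` (a thread-based pool) is used only so the snippet is self-contained and runs anywhere, since it exposes the same interface as `multiprocessing.Pool`:

```python
# Sketch: track AsyncResults by the index of their input instead of
# by the function. multiprocessing.dummy.Pool shares the interface of
# multiprocessing.Pool but uses threads, keeping this demo portable.
from multiprocessing.dummy import Pool

def in_parallel_by_index(func, args_list):
    pool = Pool()
    # Key each AsyncResult by the index of its input argument.
    pending = {i: pool.apply_async(func, (arg,))
               for i, arg in enumerate(args_list)}
    pool.close()
    pool.join()
    # Each result can now be matched back to the input that produced it.
    return {i: result.get() for i, result in pending.items()}

results = in_parallel_by_index(lambda x: x * x, [1, 2, 3])
print(results)  # {0: 1, 1: 4, 2: 9}
```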


Meanwhile, the example you linked is really just calling the same function with a bunch of arguments, waiting for them all to finish, and leaving the results in some iterable in arbitrary order. That is exactly what imap_unordered does, but much more simply. You could replace the whole complicated mechanism from the linked code with this:

pool = mp.Pool() 
results = list(pool.imap_unordered(foo_pool, range(10))) 
pool.close() 
pool.join() 

Then, if you want the results in the original order rather than in arbitrary order, you can switch to imap or map instead. So:

pool = mp.Pool() 
results = pool.map(foo_pool, range(10)) 
pool.close() 
pool.join() 
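The ordering difference can be seen directly in a small sketch. `slow_square` is an illustrative function of my own; `multiprocessing.dummy.Pool` is substituted here only to keep the demo self-contained, as it shares the `multiprocessing.Pool` interface:

```python
# Sketch: map preserves input order; imap_unordered yields results
# in completion order (the same values, possibly scrambled).
from multiprocessing.dummy import Pool  # same API as multiprocessing.Pool
import random
import time

def slow_square(x):
    time.sleep(random.uniform(0, 0.05))  # jobs finish in scrambled order
    return x * x

pool = Pool(4)
ordered = pool.map(slow_square, range(5))                     # input order
unordered = list(pool.imap_unordered(slow_square, range(5)))  # completion order
pool.close()
pool.join()
print(ordered)  # [0, 1, 4, 9, 16]
```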

If you need something similar but too complicated to fit into the map paradigm, concurrent.futures will probably make your life easier than multiprocessing. If you are on Python 2.x, you will have to install the backport. But then you can do things that are much harder with AsyncResults or callbacks (or map), like composing a whole collection of futures into one big future. See the examples in the linked documentation.
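With concurrent.futures, mapping each result back to its input is natural, because submit returns a Future you can hold onto. A minimal sketch of that pattern (the function `square` is illustrative; ThreadPoolExecutor is used so the snippet runs anywhere, and ProcessPoolExecutor exposes the identical interface):

```python
# Sketch: key each Future by its input, then harvest results as
# they complete, without losing track of which input produced which.
import concurrent.futures

def square(x):
    return x * x

with concurrent.futures.ThreadPoolExecutor() as executor:
    # Map each submitted Future back to the input it was given.
    future_to_input = {executor.submit(square, x): x for x in range(5)}
    results = {}
    for future in concurrent.futures.as_completed(future_to_input):
        results[future_to_input[future]] = future.result()

print(results[3])  # 9
```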


One final note:

"An important point to emphasize is that I cannot modify the existing functions ..."

If you cannot modify a function, you can always wrap it. For example, say I have a function that returns the square of a number, but I am trying to asynchronously build a dict mapping numbers to their squares, so I need the original number to be part of the result as well. That is easy:

def number_and_square(x): 
    return x, square(x) 

And now I can apply_async(number_and_square) instead of just square, and get the results I want.

I did not do that in the examples above because in the first case I stored the keys into the collection from the calling side, and in the second I bound them into the callback function. But binding them into a wrapper around the function is just as easy as either of those, and can be appropriate when neither of them is.
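Putting the wrapper approach together end to end, a self-contained sketch might look like this (`square` stands in for the unmodifiable existing function, and `multiprocessing.dummy.Pool` is substituted only so the demo is portable; the real `multiprocessing.Pool` has the same interface):

```python
# Sketch: wrap an unmodifiable function so each result carries the
# input that produced it, then build a dict from the tagged results.
from multiprocessing.dummy import Pool  # same API as multiprocessing.Pool

def square(x):             # stands in for the existing, unmodifiable function
    return x * x

def number_and_square(x):  # wrapper that tags the result with its input
    return x, square(x)

pool = Pool()
async_results = [pool.apply_async(number_and_square, (x,)) for x in range(5)]
pool.close()
pool.join()
# Each get() yields an (input, output) pair, so correspondence is preserved.
squares = dict(r.get() for r in async_results)
print(squares)  # {0: 0, 1: 1, 2: 4, 3: 9, 4: 16}
```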


Many thanks for your clear advice and guidance. As you suggested, I have experimented with function wrapping and with your trick of using the function as the key in the results dictionary. [concurrent.futures](http://docs.python.org/3/library/concurrent.futures.html) looks very promising and I will investigate it soon, too. Thanks again. – d3pd


Thanks for this answer. I have a very similar problem that remains unsolved: what if you need to be able to set a timeout on each task, and want to know which inputs caused the timeouts? – chrishiestand