2015-11-30

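I want to call this function [1] in parallel. To do that, I wrote the helper in [2], and I call it as shown in [3] and [4]. The problem is that when I run this code, execution hangs and I never see a result, but if I call run_simple_job serially, everything works. Why can't I run this function in parallel? Any suggestions? How do I execute a function in parallel?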

[1] The function I am trying to call

@make_verbose
def run_simple_job(job_params):
    """
    Execute a job remotely, and get the digests.
    The output will come as a json file and it contains info about the input and output path, and the generated digest.

    :param job_params: (namedtuple) contains several attributes important for the job during execution.

        client_id (string) id of the client.
        command (string) command to execute the job
        cluster (string) where the job will run
        task_type (TypeTask) contains information about the job that will run
        should_tamper (Boolean) Tells if this job should tamper the digests or not
    :return: output (string) the output of the job execution
    """
    client_id = job_params.client_id
    _command = job_params.command
    cluster = job_params.cluster
    task_type = job_params.task_type

    output = ...  # execute job

    return output

[2] The function that runs things in parallel

import logging
from itertools import izip  # Python 2
from multiprocessing import Pipe, Process

def spawn(f):
    # 1 - how do the pipe and x arguments end up here?
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()

    return fun

def parmap2(f, X):
    pipe = [Pipe() for x in X]
    # 2 - what is happening with the tuples (c, x) and (p, c)?
    proc = [Process(target=spawn(f), args=(c, x))
            for x, (p, c) in izip(X, pipe)]

    for p in proc:
        logging.debug("Spawn")
        p.start()
    for p in proc:
        logging.debug("Joining")
        p.join()
    return [p.recv() for (p, c) in pipe]
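
The two numbered comments in the code above can be traced with a small sketch. It is not the asker's code: it assumes Python 3 (so `zip` replaces `izip`), uses a hypothetical stand-in `square` for the job function, and hands the function to a module-level `_worker` instead of a closure so that it also works where child processes are started by importing the main module. `Pipe()` returns two connection ends; the parent keeps the first, and each child receives the second together with one item through `args`. The sketch also reads every result before calling `join()`, because a child can block inside `send()` on a large result until the parent reads it, which is one possible cause of a hang when `join()` comes first.

```python
from multiprocessing import Pipe, Process


def square(x):
    # Hypothetical stand-in for the real job function.
    return x * x


def _worker(f, conn, x):
    # Runs in the child: compute the result and send it back through the pipe.
    conn.send(f(x))
    conn.close()


def parmap_sketch(f, items):
    # One (parent_end, child_end) pair per item.
    pipes = [Pipe() for _ in items]
    # args=(f, child_end, x) become the positional arguments of _worker.
    procs = [Process(target=_worker, args=(f, child_end, x))
             for x, (_parent_end, child_end) in zip(items, pipes)]
    for proc in procs:
        proc.start()
    # Receive before joining so that no child stays blocked in send().
    results = [parent_end.recv() for parent_end, _child_end in pipes]
    for proc in procs:
        proc.join()
    return results


if __name__ == "__main__":
    print(parmap_sketch(square, [1, 2, 3]))
```

Results come back in input order because the parent reads the pipes in the same order in which it created them.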

[3] The wrapper class

class RunSimpleJobWrapper:
    """ Wrapper used when running a job """

    def __init__(self, params):
        self.params = params

[4] How I call the function to run in parallel

for cluster in clusters:
    task_type = task_type_by_cluster[cluster]
    run_wrapper_list.append(RunSimpleJobWrapper(get_job_parameter(client_id, cluster, job.command, majority(FAULTS), task_type)))

jobs_output = parmap2(run_simple_job_wrapper, run_wrapper_list)

Answer


You can simply use multiprocessing:

from multiprocessing import Pool

# None lets Pool use all the available CPUs
# (a negative count such as -1 raises ValueError).
n_jobs = None
pool = Pool(n_jobs)

param_list = [...]  # generate a list of your parameters

results = pool.map(run_simple_job, param_list)
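
A fuller, self-contained version of the same idea might look like the sketch below. The `JobParams` namedtuple, its field values, the helper `run_jobs`, and the body of `run_simple_job` are hypothetical stand-ins (the real job runs remotely and is not shown in the question); only `Pool` and `Pool.map` come from the answer. Calling `Pool()` with no argument lets it pick the worker count from the machine's CPU count, and the `if __name__ == "__main__":` guard keeps child processes from re-running the pool setup on platforms that start workers by importing the main module.

```python
from collections import namedtuple
from multiprocessing import Pool

# Hypothetical parameter record; the real one has more fields.
JobParams = namedtuple("JobParams", ["client_id", "command", "cluster"])


def run_simple_job(job_params):
    # Stand-in for the remote execution: just describe the job.
    return "{}@{}: {}".format(
        job_params.client_id, job_params.cluster, job_params.command)


def run_jobs(param_list):
    # Pool() sizes itself from the CPU count; map preserves input order.
    with Pool() as pool:
        return pool.map(run_simple_job, param_list)


if __name__ == "__main__":
    params = [JobParams("client-1", "wordcount", cluster)
              for cluster in ("cluster-a", "cluster-b")]
    for line in run_jobs(params):
        print(line)
```

The function and its arguments must be picklable for `Pool.map`, which is why both `run_simple_job` and `JobParams` are defined at module level here.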