Processing results from asyncmap

I am trying to process data in parallel using IPython's parallel processing, following @minrk's instructions in his answer to how to get intermidiate results in ipython parallel processing?. Since the data is heterogeneous, some processing tasks finish earlier than others, and I would like to save their results as soon as they become available. I iterate over the asyncmap, and results become available as they are ready:

for i in asyncmap: 
    print i 

The trouble is that my code sometimes throws exceptions, which I would like to handle. The example below forces an IOError whenever the calling parameter exceeds 8:

from IPython.parallel import Client

def specialfunc(param):
    import time
    if param > 8:
        raise IOError
    else:
        time.sleep(param)
        return param

client = Client()
balanced = client.load_balanced_view()
balanced.block = False
param_list = range(10) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncmap = balanced.map_async(specialfunc, param_list, ordered=False)

However, as soon as one of the engines throws an exception, the whole asyncmap "appears" to be finished.
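To make the failure mode concrete (a minimal sketch using the example above; the exact wrapper class around the engine's IOError depends on the IPython version, so I catch broadly):

try:
    for i in asyncmap:
        print i
except Exception as e:
    # the remote IOError arrives wrapped (typically as a RemoteError)
    # and aborts the loop, even though other tasks finished fine
    print 'iteration aborted early: %r' % e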

I actually noticed that when I interrogate asyncmap.metadata I can figure out perfectly well which message gave an error (asyncmap.metadata[i]['pyerr']), but I don't know how to wait for the results as they come in.
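For example, the following works, but only once everything has finished, which is exactly the limitation (a minimal sketch; with the default chunksize of 1, each metadata entry should correspond to one parameter in submission order):

asyncmap.wait()  # blocks until every task has either finished or failed

for i, md in enumerate(asyncmap.metadata):
    if md['pyerr'] is not None:
        print 'task %i failed: %s' % (i, md['pyerr'])
    else:
        print 'task %i finished fine' % i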

So my question is how I should handle the results arriving asynchronously from my engines, even if they sometimes throw exceptions. How do I catch exceptions in the engines without upsetting the waiting for results in the controller?

Answers


Inspired by ipython/*/examples/parallel/customresults.py, I came up with this solution:

from IPython import parallel   # needed for parallel.TimeoutError

asyncmap = balanced.map(specialfunc, param_list, ordered=False)

# create original mapping of msg_ids to parameters
# maybe just a quick way to find which parameter gave what result
msg_ids_to_parameters = dict(zip(asyncmap.msg_ids, param_list))

pending = set(asyncmap.msg_ids) # all queued jobs are pending
while pending: # we'll come back as long as finished jobs haven't been looked at yet
    try:
        client.wait(pending, 1e-3)
    except parallel.TimeoutError:
        # ignore timeout errors, since they only mean that at least one job isn't done
        pass

    # finished is the set of msg_ids that are complete
    finished = pending.difference(client.outstanding)
    # update pending to exclude those that just finished
    pending = pending.difference(finished)
    for msg_id in finished:
        # we know these are done, so don't worry about blocking
        ar = client.get_result(msg_id)
        # check whether an exception occurred when the code ran on the engine
        if ar.metadata['pyerr'] is None:
            print "job id %s finished on engine %i" % (msg_id, ar.engine_id)
            print "and results for parameter %i:" % msg_ids_to_parameters[msg_id]
            # note that each job in a map always returns a list of length chunksize
            # even if chunksize == 1
            for res in ar.result:
                print " item %i \n" % res
        else:
            print 'this went wrong for %i (%s)' % (msg_ids_to_parameters[msg_id], ar.metadata['pyerr'])

The essential change from the example code is to look at the metadata and see whether an error has been recorded, and only if it hasn't, to go ahead and retrieve the result through ar.result.
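One small addition that may help on long runs (my own addition, not part of the example): the controller caches every result, so once a job has been handled you can tell it to forget the entry. Client.purge_results is part of the IPython.parallel API; a sketch inside the loop above:

    for msg_id in finished:
        ar = client.get_result(msg_id)
        # ... handle ar as above, then drop the cached copy of this job
        # so memory does not grow with the number of completed tasks
        client.purge_results(msg_id)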


BTW: does the 'map' method of 'load_balanced_view' really take an 'ordered' keyword? I thought 'map' was ordered by definition.


I know it sounds somewhat stupid, but you could return a special value to indicate an error, say -1, None or a string (a sketch of that idea follows the code below). To get around map_async, what I did was to loop over the parameters and use apply_async, storing the results in a list. Then I loop over the list trying to get the results and process them one by one. It looks something like this:

calls = []                    # will hold one AsyncResult per parameter
n_cores = len(c.ids)          # c is the Client
for n, p in enumerate(params):
    core = c.ids[n % n_cores] # round-robin over the engines
    calls.append(c[core].apply_async(f, p))

Then you get the results:

while calls:
    for call in calls[:]:  # iterate over a copy, so remove() is safe
        try:
            result = call.get(1e-3)
            process(result)
            calls.remove(call)
            # in the case your call failed, you can apply_async again
            # and append the new call to calls.
        except parallel.TimeoutError:
            pass
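For completeness, the special-value idea from the first paragraph could look like this (just a sketch; safefunc is a hypothetical wrapper around the specialfunc from the question):

def safefunc(param):
    # runs on the engine: trap the failure there and return a sentinel,
    # so the controller-side loop never sees a remote exception
    import time
    try:
        if param > 8:
            raise IOError
        time.sleep(param)
        return param
    except IOError:
        return None  # None marks "this parameter failed"

The controller side then only has to check for None when processing a result.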

Alternatively you can use c[core].apply() and check each call with its ready() method, which is basically the same thing without the exception handling (a sketch follows below). The annoying thing is that this hogs a lot of memory, since the results and other dicts of every call are hard to clear.
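The polling variant might look like this (a sketch under the same assumptions as above; ready() and get() are standard AsyncResult methods):

import time

while calls:
    for call in calls[:]:        # copy, since we remove items while looping
        if call.ready():         # non-blocking completeness check
            process(call.get())  # get() returns immediately once ready
            calls.remove(call)
    time.sleep(0.1)              # avoid busy-waiting between sweeps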

I was doing something similar here, and I decided that map_async just did not work for me. This might also be relevant, in case you decide to go with this approach.

Cheers.

PS: I think this is basically what you implemented above, but I find it more natural to deal with the calls separately rather than stacking them into maps, especially if you may want to re-process some of them later.