2017-09-14 59 views
0

我想更改當前使用的池中的工作人員數。 我現在的想法是如何更改移動中的多處理池工作人員數

while True: 
    current_connection_number = get_connection_number() 
    forced_break = False 
    with mp.Pool(current_connection_number) as p: 
     for data in p.imap_unordered(fun, some_infinite_generator): 
      yield data 
      if current_connection_number != get_connection_number(): 
       forced_break = True 
       break 
    if not forced_break: 
     break 

的問題是,它只是終止工等從some_infinite_generator得到了這一點,並沒有尚未處理都失去了最後的項目。有沒有一些標準的方式來做到這一點?

編輯:我已經嘗試打印some_infinite_generator裏面,事實證明,p.imap_unordered請求1565項目只有2池工作人員甚至在處理任何事情之前,我如何限制從發電機請求的項目數量?如果我使用上面的代碼並在2個項目後更改連接數,我將丟失1563個項目

回答

0

問題是Pool會在單獨的線程內部使用生成器。你無法控制該邏輯。

你可以做什麼,正在向Pool.imap_unordered方法提供一部分生成器,並在根據可用連接進行縮放之前消耗該部分。

CHUNKSIZE = 100 

while True: 
    current_connection_number = get_connection_number() 
    with mp.Pool(current_connection_number) as p: 
     while current_connection_number == get_connection_number(): 
      for data in p.imap_unordered(fun, grouper(CHUNKSIZE, some_infinite_generator)): 
       yield data 

def grouper(n, iterable): 
    it = iter(iterable) 
    while True: 
     chunk = tuple(itertools.islice(it, n)) 
     if not chunk: 
      return 
     yield chunk 

這有點不太理想的比例發生的每塊而不是每次迭代中卻帶着幾分的CHUNKSIZE值的微調,你可以很容易地得到它的權利。

grouper recipe

相關問題