2016-11-22 48 views
1

我有這個代碼(我很抱歉,它幾乎是從我的工作代碼中精確複製粘貼,我不知道問題可能在哪裏,因此我將整個在這裏):Python多處理代碼運行良好,但不終止

def init(Q): 
    """Serves to initialize the queue across all child processes""" 
    global q 
    q = Q 

def queue_manager(q): 
    """Listens on the queue, and writes pushed data to file""" 
    while True: 
     data = q.get() 
     if data is None: 
      break 
     key, preds = data 
     with pd.HDFStore(hdf_out, mode='a', complevel=5, complib='blosc') as out_store: 
      out_store.append(key, preds) 

def writer(message): 
    """Pushes messages to queue""" 
    q.put(message) 

def reader(key): 
    """Reads data from store, selects required days, processes it""" 
    try: 
     # Read the data 
     with pd.HDFStore(hdf_in, mode='r') as in_store: 
      df = in_store[key] 
    except KeyError as ke: 
     # Almost guaranteed to not happen 
     return (key, pd.DataFrame()) 
    else: 
     # Executes only if exception is not raised 
     fit_df = df[(df.index >= '2016-09-11') & \ 
        (df.index < '2016-09-25') & \ 
        (df.index.dayofweek < 5)].copy() 
     pre_df = df[(df.index >= '2016-09-18') & \ 
        (df.index < '2016-10-2') & \ 
        (df.index.dayofweek < 5)].copy() 
     del df 
     # model_wrapper below is a custom function in another module. 
     # It works fine. 
     models, preds = model_wrapper(fit_df=fit_df, pre_df=pre_df) 
     if preds is not None: 
      writer((key, preds)) 
      del preds 
    return (key, models) 

def main(): 
    sensors = pd.read_csv('sens_metadata.csv', index_col=[0]) 
    nprocs = int(cpu_count() - 0) 
    maxproc = 10 
    q = Queue() 
    t = Thread(target=queue_manager, args=(q,)) 

    print("Starting process at\t{}".format(dt.now().time())) 
    sys.stdout.flush() 
    t.start() 
    with Pool(processes=nprocs, maxtasksperchild=maxproc, initializer=init, 
       initargs=(q,)) as p: 
     models = p.map(reader, sensors.index.tolist(), 1) 
    print("Processing done at\t{}".format(dt.now().time())) 
    print("\nJoining Thread, and finishing writing predictions") 
    sys.stdout.flush() 
    q.put(None) 
    t.join() 
    print("Thread joined successfully at\t{}".format(dt.now().time())) 
    print("\nConcatenating models and serializing to pickle") 
    sys.stdout.flush() 
    pd.concat(dict(models)).to_pickle(path + 'models.pickle') 
    print("Pickled successfully at\t{}".format(dt.now().time())) 

if __name__ == '__main__': 
    main() 

此代碼的行爲就像一個嚴重偏差的硬幣折騰。大多數時候它不起作用,有時它起作用。運行時,我知道完成運行整個數據大約需要2.5小時(所有的keys)。在10次運行中,有9次會處理所有數據,我看到hdf_out文件中的數據,但多處理池未加入。所有的子進程都是活動的,但沒有做任何工作。我只是不明白爲什麼該程序可能會像這樣掛起。

發生這種情況時,我看不到"Processing done at ...""Joining Thread, ..."消息。另外,如果我給它較小的數據集,它會結束。如果我排除preds的計算,則結束。我不能排除計算models而不進行重大修改,這將不利於項目的其餘部分。

我不知道爲什麼會發生這種情況。我正在使用Linux(Kubuntu 16.04)。

回答

0

明顯丟掉了maxtaskperchild kwag解決了這個問題。爲什麼我不明白這一點。我想這與fork進程(Linux上的默認設置)和產生進程(Windows上的唯一選項)之間的區別有關。

隨着叉子過程maxtaskperchild顯然不是必需的,因爲沒有它,性能更好。我注意到通過減少maxtaskperchild改善了內存使用。內存不會被子進程佔用,而是從父進程共享。但是,當我不得不使用Windows時,maxtaskperchild是防止子進程膨脹的關鍵方法,特別是在運行帶有長任務列表的內存密集型任務時。

有人誰知道發生了什麼更好,請隨時編輯這個答案。

相關問題