
I ran some test code, shown below, to compare the performance of Pool and Process on Linux. I am using Python 2.7. The source code of multiprocessing.Pool appears to show that it uses multiprocessing.Process internally, yet multiprocessing.Pool costs far more time and memory than the equivalent use of multiprocessing.Process, and I don't understand why. Why do multiprocessing.Pool and multiprocessing.Process perform so differently on Linux?

Here is what I did:

  1. Create a large dict, then start the subprocesses.

  2. Pass the dict to each subprocess for read-only access.

  3. Each subprocess does some computation and returns a small result.

Below is my test code:

from multiprocessing import Pool, Process, Queue 
import time, psutil, os, gc 

gct = time.time 
costTime = lambda ET: time.strftime('%H:%M:%S', time.gmtime(int(ET))) 

def getMemConsumption(): 
    procId = os.getpid() 
    proc = psutil.Process(procId) 
    mem = proc.memory_info().rss 
    return "process ID %d.\nMemory usage: %.6f GB" % (procId, mem*1.0/1024**3) 

def f_pool(l, n, jobID):
    try:
        result = {}
        # example of subprocess work
        for i in xrange(n):
            result[i] = l[i]
        # work done
        # gc.collect()
        print getMemConsumption()
        return 1, result, jobID
    except:
        return 0, {}, jobID

def f_proc(q, l, n, jobID):
    try:
        result = {}
        # example of subprocess work
        for i in xrange(n):
            result[i] = l[i]
        # work done
        print getMemConsumption()
        q.put([1, result, jobID])
    except:
        q.put([0, {}, jobID])

def initialSubProc(targetFunc, procArgs, jobID):
    outQueue = Queue()
    args = [outQueue]
    args.extend(procArgs)
    args.append(jobID)
    p = Process(target=targetFunc, args=tuple(args))
    p.start()
    return p, outQueue


def track_add_Proc(procList, outQueueList, maxProcN, jobCount,
                   maxJobs, targetFunc, procArgs, joinFlag, all_results):
    if len(procList) < maxProcN:
        p, q = initialSubProc(targetFunc, procArgs, jobCount)
        outQueueList.append(q)
        procList.append(p)
        jobCount += 1
        joinFlag.append(0)
    else:
        for i in xrange(len(procList)):
            if not procList[i].is_alive() and joinFlag[i] == 0:
                procList[i].join()
                all_results.append(outQueueList[i].get())
                joinFlag[i] = 1  # avoid collecting the result of a joined subprocess twice
                if jobCount < maxJobs:
                    p, q = initialSubProc(targetFunc, procArgs, jobCount)
                    procList[i] = p
                    outQueueList[i] = q
                    jobCount += 1
                    joinFlag[i] = 0
    return jobCount

if __name__ == '__main__':
    st = gct()
    d = {i: i**2 for i in xrange(10000000)}
    print "MainProcess create data dict\n%s" % getMemConsumption()
    print 'Time to create dict: %s\n\n' % costTime(gct()-st)

    nproc = 2
    jobs = 8
    subProcReturnDictLen = 1000
    procArgs = [d, subProcReturnDictLen]

    print "Use multiprocessing.Pool, max subprocess = %d, jobs = %d\n" % (nproc, jobs)
    st = gct()
    pool = Pool(processes=nproc)
    for i in xrange(jobs):
        procArgs.append(i)
        sp = pool.apply_async(f_pool, tuple(procArgs))
        procArgs.pop(2)
        res = sp.get()
        if res[0] == 1:
            # do something with the result
            pass
        else:
            # do something with the subprocess exception
            pass
    pool.close()
    pool.join()
    print "Total time used to finish all jobs: %s" % costTime(gct()-st)
    print "Main Process\n", getMemConsumption(), '\n'

    print "Use multiprocessing.Process, max subprocess = %d, jobs = %d\n" % (nproc, jobs)
    st = gct()
    procList = []
    outQueueList = []
    all_results = []
    jobCount = 0
    joinFlag = []
    while jobCount < jobs:
        jobCount = track_add_Proc(procList, outQueueList, nproc, jobCount,
                                  jobs, f_proc, procArgs, joinFlag, all_results)
    for i in xrange(nproc):
        if joinFlag[i] == 0:
            procList[i].join()
            all_results.append(outQueueList[i].get())
            joinFlag[i] = 1
    for i in xrange(jobs):
        res = all_results[i]
        if res[0] == 1:
            # do something with the result
            pass
        else:
            # do something with the subprocess exception
            pass
    print "Total time used to finish all jobs: %s" % costTime(gct()-st)
    print "Main Process\n", getMemConsumption()

Here are the results:

MainProcess create data dict 
process ID 21256. 
Memory usage: 0.841743 GB 
Time to create dict: 00:00:02 


Use multiprocessing.Pool, max subprocess = 2, jobs = 8 

process ID 21266. 
Memory usage: 1.673084 GB 
process ID 21267. 
Memory usage: 1.673088 GB 
process ID 21266. 
Memory usage: 2.131172 GB 
process ID 21267. 
Memory usage: 2.131172 GB 
process ID 21266. 
Memory usage: 2.176079 GB 
process ID 21267. 
Memory usage: 2.176083 GB 
process ID 21266. 
Memory usage: 2.176079 GB 
process ID 21267. 
Memory usage: 2.176083 GB 

Total time used to finish all jobs: 00:00:49 
Main Process 
process ID 21256. 
Memory usage: 0.843079 GB 


Use multiprocessing.Process, max subprocess = 2, jobs = 8 

process ID 23405. 
Memory usage: 0.840614 GB 
process ID 23408. 
Memory usage: 0.840618 GB 
process ID 23410. 
Memory usage: 0.840706 GB 
process ID 23412. 
Memory usage: 0.840805 GB 
process ID 23415. 
Memory usage: 0.840900 GB 
process ID 23417. 
Memory usage: 0.840973 GB 
process ID 23419. 
Memory usage: 0.841061 GB 
process ID 23421. 
Memory usage: 0.841152 GB 

Total time used to finish all jobs: 00:00:00 
Main Process 
process ID 21256. 
Memory usage: 0.843781 GB 

I don't understand why the subprocesses from multiprocessing.Pool need about 1.6 GB right from the start, while the subprocesses from multiprocessing.Process need only 0.84 GB, which equals the memory cost of the main process. It looks to me as if only multiprocessing.Process enjoys Linux's copy-on-write benefit, since the time needed for all the work is less than 1 second. I don't know why multiprocessing.Pool does not get the same benefit. From the source code, multiprocessing.Pool looks like a wrapper around multiprocessing.Process.

Answer


Question: I don't know why subprocesses from multiprocessing.Pool need about 1.6 GB from the start,
... Pool looks like a wrapper around multiprocessing.Process

First, Pool allocates memory for the results of all the jobs.
Second, Pool uses two SimpleQueue() and three Threads.
Third, Pool duplicates all the data passed via argv before handing it to each process.

Your Process example uses only one Queue() for everything and passes the argv as they are.

Pool is far more than just a wrapper.
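
The third point matches the numbers above: every apply_async call pickles the whole dict into Pool's task queue, and each worker then unpickles its own private copy on top of the pages it already shares with the parent after the fork. If the dict only needs to be read, one common workaround is to keep it as a module-level global that exists before Pool() forks, so the workers reach it copy-on-write instead of receiving a pickled copy. A minimal sketch of that idea (the names shared_d and worker are made up for illustration; this is not the code from the question):

from multiprocessing import Pool

shared_d = {}  # filled in the parent before the Pool forks

def worker(n, jobID):
    # reads the inherited, copy-on-write dict; only (n, jobID) is pickled per task
    return 1, dict((i, shared_d[i]) for i in xrange(n)), jobID

if __name__ == '__main__':
    shared_d.update((i, i**2) for i in xrange(10000000))
    pool = Pool(processes=2)  # fork happens here, after shared_d is built
    handles = [pool.apply_async(worker, (1000, j)) for j in xrange(8)]
    results = [h.get() for h in handles]
    pool.close()
    pool.join()

With this layout the per-task payload is tiny, so the workers' memory usage should stay close to the parent's 0.84 GB instead of jumping to about 1.6 GB.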


Thanks. It makes sense now that two queues are used, one for the inputs and one for the results. Just a small correction: 'Pool' uses 'multiprocessing.Queue' rather than 'multiprocessing.Manager().Queue'. – Finix
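
Regarding the queues: on CPython 2.7 the two queues and the three helper threads the answer mentions can be seen directly on a Pool instance through its private attributes (these are private internals, so this is only a quick check and the names may differ in other versions):

from multiprocessing import Pool

if __name__ == '__main__':
    pool = Pool(processes=2)
    # the task queue and the result queue are both multiprocessing.queues.SimpleQueue
    print type(pool._inqueue), type(pool._outqueue)
    # the three helper threads: worker handler, task handler, result handler
    print pool._worker_handler, pool._task_handler, pool._result_handler
    pool.close()
    pool.join()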
