2012-06-07 50 views
0

我對下面的代碼有一個很奇怪的問題。當numrows = 10 Process循環自行完成並繼續完成。如果越來越大的清單變得越來越大,就會陷入僵局。爲什麼是這樣的,我該如何解決這個問題?在大循環中多處理,寫入文件和死鎖

import multiprocessing, time, sys 

# ----------------- Calculation Engine ------------------- 
def feed(queue, parlist): 
    for par in parlist: 
     queue.put(par) 

def calc(queueIn, queueOut): 
    while True: 
     try: 
      par = queueIn.get(block = False) 
      print "Project ID: %s started. " % par 
      res = doCalculation(par) 
      queueOut.put(res) 

     except: 
      break 

def write(queue, fname): 
    print 'Started to write to file' 
    fhandle = open(fname, "w") 
    while True: 
     try: 
      res = queue.get(block = False) 
      for m in res: 
       print >>fhandle, m 
     except: 
      break 
    fhandle.close() 
    print 'Complete writing to the file' 


def doCalculation(project_ID): 
    numrows = 100 
    toFileRowList = [] 

    for i in range(numrows): 
     toFileRowList.append([project_ID]*100) 
     print "%s %s" % (multiprocessing.current_process().name, i) 

    return toFileRowList 


def main(): 
    parlist  = [276, 266] 

    nthreads = multiprocessing.cpu_count() 
    workerQueue = multiprocessing.Queue() 
    writerQueue = multiprocessing.Queue() 

    feedProc = multiprocessing.Process(target = feed , args = (workerQueue, parlist)) 
    calcProc = [multiprocessing.Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)] 
    writProc = multiprocessing.Process(target = write, args = (writerQueue, 'somefile.csv')) 

    feedProc.start() 
    feedProc.join() 

    for p in calcProc: 
     p.start() 
    for p in calcProc: 
     p.join() 

    writProc.start() 
    writProc.join() 

if __name__=='__main__': 
    sys.exit(main()) 

回答

1

我認爲問題是隊列緩衝區被填充,所以你需要從隊列中讀取,然後纔可以添加其他的東西。 例如,在你的feed線程您有:

queue.put(par) 

如果你一直把太多的東西不讀這將導致它,直到緩衝區塊被釋放,但問題是,你只釋放緩衝區你calc線程,在加入阻塞線程之前,線程又不會啓動。

因此,爲了您的feed線程完成,緩衝區應該被釋放,但緩衝區將不會在線程結束:)

之前被釋放,嘗試舉辦隊列訪問更多。

+0

的確如此。看來緩衝區是問題所在。必須找到解決方法。謝謝。 –

1

feedProc和writeProc實際上並沒有與其他程序並行運行。當你有

proc.start() 
proc.join() 

啓動過程,然後,在join()你立刻等待它完成。在這種情況下,多處理沒有收益,只有開銷。嘗試在加入它們之前立即啓動所有流程。這也會影響你的隊列清空regularyl,你不會陷入僵局。

+0

不幸的是,在連接之上加入'writProc.start()'並沒有幫助。 –

+0

第一啓動所有的處理在calcProc加入其中的任何 'feedProc.start() 對於p之前: p.start() writProc.start() feedProc.join() 對於p在calcProc: p .join() writProc.join()' –