2011-06-29 132 views
8

我在python中遇到以下問題。寫入多處理文件

我需要做一些並行計算,其結果需要按順序寫入文件。所以,我創建了一個接收和multiprocessing.Queue文件句柄功能,做計算,並在文件中打印出結果:

import multiprocessing 
from multiprocessing import Process, Queue 
from mySimulation import doCalculation 

# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file 

def work(queue, fh): 
while True: 
    try: 
     parameter = queue.get(block = False) 
     result = doCalculation(parameter) 
     print >>fh, string 
    except: 
     break 


if __name__ == "__main__": 
    nthreads = multiprocessing.cpu_count() 
    fh = open("foo", "w") 
    workQueue = Queue() 
    parList = # list of conditions for which I want to run doCalculation() 
    for x in parList: 
     workQueue.put(x) 
    processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)] 
    for p in processes: 
     p.start() 
    for p in processes: 
     p.join() 
    fh.close() 

但腳本運行後的文件最終空。我嘗試將worker()函數更改爲:

def work(queue, filename): 
while True: 
    try: 
     fh = open(filename, "a") 
     parameter = queue.get(block = False) 
     result = doCalculation(parameter) 
     print >>fh, string 
     fh.close() 
    except: 
     break 

並傳遞文件名作爲參數。然後它按我的意圖工作。當我嘗試按順序做同樣的事情時,沒有多處理,它也能正常工作。

爲什麼它沒有在第一個版本中工作?我看不到問題。

另外:我可以保證兩個進程不會同時寫入文件嗎?


編輯:

感謝。我現在明白了。這是工作版本:

import multiprocessing 
from multiprocessing import Process, Queue 
from time import sleep 
from random import uniform 

def doCalculation(par): 
    t = uniform(0,2) 
    sleep(t) 
    return par * par # just to simulate some calculation 

def feed(queue, parlist): 
    for par in parlist: 
      queue.put(par) 

def calc(queueIn, queueOut): 
    while True: 
     try: 
      par = queueIn.get(block = False) 
      print "dealing with ", par, "" 
      res = doCalculation(par) 
      queueOut.put((par,res)) 
     except: 
      break 

def write(queue, fname): 
    fhandle = open(fname, "w") 
    while True: 
     try: 
      par, res = queue.get(block = False) 
      print >>fhandle, par, res 
     except: 
      break 
    fhandle.close() 

if __name__ == "__main__": 
    nthreads = multiprocessing.cpu_count() 
    fname = "foo" 
    workerQueue = Queue() 
    writerQueue = Queue() 
    parlist = [1,2,3,4,5,6,7,8,9,10] 
    feedProc = Process(target = feed , args = (workerQueue, parlist)) 
    calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)] 
    writProc = Process(target = write, args = (writerQueue, fname)) 


    feedProc.start() 
    for p in calcProc: 
     p.start() 
    writProc.start() 

    feedProc.join() 
    for p in calcProc: 
     p.join() 
    writProc.join() 
+2

請關注。一組代碼**僅**。請刪除過時或不相關的代碼。請避免使用「編輯」。請讓問題清楚,完整和一致。 –

回答

16

你真的應該使用兩個隊列和三種不同的處理。

  1. 把東西放入隊列#1。

  2. 從隊列#1中獲取東西並進行計算,將東西放入隊列#2中。你可以有很多這樣的,因爲它們從一個隊列中獲得並安全地放入另一個隊列中。

  3. 從隊列#2中獲取內容並將其寫入文件。你必須正好有1個,不能再多了。它「擁有」該文件,保證原子訪問,並絕對保證該文件的編寫乾淨而一致。

+1

工作人員和消費者隊列+1。記得在隊列中設置一個最大尺寸,否則你的工作人員可能會吃掉你的記憶並使作者捱餓。 – Bittrance

+0

@ S.Lott @Btratrance請看看我的編輯。 –

+1

哦,從不關​​心多次運行......我很蠢,不會注意到我多次啓動了feedProc和writProc。 ¬¬我更正了代碼。但我仍然有一個空文件。 –

4

如果有人正在尋找一種簡單的方法來做同樣的事情,這可以幫助你。 我認爲這樣做沒有任何不利之處。如果有,請告訴我。

import multiprocessing 
import re 

def mp_worker(item): 
    # Do something 
    return item, count 

def mp_handler(): 
    cpus = multiprocessing.cpu_count() 
    p = multiprocessing.Pool(cpus) 
    # The below 2 lines populate the list. This listX will later be accessed parallely. This can be replaced as long as listX is passed on to the next step. 
    with open('ExampleFile.txt') as f: 
     listX = [line for line in (l.strip() for l in f) if line] 
    with open('results.txt', 'w') as f: 
     for result in p.imap(mp_worker, listX): 
      # (item, count) tuples from worker 
      f.write('%s: %d\n' % result) 

if __name__=='__main__': 
    mp_handler() 

來源:Python: Writing to a single file with queue while using multiprocessing Pool

0

有在寫代碼的工人犯錯,如果塊是假的,工人將永遠不會得到任何數據。應該如下:

par, res = queue.get(block = True) 

您可以通過 queueOut.put((par,res))

With塊=假,你會得到不斷增加隊列的長度,直到它後面添加一行

print "QSize",queueOut.qsize() 

檢查填滿,不像block = True,你總是得到「1」。

相關問題