2013-09-28 295 views
1

我試圖在蟒蛇mutliprocessing實現這個tutorial,但是當我試圖做我自己的任務,我得到以下錯誤:Python的多IO錯誤:[錯誤232]管道被關閉

Traceback (most recent call last): 
>>> File "C:\Python27\lib\multiprocessing\queues.py", line 262, in _feed 
    send(obj) 
IOError: [Errno 232] The pipe is being closed 

這裏是什麼,我試圖做這給同樣的錯誤信息重複的例子:

from multiprocessing import Lock, Process, Queue, current_process 
import time 

class Testclass(object): 
    def __init__(self, x): 
     self.x = x 

def toyfunction(testclass): 
    testclass.product = testclass.x * testclass.x 
    return testclass 


def worker(work_queue, done_queue): 
    try: 
     for testclass in iter(work_queue.get, 'STOP'): 
      print(testclass.counter) 
      newtestclass = toyfunction(testclass) 
      done_queue.put(newtestclass) 

    except: 
     print('error') 

    return True 

def main(): 

    counter = 1 

    database = [] 
    while counter <= 1000: 
     database.append(Testclass(3)) 
     counter += 1 
     print(counter) 



    workers = 8 
    work_queue = Queue() 
    done_queue = Queue() 
    processes = [] 

    start = time.clock() 
    counter = 1 

    for testclass in database: 
     testclass.counter = counter 
     work_queue.put(testclass) 
     counter += 1 
     print(counter) 


    print('items loaded') 
    for w in range(workers): 
     p = Process(target=worker, args=(work_queue, done_queue)) 
     p.start() 
     processes.append(p) 
     work_queue.put('STOP') 

    for p in processes: 
     p.join() 

    done_queue.put('STOP') 

    print(time.clock()-start) 
    print("Done") 

if __name__ == '__main__': 
    main()  
+0

我運行你的代碼,它工作正常。它最終打印完成。 –

+0

也爲這個'testclass.product = x * x'你需要使用'self.x',否則它會拋出異常。 –

回答

0

當我添加代碼,處理完成的隊列,我不再得到的錯誤。這裏是工作代碼:

from multiprocessing import Lock, Process, Queue, current_process 
import time 

class Testclass(object): 
    def __init__(self, x): 
     self.x = x 

def toyfunction(testclass): 
    testclass.product = testclass.x * testclass.x 
    return testclass 


def worker(work_queue, done_queue): 
    try: 
     for testclass in iter(work_queue.get, 'STOP'): 
      print(testclass.counter) 
      newtestclass = toyfunction(testclass) 
      done_queue.put(newtestclass) 

    except: 
     print('error') 

    return True 

def main(): 

    counter = 1 

    database = [] 
    while counter <= 100: 
     database.append(Testclass(10)) 
     counter += 1 
     print(counter) 



    workers = 8 
    work_queue = Queue() 
    done_queue = Queue() 
    processes = [] 

    start = time.clock() 
    counter = 1 

    for testclass in database: 
     testclass.counter = counter 
     work_queue.put(testclass) 
     counter += 1 
     print(counter) 


    print('items loaded') 
    for w in range(workers): 
     p = Process(target=worker, args=(work_queue, done_queue)) 
     p.start() 
     processes.append(p) 
     work_queue.put('STOP') 

    for p in processes: 
     p.join() 

    done_queue.put('STOP') 

    newdatabase = [] 
    for testclass in iter(done_queue.get, 'STOP'): 
     newdatabase.append(testclass) 

    print(time.clock()-start) 
    print("Done") 
    return(newdatabase) 

if __name__ == '__main__': 
    database = main() 
1

我身邊這讓使用事件正常退出過程後清空隊列:

self.event.set() #the process has a timer that checks for this to be set, then shuts itself down while not self._q.empty(): #_q is a multiprocess.Queue object used to communicate inter-process try: self._q.get(timeout=0.001) except: pass self._q.close()