2

我正在嘗試使用this example作爲我的cherrypy應用程序上排隊系統的模板。多重處理在Ubuntu中運行,不在Windows中

我能夠將它從python 2轉換爲python 3(將from Queue import Empty更改爲from queue import Empty)並在Ubuntu中執行它。但是,當我在Windows中執行它,我得到以下錯誤:

F:\workspace\test>python test.py 
Traceback (most recent call last): 
    File "test.py", line 112, in <module> 
    broker.start() 
    File "C:\Anaconda3\lib\multiprocessing\process.py", line 105, in start 
    self._popen = self._Popen(self) 
    File "C:\Anaconda3\lib\multiprocessing\context.py", line 212, in _Popen 
    return _default_context.get_context().Process._Popen(process_obj) 
    File "C:\Anaconda3\lib\multiprocessing\context.py", line 313, in _Popen 
    return Popen(process_obj) 
    File "C:\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__ 
    reduction.dump(process_obj, to_child) 
    File "C:\Anaconda3\lib\multiprocessing\reduction.py", line 59, in dump 
    ForkingPickler(file, protocol).dump(obj) 
TypeError: cannot serialize '_io.TextIOWrapper' object 

F:\workspace\test>Traceback (most recent call last): 
    File "<string>", line 1, in <module> 
    File "C:\Anaconda3\lib\multiprocessing\spawn.py", line 100, in spawn_main 
    new_handle = steal_handle(parent_pid, pipe_handle) 
    File "C:\Anaconda3\lib\multiprocessing\reduction.py", line 81, in steal_handle 
    _winapi.PROCESS_DUP_HANDLE, False, source_pid) 
OSError: [WinError 87] The parameter is incorrect 

下面是完整的代碼:

# from http://www.defuze.org/archives/198-managing-your-process-with-the-cherrypy-bus.html 

import sys 
import logging 
from logging import handlers 

from cherrypy.process import wspbus 

class MyBus(wspbus.Bus): 
    def __init__(self, name=""): 
     wspbus.Bus.__init__(self) 
     self.open_logger(name) 
     self.subscribe("log", self._log) 

    def exit(self): 
     wspbus.Bus.exit(self) 
     self.close_logger() 

    def open_logger(self, name=""): 
     logger = logging.getLogger(name) 
     logger.setLevel(logging.INFO) 
     h = logging.StreamHandler(sys.stdout) 
     h.setLevel(logging.INFO) 
     h.setFormatter(logging.Formatter("[%(asctime)s] %(name)s - %(levelname)s - %(message)s")) 
     logger.addHandler(h) 

     self.logger = logger 

    def close_logger(self): 
     for handler in self.logger.handlers: 
      handler.flush() 
      handler.close() 

    def _log(self, msg="", level=logging.INFO): 
     self.logger.log(level, msg) 



import random 
import string 
from multiprocessing import Process 

class Bank(object): 
    def __init__(self, queue): 
     self.bus = MyBus(Bank.__name__) 
     self.queue = queue 
     self.bus.subscribe("main", self.randomly_place_order) 
     self.bus.subscribe("exit", self.terminate) 

    def randomly_place_order(self): 
     order = random.sample(['BUY', 'SELL'], 1)[0] 
     code = random.sample(string.ascii_uppercase, 4) 
     amount = random.randint(0, 100) 

     message = "%s %s %d" % (order, ''.join(code), amount) 

     self.bus.log("Placing order: %s" % message) 

     self.queue.put(message) 

    def run(self): 
     self.bus.start() 
     self.bus.block(interval=0.01) 

    def terminate(self): 
     self.bus.unsubscribe("main", self.randomly_place_order) 
     self.bus.unsubscribe("exit", self.terminate) 


from queue import Empty 

class Broker(Process): 
    def __init__(self, queue): 
     Process.__init__(self) 
     self.queue = queue 
     self.bus = MyBus(Broker.__name__) 
     self.bus.subscribe("main", self.check) 

    def check(self): 
     try: 
      message = self.queue.get_nowait() 
     except Empty: 
      return 

     if message == "stop": 
      self.bus.unsubscribe("main", self.check) 
      self.bus.exit() 
     elif message.startswith("BUY"): 
      self.buy(*message.split(' ', 2)[1:]) 
     elif message.startswith("SELL"): 
      self.sell(*message.split(' ', 2)[1:]) 

    def run(self): 
     self.bus.start() 
     self.bus.block(interval=0.01) 

    def stop(self): 
     self.queue.put("stop") 

    def buy(self, code, amount): 
     self.bus.log("BUY order placed for %s %s" % (amount, code)) 

    def sell(self, code, amount): 
     self.bus.log("SELL order placed for %s %s" % (amount, code)) 




if __name__ == '__main__': 
    from multiprocessing import Queue 
    queue = Queue() 

    broker = Broker(queue) 
    broker.start() 

    bank = Bank(queue) 
    bank.run() 

回答

6

的問題是,MyBus對象的部分不picklable,和你將實例MyBus保存到您的Broker實例中。因爲Windows缺少fork()支持,所以當您撥打broker.start()時,必須在子進程中對broker的整個狀態進行酸洗並重新創建,multiprocessing會生成以執行broker.run。它適用於Linux,因爲Linux支持fork;在這種情況下,它不需要醃製任何東西 - 子進程只要分叉就包含父進程的完整狀態。

有兩種方法來解決這個問題。第一個也是更困難的方法是讓您的broker實例可供選擇。要做到這一點,你需要製作MyBus。您現在得到的錯誤是指MyBus上的logger屬性,該屬性不可選。那個很容易解決;只需將__getstate__/__setstate__方法添加到MyBus,這些方法用於控制對象如何醃漬/取消挑選。如果我們刪除記錄時,我們味酸,並重新創建它,當我們unpickle,我們會解決此問題:

class MyBus(wspbus.Bus): 
    ... 
    def __getstate__(self): 
     self_dict = self.__dict__ 
     del self_dict['logger'] 
     return self_dict 

    def __setstate__(self, d): 
     self.__dict__.update(d) 
     self.open_logger() 

這工作,但後來我們打另一酸洗錯誤:

Traceback (most recent call last): 
    File "async2.py", line 121, in <module> 
    broker.start() 
    File "C:\python34\lib\multiprocessing\process.py", line 105, in start 
    self._popen = self._Popen(self) 
    File "C:\python34\lib\multiprocessing\context.py", line 212, in _Popen 
    return _default_context.get_context().Process._Popen(process_obj) 
    File "C:\python34\lib\multiprocessing\context.py", line 313, in _Popen 
    return Popen(process_obj) 
    File "C:\python34\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__ 
    reduction.dump(process_obj, to_child) 
    File "C:\python34\lib\multiprocessing\reduction.py", line 60, in dump 
    ForkingPickler(file, protocol).dump(obj) 
_pickle.PicklingError: Can't pickle <class 'cherrypy.process.wspbus._StateEnum.State'>: attribute lookup State on cherrypy.process.wspbus failed 

原來cherrypy.process.wspbus._StateEnum.State,這是由MyBus繼承wspbus.Bus類中的屬性,是一個嵌套類,和嵌套類不能進行酸洗:

class _StateEnum(object): 
    class State(object): 
     name = None 
     def __repr__(self): 
      return "states.%s" % self.name 

State對象(意外)用於跟蹤Bus實例的狀態。由於我們在啓動公共汽車之前正在進行酸洗,所以我們可以在我們醃製時從對象中刪除state屬性,並在我們取消拆卸時將其設置爲States.STOPPED。

class MyBus(wspbus.Bus): 
    def __init__(self, name=""): 
     wspbus.Bus.__init__(self) 
     self.open_logger(name) 
     self.subscribe("log", self._log) 

    def __getstate__(self): 
     self_dict = self.__dict__ 
     del self_dict['logger'] 
     del self_dict['state'] 
     return self_dict 

    def __setstate__(self, d): 
     self.__dict__.update(d) 
     self.open_logger() 
     self.state = wspbus.states.STOPPED # Initialize to STOPPED 

通過這些更改,代碼將按預期工作!唯一的限制是,如果公共汽車還沒有啓動,那麼只有醃製MyBus纔是安全的,這對您的用例來說很好。

再次,這是困難的方式。簡單的方法是刪除需要醃製MyBus實例。你可以只在子進程創建MyBus實例,而不是父:

class Broker(Process): 
    def __init__(self, queue): 
     Process.__init__(self) 
     self.queue = queue 

... 
    def run(self): 
     self.bus = MyBus(Broker.__name__) # Create the instance here, in the child 
     self.bus.subscribe("main", self.check) 
     self.bus.start() 
     self.bus.block(interval=0.01) 

只要你不需要訪問broker.bus在父,這是簡單的選擇。

+0

哇,我印象深刻!很好的解釋。 – stenci

+0

嗯...在Windows中缺少'fork()'也意味着爲了在一個擁有數百個類的大型Web應用程序上運行一個小類的小方法,整個shebang將在每次從頭開始被解釋新流程開始了嗎?這是我應該擔心的事嗎? – stenci

+0

@stenci腳本中的__main__'模塊將被重新導入到孩子中,並且您調用方法的整個狀態將被醃製併發送給孩子。如果你在'if __name__ ==「__main __」:'guard中創建了其他類(你應該這樣做),而且你的小類沒有任何其他類的引用,它們將不會被醃製併發送給或重新輸入兒童。要記住的最重要的事情是用'if __name__ ==「__main __」:'來保護你不想在孩子中重新運行的東西。 – dano

相關問題