2

我最近發現了用於在python中構建異步應用程序的Circuits框架。我正在構建一個事件驅動的應用程序,這個框架似乎很適合我的需求。該框架基於收到事件時作出反應的組件的想法。大!!我遵循教程,我創建了一些簡單的應用程序,並且每件事都似乎正常工作,直到我嘗試使用組件執行一些繁重的計算任務。我知道,這個框架支持工人,但我不想使用工人。我想要做的是在線程中運行每個組件,以便可以並行執行多個組件。該框架似乎通過start方法支持這種模式。如何在python電路中使用線程

從Component.py源代碼:

def start(self, process=False, link=None): 
    """ 
    Start a new thread or process that invokes this manager's 
    ``run()`` method. The invocation of this method returns 
    immediately after the task or process has been started. 
    """ 

    if process: 
     # Parent<->Child Bridge 
     if link is not None: 
      from circuits.net.sockets import Pipe 
      from circuits.core.bridge import Bridge 

      channels = (uuid(),) * 2 
      parent, child = Pipe(*channels) 
      bridge = Bridge(parent, channel=channels[0]).register(link) 

      args = (child,) 
     else: 
      args =() 
      bridge = None 

     self.__process = Process(
      target=self.run, args=args, name=self.name 
     ) 
     self.__process.daemon = True 
     self.__process.start() 

     return self.__process, bridge 
    else: 
     self.__thread = Thread(target=self.run, name=self.name) 
     self.__thread.daemon = True 
     self.__thread.start() 

     return self.__thread, None 

def join(self): 
    if getattr(self, "_thread", None) is not None: 
     return self.__thread.join() 

    if getattr(self, "_process", None) is not None: 
     return self.__process.join() 

於是,我就實現使用startjoin方法之前表現出經典的生產者/消費者應用。我希望生產者和消費者在他們自己的線程中運行,並且主線程等待直到他們完成。

import time 
from threading import current_thread 
from circuits import Component, Event, Debugger 


class go(Event): 
    """ go """ 

class produced(Event): 
    """ produced """ 

class Consumer(Component): 

    def started(self, *args): 
     print(current_thread()) 
     print(current_thread().ident) 
     print("Comuser started") 

    def produced(self, *args): 
     print("I am consuming...") 

class Producer(Component): 

    def started(self, *args): 
     print("Producer started") 
     print(current_thread().ident) 

    def go(self, *args): 
     print("gooooooooooo") 
     while True: 
      self.fire(produced()) 
      print("Produced element, going to sleep for 1 sec") 
      time.sleep(1) 

c = Consumer() 
c.start() 
p = Producer() 
p.start() 

p.fire(go()) 

c.join() 
p.join() 

不幸的是,上面的代碼不能正常工作。應用程序只要執行主代碼就立即退出。我的代碼有什麼問題?如果你知道任何一個以類似的方式使用這個庫的例子,你能給我提供一個鏈接嗎?

謝謝。

http://pythonhosted.org/circuits/


編輯

詹姆斯的回答後,我嘗試了一些更多的方式來運行的組件,但我還是不能讓他們並行運行。

代碼:

c = Consumer() 
c.start() 
p = Producer() 
p.run() 
p.fire(go()) 

輸出:

<Thread(Consumer, started daemon 4334432256)> 
4334432256 
Comuser started 
Producer started 
140735301485312 

它看起來像應用程序被卡住。然後,我嘗試使用一個主要應用程序組件,它可以啓動其他組件。

代碼:

class App(Component): 

    def started(self, *args): 
     print("App started") 
     p.fire(go()) 

(App() + Debugger()).run() 

輸出:

Comuser started 
Producer started 
4461318144 
<registered[*] (<Debugger/* 75445:MainThread (queued=0) [S]>, <App/* 75445:MainThread (queued=2) [R]>)> 
<started[*] (<App/* 75445:MainThread (queued=1) [R]>)> 
App started 
gooooooooooo 
Produced element, going to sleep for 1 sec 
Produced element, going to sleep for 1 sec 
Produced element, going to sleep for 1 sec 
Produced element, going to sleep for 1 sec 
Produced element, going to sleep for 1 sec 
Produced element, going to sleep for 1 sec 
Produced element, going to sleep for 1 sec 
Produced element, going to sleep for 1 sec 
Produced element, going to sleep for 1 sec 
Produced element, going to sleep for 1 sec 
Produced element, going to sleep for 1 sec 
Produced element, going to sleep for 1 sec 
Produced element, going to sleep for 1 sec 
Produced element, going to sleep for 1 sec 
^C<signal[*] (2, <frame object at 0x7fe218725fa8>)> 
<stopped[*] (<App/* 75445:MainThread (queued=0) [S]>)> 

它看起來像只生產運行...我想在輸出中看到的是一樣的東西:

Produced... 
Consumed... 
Produced... 
Consumed... 

回答

2

電路的作者在這裏。

不幸的是,這裏沒有的是調用.run()的「主」組件。這就是說你已經在「線程」模式下啓動了兩個組件,但現在沒有「主」運行組件。

如果您在Producer或Consume上調用.run(),並在「線程」模式下啓動另一個;你應該獲得更多預期結果。


更新:很抱歉的延遲反應,但我相信你是這種模式(* 1根據所hello_multi_bridge.py例子)後:

from os import getpid 


from circuits import ipc, Component, Event, Timer 


class go(Event): 
    """go""" 


class produced(Event): 
    """produced""" 


class Consumer(Component): 

    def produced(self, *args): 
     print("Consumed {} from {}".format(repr(args), getpid())) 


class Producer(Component): 

    def init(self): 
     self.consumers = [ 
      Consumer().start(process=True, link=self), 
     ] 

    def ready(self, *args): 
     Timer(1, go(), persist=True).register(self) 

    def go(self, *args): 
     for process, bridge in self.consumers: 
      self.fire(ipc(produced(), bridge.channel)) 


class App(Component): 

    def init(self): 
     Producer().register(self) 


App().run() 

這將產生輸出,如:

Consumed() from 68646 
Consumed() from 68646 
... 
+0

我嘗試了你所說的,但它似乎不起作用。也許我做錯了什麼。我用一些例子更新了我的問題。 –

+0

如果這可以簡化應用程序,我還可以使用進程而不是線程。 –

+0

給我幾個小時,我會構建一個可以工作的人爲的例子:)基本上,你的「線程」組件圖並沒有連接到另一個「主」圖。認爲事件總線。看到'hello_bridge * .py'和'node /'[例子](https://github.com/circuits/circuits/tree/master/examples)爲更好的方式來做到這一點:) –