我最近發現了用於在python中構建異步應用程序的Circuits框架。我正在構建一個事件驅動的應用程序,這個框架似乎很適合我的需求。該框架基於收到事件時作出反應的組件的想法。大!!我遵循教程,我創建了一些簡單的應用程序,並且每件事都似乎正常工作,直到我嘗試使用組件執行一些繁重的計算任務。我知道,這個框架支持工人,但我不想使用工人。我想要做的是在線程中運行每個組件,以便可以並行執行多個組件。該框架似乎通過start
方法支持這種模式。如何在python電路中使用線程
從Component.py源代碼:
def start(self, process=False, link=None):
"""
Start a new thread or process that invokes this manager's
``run()`` method. The invocation of this method returns
immediately after the task or process has been started.
"""
if process:
# Parent<->Child Bridge
if link is not None:
from circuits.net.sockets import Pipe
from circuits.core.bridge import Bridge
channels = (uuid(),) * 2
parent, child = Pipe(*channels)
bridge = Bridge(parent, channel=channels[0]).register(link)
args = (child,)
else:
args =()
bridge = None
self.__process = Process(
target=self.run, args=args, name=self.name
)
self.__process.daemon = True
self.__process.start()
return self.__process, bridge
else:
self.__thread = Thread(target=self.run, name=self.name)
self.__thread.daemon = True
self.__thread.start()
return self.__thread, None
def join(self):
if getattr(self, "_thread", None) is not None:
return self.__thread.join()
if getattr(self, "_process", None) is not None:
return self.__process.join()
於是,我就實現使用start
和join
方法之前表現出經典的生產者/消費者應用。我希望生產者和消費者在他們自己的線程中運行,並且主線程等待直到他們完成。
import time
from threading import current_thread
from circuits import Component, Event, Debugger
class go(Event):
""" go """
class produced(Event):
""" produced """
class Consumer(Component):
def started(self, *args):
print(current_thread())
print(current_thread().ident)
print("Comuser started")
def produced(self, *args):
print("I am consuming...")
class Producer(Component):
def started(self, *args):
print("Producer started")
print(current_thread().ident)
def go(self, *args):
print("gooooooooooo")
while True:
self.fire(produced())
print("Produced element, going to sleep for 1 sec")
time.sleep(1)
c = Consumer()
c.start()
p = Producer()
p.start()
p.fire(go())
c.join()
p.join()
不幸的是,上面的代碼不能正常工作。應用程序只要執行主代碼就立即退出。我的代碼有什麼問題?如果你知道任何一個以類似的方式使用這個庫的例子,你能給我提供一個鏈接嗎?
謝謝。
http://pythonhosted.org/circuits/
編輯
詹姆斯的回答後,我嘗試了一些更多的方式來運行的組件,但我還是不能讓他們並行運行。
代碼:
c = Consumer()
c.start()
p = Producer()
p.run()
p.fire(go())
輸出:
<Thread(Consumer, started daemon 4334432256)>
4334432256
Comuser started
Producer started
140735301485312
它看起來像應用程序被卡住。然後,我嘗試使用一個主要應用程序組件,它可以啓動其他組件。
代碼:
class App(Component):
def started(self, *args):
print("App started")
p.fire(go())
(App() + Debugger()).run()
輸出:
Comuser started
Producer started
4461318144
<registered[*] (<Debugger/* 75445:MainThread (queued=0) [S]>, <App/* 75445:MainThread (queued=2) [R]>)>
<started[*] (<App/* 75445:MainThread (queued=1) [R]>)>
App started
gooooooooooo
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
^C<signal[*] (2, <frame object at 0x7fe218725fa8>)>
<stopped[*] (<App/* 75445:MainThread (queued=0) [S]>)>
它看起來像只生產運行...我想在輸出中看到的是一樣的東西:
Produced...
Consumed...
Produced...
Consumed...
我嘗試了你所說的,但它似乎不起作用。也許我做錯了什麼。我用一些例子更新了我的問題。 –
如果這可以簡化應用程序,我還可以使用進程而不是線程。 –
給我幾個小時,我會構建一個可以工作的人爲的例子:)基本上,你的「線程」組件圖並沒有連接到另一個「主」圖。認爲事件總線。看到'hello_bridge * .py'和'node /'[例子](https://github.com/circuits/circuits/tree/master/examples)爲更好的方式來做到這一點:) –