2015-02-08 112 views
4

更新2015年8月:對於想要使用消息傳遞的人,我目前會推薦使用zeromq。可以用作pykka的補充或完全替代。如何發送RabbitMQ消息給Pykka演員?

我該如何聆聽RabbitMQ隊列中的消息,然後將它們轉發給Pykka中的演員?

目前,當我嘗試這樣做時,我會發現奇怪的行爲,系統停下來停下來。

這裏是我有我的演員來實現:

class EventListener(eventlet.EventletActor): 
    def __init__(self, target): 
     """ 
     :param pykka.ActorRef target: Where to send the queue messages. 
     """ 
     super(EventListener, self).__init__() 

     self.target = target 

    def on_start(self): 
     ApplicationService.listen_for_events(self.actor_ref) 

這裏是我的ApplicationService類應該檢查隊列新郵件裏面方法:

@classmethod 
def listen_for_events(cls, actor): 
    """ 
    Subscribe to messages and forward them to the given actor. 
    """  
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
    channel = connection.channel() 
    channel.queue_declare(queue='test') 
    def callback(ch, method, properties, body): 
     message = pickle.loads(body) 
     actor.tell(message) 

    channel.basic_consume(callback, queue='test', no_ack=True) 
    channel.start_consuming()    

好像start_consuming無限期阻止。我有辦法定期自行「輪詢」隊列嗎?

+0

你有什麼特別的原因,你在一個程序中同時使用'pika'和'pykka'?似乎你可能會更好地使用自己的'pykka'。 – dano 2015-02-17 00:30:31

+0

要重現此行爲,請分享更多代碼,如ApplicationServiceClass和其他相關代碼嗎? – Vinkal 2015-02-17 14:32:26

+0

@dano我需要併發進程在對隊列消息的響應中運行。 (想想某種密集的數據分析)。 – drozzy 2015-02-19 02:30:04

回答

3

你所有的代碼對我來說都是正確的。如果你想檢查每個演員使用的隊列,你可以檢查actor_inbox屬性可用於從Actor#start返回的演員參考。

我從EventletActor繼承時遇到類似的問題,以便測試我使用EventletActor和使用ThreadingActor嘗試相同的代碼。據我從源代碼可以看出,他們都使用eventlet來完成工作。 ThreadingActor非常適合我,但EventletActor不適用於ActorRef#tell,它可以與ActorRef#ask一起使用。

我從兩個文件開始,位於如下所示的同一目錄中。

my_actors.py:初始化兩個參與者,它們將通過打印以其類名開頭的消息內容來響應消息。

from pykka.eventlet import EventletActor 
import pykka 


class MyThreadingActor(pykka.ThreadingActor): 
    def __init__(self): 
     super(MyThreadingActor, self).__init__() 

    def on_receive(self, message): 
     print(
      "MyThreadingActor Received: {message}".format(
       message=message) 
     ) 


class MyEventletActor(EventletActor): 
    def __init__(self): 
     super(MyEventletActor, self).__init__() 

    def on_receive(self, message): 
     print(
      "MyEventletActor Received: {message}".format(
       message=message) 
     ) 


my_threading_actor_ref = MyThreadingActor.start() 
my_eventlet_actor_ref = MyEventletActor.start() 

my_queue.py:設置隊列中鼠兔,將消息發送到其前轉發到兩個演員設置隊列中。在每個演員被告知消息後,他們當前的演員收件箱將被檢查隊列中的任何內容。

from my_actors import my_threading_actor_ref, my_eventlet_actor_ref 
import pika 


def on_message(channel, method_frame, header_frame, body): 
    print "Received Message", body 
    my_threading_actor_ref.tell({"msg": body}) 
    my_eventlet_actor_ref.tell({"msg": body}) 

    print "ThreadingActor Inbox", my_threading_actor_ref.actor_inbox 
    print "EventletActor Inbox", my_eventlet_actor_ref.actor_inbox 

    channel.basic_ack(delivery_tag=method_frame.delivery_tag) 


queue_name = 'test' 
connection = pika.BlockingConnection() 

channel = connection.channel() 
channel.queue_declare(queue=queue_name) 
channel.basic_consume(on_message, queue_name) 
channel.basic_publish(exchange='', routing_key=queue_name, body='A Message') 

try: 
    channel.start_consuming() 
except KeyboardInterrupt: 
    channel.stop_consuming() 

    # It is very important to stop these actors, otherwise you may lockup 
    my_threading_actor_ref.stop() 
    my_eventlet_actor_ref.stop() 
connection.close() 

當運行my_queue.py輸出如下:

接收消息的消息

ThreadingActor收件箱<Queue.Queue instance at 0x10bf55878>

MyThreadingActor收稿:{'msg': 'A Message'}

EventletActor收件箱<Queue maxsize=None queue=deque([{'msg': 'A Message'}]) tasks=1 _cond=<Event at 0x10bf53b50 result=NOT_USED _exc=None _waiters[0]>>

當我打CTRL+C停止排隊,我注意到EventletActor終於收到消息並打印:收到

^C MyEventletActor:{'msg': 'A Message'}

這一切都使我相信在EventletActor中可能存在一個錯誤,我認爲你的代碼沒問題,並且存在一個我在第一次檢查時無法在代碼中找到的錯誤。

我希望這些信息對您有所幫助。

+0

有意思......我停止使用ThreadingActors,因爲我需要產生一大堆......但是,好像EventletActors不能與ThreadingActors進行交互操作,所以必須將每個actor都切換到一個線程。 Eventlet對我來說仍然是一個謎。太糟糕pykka沒有被更新。 – drozzy 2015-02-19 07:33:02