2013-04-13 29 views
1

我有一些代碼監視一些其他更改的文件,我想要做的就是啓動使用不同套接字的zeromq的代碼,現在我這樣做的方式似乎會導致斷言在libzmq的某處失敗,因爲我可能重複使用相同的套接字。我如何確保何時從監視器類創建新的進程上下文不會被重用?多數民衆贊成在我看來,如果你能告訴我有一些其他的愚蠢,請告知。 這裏是一些代碼:pyzmq使用自己的套接字創建一個進程

import zmq 
from zmq.eventloop import ioloop 
from zmq.eventloop.zmqstream import ZMQStream 
class Monitor(object): 
    def __init(self) 
     self.context = zmq.Context() 
     self.socket = self.context.socket(zmq.DEALER) 
     self.socket.connect("tcp//127.0.0.1:5055") 
     self.stream = ZMQStream(self._socket) 
     self.stream.on_recv(self.somefunc) 

    def initialize(self,id) 
     self._id = id 

    def somefunc(self, something) 
     """work here and send back results if any """ 
     import json 
     jdecoded = json.loads(something) 
     if self_id == jdecoded['_id'] 
      """ good im the right monitor for you """ 
      work = jdecoded['message'] 
      results = algorithm (work) 
      self.socket.send(json.dumps(results)) 
     else: 
      """let some other process deal with it, not mine """ 
      pass 

class Prefect(object): 
    def __init(self, id) 
     self.context = zmq.Context() 
     self.socket = self.context.socket(zmq.DEALER) 
     self.socket.bind("tcp//127.0.0.1:5055") 
     self.stream = ZMQStream(self._socket) 
     self.stream.on_recv(self.check_if) 
     self._id = id 
     self.monitors = [] 
    def check_if(self,message): 
     """find out from message's id whether we have 
      started a proces for it previously""" 
     import json 
     jdecoded = json.loads(message) 
     this_id = jdecoded['_id'] 
     if this_id in self.monitors: 
      pass 
     else: 
      """start new process for it should have its won socket """ 
      new = Monitor() 
      import Process 
      newp = Process(target=new.initialize,args=(this_id)) 
      newp.start() 
      self.monitors.append(this_id) ## ensure its remembered 

正在發生的事情是,我想所有的顯示器processess和一個知府進程監聽相同的端口上,所以當知府看到的是它未見過的請求啓動一個進程對於它來說,所有存在的流程可能都應該聽取,但忽略不適合他們的消息。 因爲它,如果我這樣做,我得到一些崩潰可能涉及同一個zmq套接字的併發訪問的東西(我嘗試threading.thread,仍然崩潰)我讀的地方,不同線程併發訪問一個zmq套接字不是可能。我將如何確保新進程獲得他們自己的zmq套接字?

編輯: 在我的應用程序的主處理是一個請求經由ZMQ插座進來,和一個處理(一個或多個)由那聽反作用於消息:

1. If its directed at that process judged by the _id field, do some reading on a file and reply since one of the monitors match the messages _id, if none match, then: 
2 If the messages _id files is not recognized, all monitors ignore it but the Prefect creates a process to handle that _id and all future messages to that id. 
3. I want all the messages to be seen by the monitor processes as well as the prefect process, seems that seems easiest, 
4. All the messages are very small, avarage ~4096 bytes. 
5. The monitor does some non-blocking read and for each ioloop it sends what it has found out 

更多編輯=>和知情進程現在綁定,它將接收消息並回顯它們,以便它們可以被監視器看到。這就是我所想到的,作爲架構,但它不是最終的。 。

所有消息都通過瀏覽器從遠程用戶到達,讓服務器知道客戶端需要什麼,服務器通過zmq將消息發送到後端(我沒有顯示這一點,但並不難),因此在生產中他們可能不會綁定/連接到本地主機。 我選擇了DEALER,因爲它允許在任一方向上使用asyc/unlimited消息(參見第5點),DEALER可以與DEALER綁定,並且可以從任何一方到達初始請求/回覆。另一個可以做到這一點的可能是經銷商/路由器。

回答

2

你是正確的,你不能繼續在叉邊界上使用相同的套接字(多處理使用叉)。通常,這意味着您不想創建將在子進程啓動之前用於分叉進程的套接字。 由於在你的情況下,套接字是Monitor對象上的一個屬性,所以你根本不想在主進程中創建Monitor。這將是這個樣子:

def start_monitor(this_id): 
    monitor = Monitor() 
    monitor.initialize(this_id) 
    # run the eventloop, or this will return immediately and destroy the monitor 

... inside Prefect.check_if(): 

    proc = Process(target=start_monitor, args=(this_id,)) 
    proc.start() 
    self.monitors.append(this_id) 

,而不是你的榜樣,其中子做的唯一的事情是分配一個ID,然後殺死進程,最終有沒有任何效果。

+0

對,讓我試試這個,回覆你。這個不錯的主意。 – mike

+0

不錯,而且很好。我看不到那個。 – mike

+0

+1好答案。我是新來的zeromq,但它是一個很棒的圖書館:) – incognick

相關問題