2012-07-18 77 views
4

我試圖在管理進程下啓動數據隊列服務器(以便稍後可以轉換爲服務),並且數據隊列服務器功能在主過程,它在使用multiprocessing.Process創建的過程中不起作用。多處理器下的Python多處理RemoteManager.Process

dataQueueServer和dataQueueClient代碼基於多處理模塊文檔here中的代碼。

當自己運行時,dataQueueServer運行良好。但是,當使用multiprocessing.Processstart()的mpquueue中運行時,它不起作用(與客戶端一起測試時)。我正在使用dataQueueClient而不進行測試來測試這兩種情況。

的代碼確實達到在兩種情況下serve_forever,所以我覺得服務器正常工作,但事情是從mpqueue情況傳送回客戶端阻止它。

我已經放置了在線程下運行serve_forever()部件的循環,以便它可以被停止。

下面是代碼:

mpqueue#這是 「經理」 的過程試圖產卵的服務器在一個子進程

import time 
import multiprocessing 
import threading 
import dataQueueServer 

class Printer(): 
    def __init__(self): 
     self.lock = threading.Lock() 
    def tsprint(self, text): 
     with self.lock: 
      print text 

class QueueServer(multiprocessing.Process): 
    def __init__(self, name = '', printer = None): 
     multiprocessing.Process.__init__(self) 
     self.name = name 
     self.printer = printer 
     self.ml = dataQueueServer.MainLoop(name = 'ml', printer = self.printer) 

    def run(self): 
     self.printer.tsprint(self.ml) 
     self.ml.start() 

    def stop(self): 
     self.ml.stop() 

if __name__ == '__main__': 
    printer = Printer() 
    qs = QueueServer(name = 'QueueServer', printer = printer) 
    printer.tsprint(qs) 
    printer.tsprint('starting') 
    qs.start() 
    printer.tsprint('started.') 
    printer.tsprint('Press Ctrl-C to quit') 
    try: 
     while True: 
      time.sleep(60) 
    except KeyboardInterrupt: 
     printer.tsprint('\nTrying to exit cleanly...') 
     qs.stop() 

    printer.tsprint('stopped') 

dataQueueServer

import time 
import threading 

from multiprocessing.managers import BaseManager 
from multiprocessing import Queue 

HOST = '' 
PORT = 50010 
AUTHKEY = 'authkey' 

## Define some helper functions for use by the main process loop 
class Printer(): 
    def __init__(self): 
     self.lock = threading.Lock() 
    def tsprint(self, text): 
     with self.lock: 
      print text 



class QueueManager(BaseManager): 
    pass 


class MainLoop(threading.Thread): 
    """A thread based loop manager, allowing termination signals to be sent 
    to the thread""" 
    def __init__(self, name = '', printer = None): 
     threading.Thread.__init__(self) 
     self._stopEvent = threading.Event() 
     self.daemon = True 
     self.name = name 

     if printer is None: 
      self.printer = Printer() 
     else: 
      self.printer = printer 

     ## create the queue 
     self.queue = Queue() 
     ## Add a function to the handler to return the queue to clients 
     self.QM = QueueManager 

     self.QM.register('get_queue', callable=lambda:self.queue) 
     self.queue_manager = self.QM(address=(HOST, PORT), authkey=AUTHKEY) 
     self.queue_server = self.queue_manager.get_server() 

    def __del__(self): 
     self.printer.tsprint('closing...') 


    def run(self): 
     self.printer.tsprint('{}: started serving'.format(self.name)) 
     self.queue_server.serve_forever() 


    def stop(self): 
     self.printer.tsprint ('{}: stopping'.format(self.name)) 
     self._stopEvent.set() 

    def stopped(self): 
     return self._stopEvent.isSet() 

def start(): 
    printer = Printer() 
    ml = MainLoop(name = 'ml', printer = printer) 
    ml.start() 
    return ml 

def stop(ml): 
    ml.stop() 

if __name__ == '__main__': 
    ml = start() 
    raw_input("\nhit return to stop") 
    stop(ml) 

和一個客戶端:

dataQueueClient

import datetime 
from multiprocessing.managers import BaseManager 


n = 0 
N = 10**n 

HOST = '' 
PORT = 50010 
AUTHKEY = 'authkey' 


def now(): 
    return datetime.datetime.now() 

def gen(n, func, *args, **kwargs): 
    k = 0 
    while k < n: 
     yield func(*args, **kwargs) 
     k += 1 

class QueueManager(BaseManager): 
    pass 
QueueManager.register('get_queue') 
m = QueueManager(address=(HOST, PORT), authkey=AUTHKEY) 
m.connect() 
queue = m.get_queue() 

def load(msg, q): 
    return q.put(msg) 

def get(q): 
    return q.get() 

lgen = gen(N, load, msg = 'hello', q = queue) 
t0 = now() 
while True: 
    try: 
     lgen.next() 
    except StopIteration: 
     break 
t1 = now() 
print 'loaded %d items in ' % N, t1-t0 

t0 = now() 
while queue.qsize() > 0: 
    queue.get() 
t1 = now() 
print 'got %d items in ' % N, t1-t0 

回答

8

所以好像該解決方案是很簡單:不要使用serve_forever(),並使用manager.start()來代替。

Eli BenderskyBaseManager(和它的擴展版本SyncManager)已經將在一個新的進程的服務器(和看multiprocessing.managers代碼證實了這一點)。我遇到的問題源於示例中使用的表單,其中服務器在主進程下啓動。

我仍然不明白爲什麼當前的示例在子進程下運行時不起作用,但這不再是問題。

這裏的工作(和許多從OP簡化)代碼來管理多個隊列服務器:

服務器

from multiprocessing import Queue 
from multiprocessing.managers import SyncManager 

HOST = '' 
PORT0 = 5011 
PORT1 = 5012 
PORT2 = 5013 
AUTHKEY = 'authkey' 

name0 = 'qm0' 
name1 = 'qm1' 
name2 = 'qm2' 

description = 'Queue Server' 

def CreateQueueServer(HOST, PORT, AUTHKEY, name = None, description = None): 
    name = name 
    description = description 
    q = Queue() 

    class QueueManager(SyncManager): 
     pass 


    QueueManager.register('get_queue', callable = lambda: q) 
    QueueManager.register('get_name', callable = name) 
    QueueManager.register('get_description', callable = description) 
    manager = QueueManager(address = (HOST, PORT), authkey = AUTHKEY) 
    manager.start() # This actually starts the server 

    return manager 

# Start three queue servers 
qm0 = CreateQueueServer(HOST, PORT0, AUTHKEY, name0, description) 
qm1 = CreateQueueServer(HOST, PORT1, AUTHKEY, name1, description) 
qm2 = CreateQueueServer(HOST, PORT2, AUTHKEY, name2, description) 

raw_input("return to end") 

客戶

from multiprocessing.managers import SyncManager 

HOST = '' 
PORT0 = 5011 
PORT1 = 5012 
PORT2 = 5013 
AUTHKEY = 'authkey' 

def QueueServerClient(HOST, PORT, AUTHKEY): 
    class QueueManager(SyncManager): 
     pass 
    QueueManager.register('get_queue') 
    QueueManager.register('get_name') 
    QueueManager.register('get_description') 
    manager = QueueManager(address = (HOST, PORT), authkey = AUTHKEY) 
    manager.connect() # This starts the connected client 
    return manager 

# create three connected managers 
qc0 = QueueServerClient(HOST, PORT0, AUTHKEY) 
qc1 = QueueServerClient(HOST, PORT1, AUTHKEY) 
qc2 = QueueServerClient(HOST, PORT2, AUTHKEY) 
# Get the queue objects from the clients 
q0 = qc0.get_queue() 
q1 = qc1.get_queue() 
q2 = qc2.get_queue() 
# put stuff in the queues 
q0.put('some stuff') 
q1.put('other stuff') 
q2.put({1:123, 2:'abc'}) 
# check their sizes 
print 'q0 size', q0.qsize() 
print 'q1 size', q1.qsize() 
print 'q2 size', q2.qsize() 

# pull some stuff and print it 
print q0.get() 
print q1.get() 
print q2.get() 

添加額外的服務器與運行隊列服務的信息共享字典rs,以便消費者可以使用該模型輕鬆分辨哪些地方可用。但需要注意的一點是,共享字典與普通字典的語法略有不同:dictionary[0] = something不起作用。您需要使用dictionary.update([(key, value), (otherkey, othervalue)])dictionary.get(key)語法,該語法傳播到連接到此字典的所有其他客戶端。

+0

當我在服務器中將主機修改爲'192.168.aa.bb'時出現問題,因爲客戶端在'192.168 .aa.bb」。 它顯示錯誤: '[錯誤99]無法分配請求address' 'QM0 = CreateQueueServer(HOST,PORT 0,AUTHKEY,NAME0,描述)' 我不知道爲什麼,我只想寫服務器的東西客戶端 – 2017-08-28 11:07:51