我試圖在管理進程下啓動數據隊列服務器(以便稍後可以轉換爲服務),並且數據隊列服務器功能在主過程,它在使用multiprocessing.Process創建的過程中不起作用。多處理器下的Python多處理RemoteManager.Process
dataQueueServer和dataQueueClient代碼基於多處理模塊文檔here中的代碼。
當自己運行時,dataQueueServer運行良好。但是,當使用multiprocessing.Process
的start()
在的mpquueue中運行時,它不起作用(與客戶端一起測試時)。我正在使用dataQueueClient而不進行測試來測試這兩種情況。
的代碼確實達到在兩種情況下serve_forever
,所以我覺得服務器正常工作,但事情是從mpqueue情況傳送回客戶端阻止它。
我已經放置了在線程下運行serve_forever()
部件的循環,以便它可以被停止。
下面是代碼:
mpqueue#這是 「經理」 的過程試圖產卵的服務器在一個子進程
import time
import multiprocessing
import threading
import dataQueueServer
class Printer():
def __init__(self):
self.lock = threading.Lock()
def tsprint(self, text):
with self.lock:
print text
class QueueServer(multiprocessing.Process):
def __init__(self, name = '', printer = None):
multiprocessing.Process.__init__(self)
self.name = name
self.printer = printer
self.ml = dataQueueServer.MainLoop(name = 'ml', printer = self.printer)
def run(self):
self.printer.tsprint(self.ml)
self.ml.start()
def stop(self):
self.ml.stop()
if __name__ == '__main__':
printer = Printer()
qs = QueueServer(name = 'QueueServer', printer = printer)
printer.tsprint(qs)
printer.tsprint('starting')
qs.start()
printer.tsprint('started.')
printer.tsprint('Press Ctrl-C to quit')
try:
while True:
time.sleep(60)
except KeyboardInterrupt:
printer.tsprint('\nTrying to exit cleanly...')
qs.stop()
printer.tsprint('stopped')
dataQueueServer
import time
import threading
from multiprocessing.managers import BaseManager
from multiprocessing import Queue
HOST = ''
PORT = 50010
AUTHKEY = 'authkey'
## Define some helper functions for use by the main process loop
class Printer():
def __init__(self):
self.lock = threading.Lock()
def tsprint(self, text):
with self.lock:
print text
class QueueManager(BaseManager):
pass
class MainLoop(threading.Thread):
"""A thread based loop manager, allowing termination signals to be sent
to the thread"""
def __init__(self, name = '', printer = None):
threading.Thread.__init__(self)
self._stopEvent = threading.Event()
self.daemon = True
self.name = name
if printer is None:
self.printer = Printer()
else:
self.printer = printer
## create the queue
self.queue = Queue()
## Add a function to the handler to return the queue to clients
self.QM = QueueManager
self.QM.register('get_queue', callable=lambda:self.queue)
self.queue_manager = self.QM(address=(HOST, PORT), authkey=AUTHKEY)
self.queue_server = self.queue_manager.get_server()
def __del__(self):
self.printer.tsprint('closing...')
def run(self):
self.printer.tsprint('{}: started serving'.format(self.name))
self.queue_server.serve_forever()
def stop(self):
self.printer.tsprint ('{}: stopping'.format(self.name))
self._stopEvent.set()
def stopped(self):
return self._stopEvent.isSet()
def start():
printer = Printer()
ml = MainLoop(name = 'ml', printer = printer)
ml.start()
return ml
def stop(ml):
ml.stop()
if __name__ == '__main__':
ml = start()
raw_input("\nhit return to stop")
stop(ml)
和一個客戶端:
dataQueueClient
import datetime
from multiprocessing.managers import BaseManager
n = 0
N = 10**n
HOST = ''
PORT = 50010
AUTHKEY = 'authkey'
def now():
return datetime.datetime.now()
def gen(n, func, *args, **kwargs):
k = 0
while k < n:
yield func(*args, **kwargs)
k += 1
class QueueManager(BaseManager):
pass
QueueManager.register('get_queue')
m = QueueManager(address=(HOST, PORT), authkey=AUTHKEY)
m.connect()
queue = m.get_queue()
def load(msg, q):
return q.put(msg)
def get(q):
return q.get()
lgen = gen(N, load, msg = 'hello', q = queue)
t0 = now()
while True:
try:
lgen.next()
except StopIteration:
break
t1 = now()
print 'loaded %d items in ' % N, t1-t0
t0 = now()
while queue.qsize() > 0:
queue.get()
t1 = now()
print 'got %d items in ' % N, t1-t0
當我在服務器中將主機修改爲'192.168.aa.bb'時出現問題,因爲客戶端在'192.168 .aa.bb」。 它顯示錯誤: '[錯誤99]無法分配請求address' 'QM0 = CreateQueueServer(HOST,PORT 0,AUTHKEY,NAME0,描述)' 我不知道爲什麼,我只想寫服務器的東西客戶端 – 2017-08-28 11:07:51