2015-04-16 63 views
0

我想使用python gevent庫來實現一個生產者和多個消費者服務器。有我的嘗試:gevent隊列失敗,LoopExit

class EmailValidationServer(): 
    def __init__(self): 
     self.queue = Queue() 
    def worker(self): 
     while True: 
      json = self.queue.get() 
    def handler(self,socket,address): 
     fileobj = socket.makefile() 
     content = fileobj.read(max_read) 
     contents = json.loads(content) 
     for content in contents: 
      self.queue.put(content) 
    def daemon(self,addr='127.0.0.1',num_thread=5): 
     pool = Pool(1000) 
     server = StreamServer((addr, 6000),self.handler,spawn=pool) # run 
     pool = ThreadPool(num_thread) 
     for _ in range(num_thread): 
      pool.spawn(self.worker) 
     server.serve_forever() 
if __name__ == "__main__": 
    email_server = EmailValidationServer() 
    email_server.daemon() 

我用gevent.queue.Queue的隊列。它給我的錯誤信息:

LoopExit: This operation would block forever 
(<ThreadPool at 0x7f08c80eef50 0/4/5>, 
<bound method EmailValidationServer.worker of <__main__.EmailValidationServer instance at 0x7f08c8dcd998>>) failed with LoopExit 

問題:但是,當我從GEVENT的實施蟒蛇內建庫改變隊列,它的工作原理。我不知道原因,我想它的實施有所不同。我不知道爲什麼gevent不允許無限等待。有沒有人可以給出解釋?感謝提前

回答

2

我建議你可以使用gevent.queue.JoinableQueue()而不是Python的內置Queue()。您可以參閱API用法正式隊列指南(http://www.gevent.org/gevent.queue.html

def worker(): 
    while True: 
     item = q.get() 
     try: 
      do_work(item) 
     finally: 
      q.task_done() 

q = JoinableQueue() 
for i in range(num_worker_threads): 
    gevent.spawn(worker) 

for item in source(): 
    q.put(item) 

q.join() # block until all tasks are done 

如果再次遇到了例外,你最好充分了解GEVENT的原則corouinte控制流程......一旦你明白了吧,那不是什麼大問題。 :)

+1

@SuperBiasedMan thans格式化我的代碼塊:) –