2009-05-27 57 views
11

我正在編寫一個生產者和多個消費者的服務器程序, 什麼使我困惑的只是生產者放入隊列的第一個任務獲取 消耗,之後排隊的任務不再消耗,他們永遠在隊列中保持 。python多重處理的生產者/消費者問題

from multiprocessing import Process, Queue, cpu_count 
from http import httpserv 
import time 

def work(queue): 
    while True: 
     task = queue.get() 
     if task is None: 
      break 
     time.sleep(5) 
     print "task done:", task 
    queue.put(None) 

class Manager: 
    def __init__(self): 
     self.queue = Queue() 
     self.NUMBER_OF_PROCESSES = cpu_count() 

    def start(self): 
     self.workers = [Process(target=work, args=(self.queue,)) 
         for i in xrange(self.NUMBER_OF_PROCESSES)] 
     for w in self.workers: 
      w.start() 

     httpserv(self.queue) 

    def stop(self): 
     self.queue.put(None) 
     for i in range(self.NUMBER_OF_PROCESSES): 
      self.workers[i].join() 
     queue.close() 

Manager().start() 

生產者是把一個任務隊列中的一次接收 來自用戶的請求的HTTP服務器。看起來消費者進程仍然在隊列中有新任務時被阻塞,這很奇怪。

P.S.另外兩個問題與上述不相關,我不確定是否 最好把HTTP服務器放在除主進程外的其他進程中,如果是的話,我怎麼能讓主進程在所有子進程結束之前保持運行。第二個問題,優雅地阻止 HTTP服務器的最佳方式是什麼?

編輯:添加生產者代碼,它只是一個簡單的Python WSGI服務器:

import fapws._evwsgi as evwsgi 
from fapws import base 

def httpserv(queue): 
    evwsgi.start("0.0.0.0", 8080) 
    evwsgi.set_base_module(base) 

    def request_1(environ, start_response): 
     start_response('200 OK', [('Content-Type','text/html')]) 
     queue.put('task_1') 
     return ["request 1!"] 

    def request_2(environ, start_response): 
     start_response('200 OK', [('Content-Type','text/html')]) 
     queue.put('task_2') 
     return ["request 2!!"] 

    evwsgi.wsgi_cb(("/request_1", request_1)) 
    evwsgi.wsgi_cb(("/request_2", request_2)) 

    evwsgi.run() 

回答

7

我想一定是壞了Web服務器的一部分,因爲這完美的作品:

from multiprocessing import Process, Queue, cpu_count 
import random 
import time 


def serve(queue): 
    works = ["task_1", "task_2"] 
    while True: 
     time.sleep(0.01) 
     queue.put(random.choice(works)) 


def work(id, queue): 
    while True: 
     task = queue.get() 
     if task is None: 
      break 
     time.sleep(0.05) 
     print "%d task:" % id, task 
    queue.put(None) 


class Manager: 
    def __init__(self): 
     self.queue = Queue() 
     self.NUMBER_OF_PROCESSES = cpu_count() 

    def start(self): 
     print "starting %d workers" % self.NUMBER_OF_PROCESSES 
     self.workers = [Process(target=work, args=(i, self.queue,)) 
         for i in xrange(self.NUMBER_OF_PROCESSES)] 
     for w in self.workers: 
      w.start() 

     serve(self.queue) 

    def stop(self): 
     self.queue.put(None) 
     for i in range(self.NUMBER_OF_PROCESS): 
      self.workers[i].join() 
     queue.close() 


Manager().start() 

輸出示例:

starting 2 workers 
0 task: task_1 
1 task: task_2 
0 task: task_2 
1 task: task_1 
0 task: task_1 
+0

真棒而如果你能提供一個製片人+多工人的例子。這將是很好的。 – 2017-06-15 08:17:04

4

「第二個問題,優雅地停止HTTP服務器的最佳方法是什麼?」

這很難。

您有進程間通信兩種選擇:

  • 出帶外控制。服務器有另一種通信機制。另一個套接字,Unix信號或其他。其他的東西可能是服務器本地目錄中的「立即停止」文件。似乎很奇怪,但它確實很好,比引入一個選擇循環來偵聽多個套接字或信號處理程序以捕獲Unis信號更簡單。

    「立即停止」文件很容易實現。 evwsgi.run()循環僅在每次請求後檢查此文件。爲了使服務器停止,你創建文件,執行一個/control請求(這會得到一個500錯誤或者什麼,這並不重要),服務器應該停下來。請記住刪除立即停止文件,否則您的服務器將不會重新啓動。

  • 帶內控件。服務器有另一個URL(/stop),它會阻止它。從表面上看,這看起來像是一場安全噩夢,但它完全取決於該服務器在何處以及如何使用。由於它似乎是一個簡單的內部請求隊列封裝,所以這個額外的URL運行良好。

    爲了完成這項工作,您需要編寫自己的evwsgi.run()版本,可以通過設置某個變量來終止循環。

編輯

你可能不希望終止您的服務器,因爲你不知道國家的它的工作線程。您需要向服務器發出信號,然後您只需等待正常完成。

如果你想強行殺死服務器,那麼os.kill()(或multiprocessing.terminate)將工作。當然,除了你不知道子線程在做什麼之外。

+0

如何將服務器放在其自己的進程中,並使用multiprocessing.Process.terminate方法來終止進程?這似乎更容易。 – btw0 2009-05-27 14:49:44