2017-10-10 35 views
0

我有一個FLASK APP並使用gunicorn(同步模式)作爲Web服務器。爲了異步推送信息,當gunicorn啓動時,我使用「gunicorn服務器鉤子」來啓動維護進程(multiprocessing.Process()),並使用multiprocessing.Queue()(它實際上是logging.handlers.QueueHandler(Queue)與python日誌記錄)發送消息。 但是我發現如果gunicorn工作者在「[CRITICAL] WORKER TIMEOUT」時重新啓動,維護過程將不會從隊列發送的隊列中獲取消息(隊列.qsize()不是0),並且根據日誌,消息成功排隊,但Queue.get(timeout)引發空異常),但可以從gunicorn主進程獲取消息。 我的日誌:multiprocessing.queue在gunicorn工作者超時後無法獲取數據

34 pid:24831 wechatlog : 2017-10-10 06:32:43,552 wechat_middle.py[line:34] DEBUG recive <LogRecord: wechat, 40, /www_upload/src/api_server.py, 543, "{'tag_list': 1, 'msg': 'company_test sid:8607550100000080 id: 8607550100000080 his: 1', 'lastsend': 'serial_error'}"> 
    35 pid:23930 wechat : 2017-10-10 06:38:56,805 api_server.py[line:543] ERROR {'tag_list': 1, 'msg': 'company_test sid:8607550100000080 id: 8607550100000080 his: 1', 'lastsend': 'serial_error'} 
    36 pid:24831 wechatlog : 2017-10-10 06:38:56,807 wechat_middle.py[line:34] DEBUG recive <LogRecord: wechat, 40, /www_upload/src/api_server.py, 543, "{'tag_list': 1, 'msg': 'company_test sid:8607550100000080 id: 8607550100000080 his: 1', 'lastsend': 'serial_error'}"> 
    37 pid:24887 wechat : 2017-10-10 07:07:50,904 api_server.py[line:543] ERROR {'tag_list': 1, 'msg': 'company_test sid:8607550100000080 id: 8607550100000080 his: 1', 'lastsend': 'serial_error'} 
    38 pid:24831 wechatlog : 2017-10-10 07:07:51,810 maintain_task.py[line:274] INFO current qsize: 1, debug_size: 0 
    39 pid:24831 wechatlog : 2017-10-10 07:07:55,813 maintain_task.py[line:274] INFO current qsize: 1, debug_size: 1 
    40 pid:24831 wechatlog : 2017-10-10 07:07:57,813 wechat_middle.py[line:25] INFO in debug mode, queue id 139972199063056, size 1 
    41 pid:24831 wechatlog : 2017-10-10 07:07:59,816 wechat_middle.py[line:31] ERROR in debug mode, queue get nothing. 
    42 pid:24831 wechatlog : 2017-10-10 07:07:59,816 maintain_task.py[line:274] INFO current qsize: 1, debug_size: 1 
    43 pid:24831 wechatlog : 2017-10-10 07:08:00,817 maintain_task.py[line:281] ERROR queue is empty 
    44 pid:24831 wechatlog : 2017-10-10 07:08:00,818 maintain_task.py[line:283] ERROR the message block the queue: None 

2017年10月10日之間的6點38分56秒至2017年10月10日7時07分50秒時,gunicorn日誌報告中:

[2017-10-10 06:41:08 +0800] [23906] [CRITICAL] WORKER TIMEOUT (pid:24838) 

我的代碼:

maintain_task.py 
def wechat_push_thread(queue): 
    we = wechat_middler_ware(queue=queue) 
    wechat_log_logger = configs.make_logger_handler('wechatlog', filename='wechat') 
    wechat_log_logger.info(f'queue id: {id(queue)}') 
    debug_size = 0 
    while True: 
     try: 
      we.listen(2) 
     except Exception as e: 
      wechat_log_logger.exception(e) 
     # for debug 
     if queue.qsize() > 0: 
      wechat_log_logger.info(f'current qsize: {queue.qsize()}, debug_size: {debug_size}') 
      if debug_size == queue.qsize(): 
       if we.debug_flag: 
        try: 
         msg = queue.get(timeout=1) 
        except Empty: 
         msg = None 
         wechat_log_logger.error(f'queue is empty') 
        wechat_log_logger.error(f'the message block the queue: {msg}') 
       we.debug_flag = True 
      debug_size = queue.qsize() 
     else: 
      we.debug_flag = False 
      debug_size = 0 
     # endfor debug 
     if quit_event.wait(timeout=2): 
      break 
    logger.info('wechat_push_thread clean env') 

wechat_middle.py 
class wechat_middler_ware: 
    def __init__(self, queue): 
     self.q = queue 
     self.logger = configs.make_logger_handler('wechatlog', filename='wechat') 
     self.push_api = Push_Server(logger=self.logger) 
     self.debug_flag = False 

    def listen(self, timeout): 
     while True: 
      if self.debug_flag: 
       self.logger.info(f'in debug mode, queue id {id(self.q)}, size {self.q.qsize()}') 
      try: 
       msg = self.q.get(timeout=timeout) 
       self.logger.debug(f'recive {msg}') 
      except Empty: 
       if self.debug_flag: 
        self.logger.error(f'in debug mode, queue get nothing.') 
       break 
      else: 
       ... 
+0

當我減小隊列大小,就像2一樣,它會引起Queue.full異常。 Queue.get()仍然爲空。 –

回答

0

根據PY DOC:

警告:如果此方法爲u如果關聯進程使用管道或隊列,則管道或隊列可能會損壞,並且 可能會被其他進程無法使用。同樣,如果進程有 獲得鎖或信號量等,則終止它將導致其他進程死鎖。

gunicorn的主人殺死了「超時」工作人員,所以隊列變得無法使用其他進程。 現在,我使用multiprocessing.manager.queue而不是multiprocessing.queue。即使工人被主人殺死,它也可以工作。