2012-01-01

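Python producer/consumer exception handling

I am trying to write an implementation of the seemingly simple, classic producer-consumer idiom in Python. There is one comparatively fast producer for multiple slower consumers. In principle, this is easy to do with the Queue module, and the example in the library documentation needs only a few lines of code.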

However, I also want the code to work correctly when exceptions occur. Both the producer and all consumers should stop if any of the following happens:

  • the producer fails with an exception
  • any consumer fails with an exception
  • the user stops the program (causing a KeyboardInterrupt)

After that, the whole process should fail by raising the initial exception, so that the caller knows what went wrong.
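The "re-raise the initial exception in the caller" requirement can be illustrated with a minimal sketch. It assumes Python 3 (where the module is named `queue`), uses a hypothetical helper `run_consumers()` that is not part of the code below, and simplifies by queueing all items before the consumers start. Each consumer hands its exception to the caller through a second, thread-safe queue and sets a `threading.Event` so the other consumers stop early; after joining all threads, the caller re-raises the first reported exception.

```python
import queue
import threading


def run_consumers(items, consume, consumer_count=2):
    """Consume `items` with several threads and re-raise the first consumer
    exception in the calling thread (hypothetical helper)."""
    item_queue = queue.Queue()
    errors = queue.Queue()    # thread-safe hand-over of exceptions
    stop = threading.Event()  # set by the first failing consumer

    def consumer():
        while not stop.is_set():
            try:
                item = item_queue.get_nowait()
            except queue.Empty:
                return  # all items have been taken
            try:
                consume(item)
            except Exception as error:
                errors.put(error)
                stop.set()  # tell the other consumers to stop early
                return

    # Simplification: all items are queued before the consumers start.
    for item in items:
        item_queue.put(item)
    threads = [threading.Thread(target=consumer) for _ in range(consumer_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()  # cannot hang: every consumer returns on its own
    if not errors.empty():
        raise errors.get()  # the exception that was reported first
```

The caller then sees the consumer's exception (for example `ValueError: cannot consume item 3`) exactly as if the work had been done in its own thread.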

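The main challenge seems to be cleanly terminating the consumer threads without ending up stuck in a blocking join(). Setting Thread.daemon = True seems to be popular, but to my understanding this leads to resource leaks if the producer fails with an exception.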
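A commonly used alternative to daemon threads is a sentinel ("poison pill") value. In the following sketch (Python 3; `produce_and_consume` and `_STOP` are hypothetical names, not part of the code below), the producer puts one sentinel per consumer into the queue inside a `finally` block. Every consumer therefore wakes up from its blocking `get()` and exits, so the final `join()` returns even if producing fails or the user presses Control-C. Failing consumers are not handled here.

```python
import queue
import threading

_STOP = object()  # sentinel ("poison pill"); any unique object works


def consumer(item_queue, results):
    while True:
        item = item_queue.get()  # blocks without polling
        if item is _STOP:
            return
        results.append(item * item)  # stand-in for the real work


def produce_and_consume(items, consumer_count=2):
    item_queue = queue.Queue()
    results = []
    threads = [threading.Thread(target=consumer, args=(item_queue, results))
               for _ in range(consumer_count)]
    for thread in threads:
        thread.start()
    try:
        for item in items:  # producing may raise at any point
            item_queue.put(item)
    finally:
        # Runs on success, on a producer exception and on KeyboardInterrupt,
        # so every non-daemon consumer gets a sentinel and join() returns.
        for _ in threads:
            item_queue.put(_STOP)
        for thread in threads:
            thread.join()
    return results
```

Because the sentinels are queued behind any pending items, the consumers still process those items before exiting; stopping them immediately would require emptying the queue first.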

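I managed to write an implementation that meets my requirements (see below). However, the code turned out to be much more complex than I expected.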

Is there a leaner way to handle these scenarios?

Here are a few example calls and the resulting final log messages from my current implementation:

Produce and consume 10 items:

$ python procon.py 
INFO:root:processed all items 

Produce no items:

$ python procon.py --items 0 
INFO:root:processed all items 

Produce 5 items for 10 consumers, thus using only some of the available consumers:

$ python procon.py --items 5 --consumers 10 
INFO:root:processed all items 

Interrupt by pressing Control-C:

$ python procon.py 
^CWARNING:root:interrupted by user 

Fail to produce item 3:

$ python procon.py --producer-fails-at 3 
ERROR:root:cannot produce item 3 

Fail to consume item 3:

$ python procon.py --consumer-fails-at 3 
ERROR:root:cannot consume item 3 

Fail to consume the last item:

$ python procon.py --items 10 --consumer-fails-at 9 
ERROR:root:cannot consume item 9 

Here is the probably overly complicated source code:

""" 
Consumer/producer to test exception handling in threads. Both the producer 
and the consumer can be made to fail deliberately when processing a certain 
item using command line options. 
""" 
import logging 
import optparse 
import Queue 
import threading 
import time 

_PRODUCTION_DELAY = 0.1 
_CONSUMPTION_DELAY = 0.3 

# Delay for ugly hacks and polling loops. 
_HACK_DELAY = 0.05 

class _Consumer(threading.Thread):
    """
    Thread to consume items from an item queue filled by a producer, which can
    be told to terminate in two ways:

    1. using `finish()`, which keeps processing the remaining items on the
       queue until it is empty
    2. using `cancel()`, which finishes consuming the current item and then
       terminates
    """
    def __init__(self, name, itemQueue, failedConsumers):
        super(_Consumer, self).__init__(name=name)
        self._log = logging.getLogger(name)
        self._itemQueue = itemQueue
        self._failedConsumers = failedConsumers
        self.error = None
        self.itemToFailAt = None
        self._log.info(u"waiting for items to consume")
        self._isFinishing = False
        self._isCanceled = False

    def finish(self):
        self._isFinishing = True

    def cancel(self):
        self._isCanceled = True

    def consume(self, item):
        self._log.info(u"consume item %d", item)
        if item == self.itemToFailAt:
            raise ValueError("cannot consume item %d" % item)
        time.sleep(_CONSUMPTION_DELAY)

    def run(self):
        try:
            while not (self._isFinishing and self._itemQueue.empty()) \
                    and not self._isCanceled:
                # HACK: Use a timeout when getting the item from the queue
                # because between `empty()` and `get()` another consumer might
                # have removed it.
                try:
                    item = self._itemQueue.get(timeout=_HACK_DELAY)
                except Queue.Empty:
                    pass
                else:
                    # Consume outside the `try` so only `get()` can be
                    # silenced by `Queue.Empty`.
                    self.consume(item)
            if self._isCanceled:
                self._log.info(u"canceled")
            if self._isFinishing:
                self._log.info(u"finished")
        except Exception as error:
            # Report the failure to the worker instead of dying silently.
            self._log.error(u"cannot continue to consume: %s", error)
            self.error = error
            self._failedConsumers.put(self)


class Worker(object):
    """
    Controller for interaction between producer and consumers.
    """
    def __init__(self, itemsToProduceCount, itemProducerFailsAt,
            itemConsumerFailsAt, consumerCount):
        self._itemsToProduceCount = itemsToProduceCount
        self._itemProducerFailsAt = itemProducerFailsAt
        self._itemConsumerFailsAt = itemConsumerFailsAt
        self._consumerCount = consumerCount
        self._itemQueue = Queue.Queue()
        self._failedConsumers = Queue.Queue()
        self._log = logging.getLogger("producer")
        self._consumers = []

    def _possiblyRaiseConsumerError(self):
        if not self._failedConsumers.empty():
            failedConsumer = self._failedConsumers.get()
            self._log.info(u"handling failed %s", failedConsumer.name)
            raise failedConsumer.error

    def _cancelAllConsumers(self):
        self._log.info(u"canceling all consumers")
        for consumerToCancel in self._consumers:
            consumerToCancel.cancel()
        self._log.info(u"waiting for consumers to be canceled")
        for possiblyCanceledConsumer in self._consumers:
            # In this case, we ignore possible consumer errors because there
            # already is an error to report.
            possiblyCanceledConsumer.join(_HACK_DELAY)
            if possiblyCanceledConsumer.is_alive():
                # Still running: re-append it so the loop joins it again
                # later (polling instead of a blocking join).
                self._consumers.append(possiblyCanceledConsumer)

    def work(self):
        """
        Launch consumer threads and produce items. In case any consumer or the
        producer raises an exception, fail by raising this exception.
        """
        self._consumers = []
        for consumerId in range(self._consumerCount):
            consumerToStart = _Consumer(u"consumer %d" % consumerId,
                self._itemQueue, self._failedConsumers)
            # Set the failure point before starting the thread to avoid a race.
            if self._itemConsumerFailsAt is not None:
                consumerToStart.itemToFailAt = self._itemConsumerFailsAt
            self._consumers.append(consumerToStart)
            consumerToStart.start()

        self._log.info(u"producing %d items", self._itemsToProduceCount)

        for itemNumber in range(self._itemsToProduceCount):
            self._possiblyRaiseConsumerError()
            self._log.info(u"produce item %d", itemNumber)
            if itemNumber == self._itemProducerFailsAt:
                raise ValueError("cannot produce item %d" % itemNumber)
            # Do the actual work.
            time.sleep(_PRODUCTION_DELAY)
            self._itemQueue.put(itemNumber)

        self._log.info(u"telling consumers to finish the remaining items")
        for consumerToFinish in self._consumers:
            consumerToFinish.finish()
        self._log.info(u"waiting for consumers to finish")
        for possiblyFinishedConsumer in self._consumers:
            self._possiblyRaiseConsumerError()
            possiblyFinishedConsumer.join(_HACK_DELAY)
            if possiblyFinishedConsumer.is_alive():
                self._consumers.append(possiblyFinishedConsumer)
        # A consumer may have failed while the last thread was being joined.
        self._possiblyRaiseConsumerError()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    parser = optparse.OptionParser()
    parser.add_option("-c", "--consumer-fails-at", metavar="NUMBER",
        type="long", help="number of items at which consumer fails (default: %default)")
    parser.add_option("-i", "--items", metavar="NUMBER", type="long",
        help="number of items to produce (default: %default)", default=10)
    parser.add_option("-n", "--consumers", metavar="NUMBER", type="long",
        help="number of consumers (default: %default)", default=2)
    parser.add_option("-p", "--producer-fails-at", metavar="NUMBER",
        type="long", help="number of items at which producer fails (default: %default)")
    options, others = parser.parse_args()
    worker = Worker(options.items, options.producer_fails_at,
        options.consumer_fails_at, options.consumers)
    try:
        worker.work()
        logging.info(u"processed all items")
    except KeyboardInterrupt:
        logging.warning(u"interrupted by user")
        worker._cancelAllConsumers()
    except Exception as error:
        logging.error(u"%s", error)
        worker._cancelAllConsumers()

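Maybe this is not what you are looking for, but there is a great Python library called Celery that you could use instead of writing your own queueing implementation. – 2012-01-01 11:30:20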


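Thanks for the pointer. Celery looks interesting for complex tasks involving web services and databases. For my particular task, the producer reads lines from a file, does some basic structural analysis and passes the data on to the consumers, so it is mostly I/O-bound work. The consumers then process the data, which is CPU-bound work. Since all of this happens in memory on the same machine, Python's standard Queue seems like a good fit. – roskakori 2012-01-01 11:54:56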

Answers


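Since the answers so far provided good hints but no working code, I extracted the code from my question and wrapped it into a library, which is available from http://pypi.python.org/pypi/proconex/. You can find the source code at https://github.com/roskakori/proconex. While the interface feels reasonable, the implementation still uses polling, so contributions are welcome.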

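Any exception in the producer or consumer threads is re-raised in the main thread. Just make sure to use a with statement or finally: worker.close() so that all threads are shut down properly.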
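The with statement / `finally: worker.close()` pattern can be sketched as follows. `ThreadedWorker` and its methods are hypothetical and are not the proconex API; the sketch assumes Python 3. `close()` sends one sentinel per consumer, joins all threads and then re-raises the first consumer exception in the calling thread. `__exit__()` calls `close()`, so the threads are shut down even when the body of the with block raises; in that case the body's exception takes precedence.

```python
import queue
import threading


class ThreadedWorker:
    """Hypothetical worker (not the proconex API): close() always stops and
    joins the consumer threads and re-raises the first consumer error."""

    _STOP = object()  # sentinel telling a consumer thread to exit

    def __init__(self, consume, consumer_count=2):
        self._consume = consume
        self._queue = queue.Queue()
        self._errors = queue.Queue()
        self._failed = threading.Event()
        self._threads = [threading.Thread(target=self._run)
                         for _ in range(consumer_count)]
        for thread in self._threads:
            thread.start()

    def _run(self):
        while True:
            item = self._queue.get()
            if item is self._STOP:
                return
            if self._failed.is_set():
                continue  # skip remaining items after the first failure
            try:
                self._consume(item)
            except Exception as error:
                self._errors.put(error)
                self._failed.set()

    def work(self, items):
        for item in items:
            self._queue.put(item)

    def close(self):
        for _ in self._threads:
            self._queue.put(self._STOP)
        for thread in self._threads:
            thread.join()
        if not self._errors.empty():
            raise self._errors.get()  # re-raise in the calling thread

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise  # nothing else failed: report the consumer error
            # Otherwise let the exception from the with block propagate.
        return False
```

Usage mirrors the library example: `with ThreadedWorker(consume) as worker: worker.work(items)`.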

Here is a simple example with a producer of integers and two consumers:

import logging
import proconex

class IntegerProducer(proconex.Producer):
    def items(self):
        for item in xrange(10):
            logging.info('produce %d', item)
            yield item

class IntegerConsumer(proconex.Consumer):
    def consume(self, item):
        logging.info('consume %d with %s', item, self.name)

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    producer = IntegerProducer()
    consumer1 = IntegerConsumer('consumer1')
    consumer2 = IntegerConsumer('consumer2')

    with proconex.Worker(producer, [consumer1, consumer2]) as worker:
        worker.work()

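You need a queue with a cancel method that empties the internal queue, sets a cancelled flag and then wakes everyone up. The worker would wake up from join(), check the cancelled flag on the queue and act accordingly. The consumers would wake up from get(), check the cancelled flag on the queue and print an error. Your consumers then only need to call the cancel() method when an exception occurs.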

Unfortunately, Python's Queue has no cancel method. A few options spring to mind:

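  • roll your own queue (could be very tricky to get right)
  • extend the Python queue and add a cancel method (ties your code to the internal implementation of Python's Queue class)
  • proxy the queue class and overload join/get with your busy-wait logic (still a busy-wait hack, but confined to one place, and it cleans up the producer/consumer code)
  • find another queue implementation/library out there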
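As a sketch of the second option, here is a `queue.Queue` subclass with a `cancel()` method for Python 3. The names `CancellableQueue` and `Cancelled` are hypothetical, and the code relies on undocumented CPython internals of `queue.Queue` (the shared `mutex`, the `not_empty`, `not_full` and `all_tasks_done` conditions, `unfinished_tasks`, and the `_qsize()`/`_get()` hooks), so it may break on other Python versions. `cancel()` empties the queue, records an optional exception and wakes every waiting thread; `get()` and `join()` then raise `Cancelled` carrying that exception, as does any later `put()`.

```python
import queue
import time


class Cancelled(Exception):
    """Raised once a CancellableQueue was cancelled (hypothetical name)."""


class CancellableQueue(queue.Queue):
    """queue.Queue plus cancel(); uses CPython-internal attributes."""

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self.cancelled = False
        self.error = None  # optional exception that caused the cancellation

    def cancel(self, error=None):
        with self.mutex:
            self.cancelled = True
            self.error = error
            self.queue.clear()  # drop all pending items
            self.unfinished_tasks = 0
            # Wake up every thread blocked in get(), put() or join().
            self.not_empty.notify_all()
            self.not_full.notify_all()
            self.all_tasks_done.notify_all()

    def put(self, item, block=True, timeout=None):
        with self.mutex:
            if self.cancelled:
                raise Cancelled(self.error)
        # A put() racing with cancel() may still add its item; that item is
        # never returned because get() checks `cancelled` first.
        super().put(item, block, timeout)

    def get(self, block=True, timeout=None):
        deadline = None if timeout is None else time.monotonic() + timeout
        with self.not_empty:
            while not self.cancelled and not self._qsize():
                if not block:
                    raise queue.Empty
                remaining = None
                if deadline is not None:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        raise queue.Empty
                self.not_empty.wait(remaining)
            if self.cancelled:
                raise Cancelled(self.error)
            item = self._get()
            self.not_full.notify()
            return item

    def task_done(self):
        with self.all_tasks_done:
            if self.cancelled:
                return  # cancel() already reset the counter
            if self.unfinished_tasks <= 0:
                raise ValueError("task_done() called too many times")
            self.unfinished_tasks -= 1
            if self.unfinished_tasks == 0:
                self.all_tasks_done.notify_all()

    def join(self):
        with self.all_tasks_done:
            while self.unfinished_tasks and not self.cancelled:
                self.all_tasks_done.wait()
            if self.cancelled:
                raise Cancelled(self.error)
```

A consumer blocked in `get()` receives `Cancelled` with the producer's exception in `args[0]`, so it can report the error instead of just printing it. On Python 3.13 and later, the standard library's `queue.Queue.shutdown()` offers similar wake-up behaviour without relying on internals.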

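Yes, moving the cancel logic into the queue would certainly clean up the worker code. Given my requirements, the queue would also need to remember possible exception information, because I want the consumers to report errors to the worker instead of just printing them. But that can certainly be done. Does anyone know of an existing implementation of such a queue? – roskakori 2012-01-01 18:58:16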