2010-10-03 68 views
1

我想延長在這裏描述的Queue.PriorityQueue:http://docs.python.org/library/queue.html#Queue.PriorityQueue擴展Python Queue.PriorityQueue(工人優先,工作的封裝類型)

隊列將舉行工作包具有優先級。工人將獲得工作包並處理它們。我想做以下補充:

  1. 工人也有一個優先。當多名員工閒置時,具有最高優先級的員工應該處理一個即將到來的工作包。

  2. 並非每個工人都可以處理每個工作包,因此需要一種機制來檢查工作包類型和工作者能力是否匹配。

我在尋找提示,如何最好的實現(從頭開始,擴展PrioriyQueue或Queue,...)。

編輯

這是我第一次(未經測試)試試。基本思想是所有等待的線程都會被通知。然後他們都試圖通過_choose_worker(self, worker)獲得工作項目。 (產地它社區維基)

編輯

作品對於現在一些簡單的測試......

編輯 添加自定義BaseManager和工人名單的本地副本在_choose_worker函數中。

編輯 bug修復

import Queue 
from Queue import Empty, Full 
from time import time as _time 
import heapq 

class AdvancedQueue(Queue.PriorityQueue): 

    # Initialize the queue representation 
    def _init(self, _maxsize): 
     self.queue = [] 
     self.worker = [] 

    def put(self, item, block=True, timeout=None): 
     ''' 
     Put an item into the queue. 

     If optional args 'block' is true and 'timeout' is None (the default), 
     block if necessary until a free slot is available. If 'timeout' is 
     a positive number, it blocks at most 'timeout' seconds and raises 
     the Full exception if no free slot was available within that time. 
     Otherwise ('block' is false), put an item on the queue if a free slot 
     is immediately available, else raise the Full exception ('timeout' 
     is ignored in that case). 
     ''' 
     self.not_full.acquire() 
     try: 
      if self.maxsize > 0: 
       if not block: 
        if self._qsize() == self.maxsize: 
         raise Full 
       elif timeout is None: 
        while self._qsize() == self.maxsize: 
         self.not_full.wait() 
       elif timeout < 0: 
        raise ValueError("'timeout' must be a positive number") 
       else: 
        endtime = _time() + timeout 
        while self._qsize() == self.maxsize: 
         remaining = endtime - _time() 
         if remaining <= 0.0: 
          raise Full 
         self.not_full.wait(remaining) 
      self._put(item) 
      self.unfinished_tasks += 1 
      self.not_empty.notifyAll() # only change 
     finally: 
      self.not_full.release() 

    def get(self, worker, block=True, timeout=None): 
     self.not_empty.acquire() 
     try: 
      self._put_worker(worker) 

      if not block: 
       if not self._qsize(): 
        raise Empty 
       else: 
        return self._choose_worker(worker) 
      elif timeout is None: 
       while True: 
        while not self._qsize(): 
         self.not_empty.wait() 
        try: 
         return self._choose_worker(worker) 
        except Empty: 
         self.not_empty.wait() 

      elif timeout < 0: 
       raise ValueError("'timeout' must be a positive number") 
      else: 
       endtime = _time() + timeout 
       def wait(endtime): 
        remaining = endtime - _time() 
        if remaining <= 0.0: 
         raise Empty 
        self.not_empty.wait(remaining) 

       while True: 
        while not self._qsize(): 
         wait(endtime) 

        try: 
         return self._choose_worker(worker) 
        except Empty: 
         wait(endtime) 
     finally: 
      self._remove_worker(worker) 
      self.not_empty.release() 

    # Put a new worker in the worker queue 
    def _put_worker(self, worker, heappush=heapq.heappush): 
     heappush(self.worker, worker) 

    # Remove a worker from the worker queue 
    def _remove_worker(self, worker): 
     self.worker.remove(worker) 

    # Choose a matching worker with highest priority 
    def _choose_worker(self, worker): 
     worker_copy = self.worker[:] # we need a copy so we can remove assigned worker 
     for item in self.queue: 
      for enqueued_worker in worker_copy: 
       if item[1].type in enqueued_worker[1].capabilities: 
        if enqueued_worker == worker: 
         self.queue.remove(item) 
         self.not_full.notify() 
         return item 
        else: 
         worker_copy.remove(enqueued_worker) 
         # item will be taken by enqueued_worker (which has higher priority), 
         # so enqueued_worker is busy and can be removed 
         continue 
     raise Empty 
+1

+1有趣的問題。我有一個想法,但我想首先看到其他答案。我現在只想給你一個小提示:注意有免費工作和兩名工人可以免費工作的情況,但是最高優先級的工作人員無法處理隊列中的工作。小心你不要陷入僵局。類似的情況是,你有兩個任務,一個工作人員和工作人員無法處理最高優先級的工作,再次注意死鎖。你應該單元測試這些情況(還有更多的測試用於其他更常見的場景 - 空隊列等)。 – 2010-10-03 09:06:14

+0

非常適合我開始使用python單元測試:) – tauran 2010-10-03 09:26:06

回答

0

唯一的答案是有用的,但不夠詳細,所以我現在會接受我自己的答案。查看問題中的代碼。

1

我認爲你是說明您有兩個「優先級隊列」的情況 - 一個用於工作,一個用於工作人員。幼稚的做法是把最重要的工作和最重要的工作人員,並嘗試將它們配對。但當工人無法執行工作時,這當然會失敗。

爲了解決這個問題,我建議首先考慮優先考慮的工作,然後按照優先級遞減的順序迭代所有工作人員,直到找到可以處理該工作的工作人員。如果沒有工人可以處理這項工作,那麼就採取第二優先級的工作,等等。所以有效地你有嵌套循環,如下所示:

def getNextWorkerAndJobPair(): 
    for job in sorted(jobs, key=priority, reverse=True): 
     for worker in sorted(workers, key=priority, reverse=True): 
      if worker.can_process(job): 
       return (worker, job) 

上面的示例雖然多次不必要地排序數據。爲了避免這種情況,最好以已排序的順序存儲數據。至於要使用什麼數據結構,我不確定最好是什麼。理想情況下,您需要O(log n)插入和刪除,並且能夠在O(n)時間以排序順序遍歷集合。我認爲PriorityQueue滿足這些要求中的第一個,但不是第二個。我想從blist包的排序列表可以工作,但我沒有嘗試過,我的網頁沒有針對此類提供的性能保證。

我建議首先遍歷作業,然後遍歷內部循環中的工作者的方式並不是您可以採用的唯一方法。您也可以顛倒循環的順序,以便首先選擇最高優先級的工作人員,然後嘗試爲其找到工作。或者你可以找到對某個函數f有最大值f(priority_job,priority_worker)的有效(job,worker)對(例如,只需添加優先級)。

+0

對於我的編輯,我遵循了你的方法。首先我會試着讓它工作,然後考慮調整它。我也想從'PriorityQueue'中重用盡可能多的代碼。 – tauran 2010-10-04 11:49:17