我想延長在這裏描述的Queue.PriorityQueue:http://docs.python.org/library/queue.html#Queue.PriorityQueue擴展Python Queue.PriorityQueue(工人優先,工作的封裝類型)
隊列將舉行工作包具有優先級。工人將獲得工作包並處理它們。我想做以下補充:
工人也有一個優先。當多名員工閒置時,具有最高優先級的員工應該處理一個即將到來的工作包。
並非每個工人都可以處理每個工作包,因此需要一種機制來檢查工作包類型和工作者能力是否匹配。
我在尋找提示,如何最好的實現(從頭開始,擴展PrioriyQueue或Queue,...)。
編輯
這是我第一次(未經測試)試試。基本思想是所有等待的線程都會被通知。然後他們都試圖通過_choose_worker(self, worker)
獲得工作項目。 (產地它社區維基)
編輯
作品對於現在一些簡單的測試......
編輯 添加自定義BaseManager
和工人名單的本地副本在_choose_worker
函數中。
編輯 bug修復
import Queue
from Queue import Empty, Full
from time import time as _time
import heapq
class AdvancedQueue(Queue.PriorityQueue):
# Initialize the queue representation
def _init(self, _maxsize):
self.queue = []
self.worker = []
def put(self, item, block=True, timeout=None):
'''
Put an item into the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a free slot is available. If 'timeout' is
a positive number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
'''
self.not_full.acquire()
try:
if self.maxsize > 0:
if not block:
if self._qsize() == self.maxsize:
raise Full
elif timeout is None:
while self._qsize() == self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = _time() + timeout
while self._qsize() == self.maxsize:
remaining = endtime - _time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notifyAll() # only change
finally:
self.not_full.release()
def get(self, worker, block=True, timeout=None):
self.not_empty.acquire()
try:
self._put_worker(worker)
if not block:
if not self._qsize():
raise Empty
else:
return self._choose_worker(worker)
elif timeout is None:
while True:
while not self._qsize():
self.not_empty.wait()
try:
return self._choose_worker(worker)
except Empty:
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = _time() + timeout
def wait(endtime):
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
while True:
while not self._qsize():
wait(endtime)
try:
return self._choose_worker(worker)
except Empty:
wait(endtime)
finally:
self._remove_worker(worker)
self.not_empty.release()
# Put a new worker in the worker queue
def _put_worker(self, worker, heappush=heapq.heappush):
heappush(self.worker, worker)
# Remove a worker from the worker queue
def _remove_worker(self, worker):
self.worker.remove(worker)
# Choose a matching worker with highest priority
def _choose_worker(self, worker):
worker_copy = self.worker[:] # we need a copy so we can remove assigned worker
for item in self.queue:
for enqueued_worker in worker_copy:
if item[1].type in enqueued_worker[1].capabilities:
if enqueued_worker == worker:
self.queue.remove(item)
self.not_full.notify()
return item
else:
worker_copy.remove(enqueued_worker)
# item will be taken by enqueued_worker (which has higher priority),
# so enqueued_worker is busy and can be removed
continue
raise Empty
+1有趣的問題。我有一個想法,但我想首先看到其他答案。我現在只想給你一個小提示:注意有免費工作和兩名工人可以免費工作的情況,但是最高優先級的工作人員無法處理隊列中的工作。小心你不要陷入僵局。類似的情況是,你有兩個任務,一個工作人員和工作人員無法處理最高優先級的工作,再次注意死鎖。你應該單元測試這些情況(還有更多的測試用於其他更常見的場景 - 空隊列等)。 – 2010-10-03 09:06:14
非常適合我開始使用python單元測試:) – tauran 2010-10-03 09:26:06