我想要做這樣的事情(1個隊列,並在多個用戶):多個消費者,是否可以克隆隊列(gevent)?
import gevent
from gevent import queue
q=queue.Queue()
q.put(1)
q.put(2)
q.put(3)
q.put(StopIteration)
def consumer(qq):
for i in qq:
print i
jobs=[gevent.spawn(consumer,i) for i in [q,q]]
gevent.joinall(jobs)
但是這是不可能的......隊列由JOB1消耗...所以作業2將永遠阻塞。 它給了我例外gevent.hub.LoopExit: This operation would block forever
。
我會每個消費者將能夠從開始消耗完整的隊列。 (應該顯示1,2,3,1,2,3或1,1,2,2,3,3 ...從不知道)
一個想法應該是在產卵前克隆隊列,但這是不可能的使用複製(淺/深)模塊;-(
是否有另一種方式來做到這一點?
[編輯] 你覺得什麼?
import gevent
from gevent import queue
class MasterQueueClonable(queue.Queue):
def __init__(self,*a,**k):
queue.Queue.__init__(self,*a,**k)
self.__cloned = []
self.__old=[]
#override
def get(self,*a,**k):
e=queue.Queue.get(self,*a,**k)
for i in self.__cloned: i.put(e) # serve to current clones
self.__old.append(e) # save old element
return e
def clone(self):
q=queue.Queue()
for i in self.__old: q.put(i) # feed a queue with elements which are out
self.__cloned.append(q) # stock the queue, to be able to put newer elements too
return q
q=MasterQueueClonable()
q.put(1)
q.put(2)
q.put(3)
q.put(StopIteration)
def consumer(qq):
for i in qq:
print id(qq),i
jobs=[gevent.spawn(consumer,i) for i in [q.clone(), q ,q.clone(),q.clone()]]
gevent.joinall(jobs)
它是基於這樣的理念RyanYe。有一個沒有調度員的「主隊列」。 我的主隊列覆蓋GET方法,並可以派發到按需克隆。 此外,可以在開始masterqueue之後創建「克隆」(使用__old技巧)。
當然......麻煩的是,如果有在master_queue中傳輸的「50Mb」...... consumer_queues中將會出現「500Mb」(50 * 10)。 如果需要,我想按需創建消費者隊列。 – manatlan 2012-07-25 15:30:52
我想提兩點1)我們談論的是10倍的引用而不是10倍的實際數據字節。如果每個項目都是一個複雜的對象,這是一個巨大的差異。 2)我們正在討論'10倍內存訪問'而不是'10倍內存空間'。在大多數情況下,這不會是瓶頸。實際上,由於每個消費者隊列需要訪問主隊列中的所有項目。因此,'10x內存訪問'是不可避免的。 – 2012-07-25 15:45:54
你是對的......但在我的情況下,主隊列將包含mp3流的chunck。我需要「克隆」,因爲很多「mp3 http clients」請求很多次流(獲取信息,大小,標籤,格式...)... – manatlan 2012-07-26 07:58:49