2012-07-25 50 views
2

我想要做這樣的事情(1個隊列,並在多個用戶):多個消費者,是否可以克隆隊列(gevent)?

import gevent 
from gevent import queue 

q=queue.Queue() 
q.put(1) 
q.put(2) 
q.put(3) 
q.put(StopIteration) 

def consumer(qq): 
    for i in qq: 
     print i 

jobs=[gevent.spawn(consumer,i) for i in [q,q]] 

gevent.joinall(jobs) 

但是這是不可能的......隊列由JOB1消耗...所以作業2將永遠阻塞。 它給了我例外gevent.hub.LoopExit: This operation would block forever

我會每個消費者將能夠從開始消耗完整的隊列。 (應該顯示1,2,3,1,2,3或1,1,2,2,3,3 ...從不知道)

一個想法應該是在產卵前克隆隊列,但這是不可能的使用複製(淺/深)模塊;-(

是否有另一種方式來做到這一點?

[編輯] 你覺得什麼?

import gevent 
from gevent import queue 

class MasterQueueClonable(queue.Queue): 
    def __init__(self,*a,**k): 
     queue.Queue.__init__(self,*a,**k) 

     self.__cloned = [] 
     self.__old=[] 

    #override 
    def get(self,*a,**k): 
     e=queue.Queue.get(self,*a,**k) 
     for i in self.__cloned: i.put(e) # serve to current clones 
     self.__old.append(e)    # save old element 
     return e 

    def clone(self): 
     q=queue.Queue() 
     for i in self.__old: q.put(i) # feed a queue with elements which are out 
     self.__cloned.append(q)   # stock the queue, to be able to put newer elements too 
     return q 

q=MasterQueueClonable() 
q.put(1) 
q.put(2) 
q.put(3) 
q.put(StopIteration) 

def consumer(qq): 
    for i in qq: 
     print id(qq),i 

jobs=[gevent.spawn(consumer,i) for i in [q.clone(), q ,q.clone(),q.clone()]] 
gevent.joinall(jobs) 

它是基於這樣的理念RyanYe。有一個沒有調度員的「主隊列」。 我的主隊列覆蓋GET方法,並可以派發到按需克隆。 此外,可以在開始masterqueue之後創建「克隆」(使用__old技巧)。

回答

2

我建議你創建一個greenlet將工作分發給消費者。示例代碼:

import gevent 
from gevent import queue 

master_queue=queue.Queue() 
master_queue.put(1) 
master_queue.put(2) 
master_queue.put(3) 
master_queue.put(StopIteration) 

total_consumers = 10 
consumer_queues = [queue.Queue() for i in xrange(total_consumers)] 

def dispatcher(master_queue, consumer_queues): 
    for i in master_queue: 
     [j.put(i) for j in consumer_queues] 
    [j.put(StopIteration) for j in consumer_queues] 

def consumer(qq): 
    for i in qq: 
     print i 

jobs=[gevent.spawn(dispatcher, q, consumer_queues)] + [gevent.spawn(consumer, i) for i in consumer_queues] 
gevent.joinall(jobs) 

更新:修復了消費者隊列丟失的StopIteration。謝謝你指出。

+0

當然......麻煩的是,如果有在master_queue中傳輸的「50Mb」...... consumer_queues中將會出現「500Mb」(50 * 10)。 如果需要,我想按需創建消費者隊列。 – manatlan 2012-07-25 15:30:52

+1

我想提兩點1)我們談論的是10倍的引用而不是10倍的實際數據字節。如果每個項目都是一個複雜的對象,這是一個巨大的差異。 2)我們正在討論'10倍內存訪問'而不是'10倍內存空間'。在大多數情況下,這不會是瓶頸。實際上,由於每個消費者隊列需要訪問主隊列中的所有項目。因此,'10x內存訪問'是不可避免的。 – 2012-07-25 15:45:54

+0

你是對的......但在我的情況下,主隊列將包含mp3流的chunck。我需要「克隆」,因爲很多「mp3 http clients」請求很多次流(獲取信息,大小,標籤,格式...)... – manatlan 2012-07-26 07:58:49

0

在由Ryan燁一行答案是錯過了在調度()函數的末尾: [j.put(StopIteration異常),用於consumer_queues J] 沒有它,我們仍然可以得到「gevent.hub.LoopExit:這個操作會永遠阻塞',因爲'我在master_queue'循環不會將StopIteration異常複製到consumer_queues中。

(對不起,我不能發表評論還沒有,所以我把它寫成一個separete答案。)

+0

謝謝,修復了我的帖子中的代碼。 – 2012-07-26 13:08:25

1

我已經addedcopy()方法隊列類:

>>> import gevent.queue 
>>> q = gevent.queue.Queue() 
>>> q.put(5) 
>>> q.copy().get() 
5 
>>> q 
<Queue at 0x1062760d0 queue=deque([5])> 

讓我知道,如果它幫助。

+0

謝謝,我相信它錯過了這個功能。這是一個很好的補充!但在我的情況下(我現在無法測試;-(),如果在「.copy」之後的'q'中添加另一個元素會發生什麼......此元素是否也添加到複製版本? – manatlan 2012-07-26 11:29:22

+0

no,a副本當然是獨立的。 – 2012-07-26 13:09:50

相關問題