2013-07-29 79 views
0

我有一個生產者設置與N個消費者。Python進程間隊列溢出

生產者偵聽接收大量TCP消息的套接字(每分鐘10,000個),讀取這些數據並將其放入工作隊列中。

工人我已經建立從隊列中讀取,如下所示:

iterations = 0 
work_iterations = 0 
while True: 
    try: 
    iterations += 1 
    data = queue.get(block=False) 
    work_iterations +=1 
    do_work(data) 
    except Queue.Empty: 
    time.sleep(0.001) #avoid high CPU usage 


    if iterations == 100: 
    load = float(work_iterations/iterations) 
    print load 
    iterations = 0 
    work_iterations = 0 

這是簡化代碼,但你可以看到我想看到工人負載,但看到多少次迭代出來100名工人實際上能夠將工作從隊列中拉出來。如果負載一直是100/100,那麼我知道生產者/消費者隊列正在積壓。理論上這應該工作。

我在輸出中看到很多0.97,0.99和很少1.0。但是隊列在幾分鐘內就會填滿(它的大小限制爲10,000),我必須開始在Producer端丟棄數據。任何人都可以照亮爲什麼會發生這種情況?如果工作進程平均獲得97/100次迭代,那意味着隊列應該接近於空嗎?

回答

-1

如果刪除block = Flase和time.sleep()會怎麼樣? 你將無法計算工人。

0

當您調用queue.get(block = False)時,即使隊列不是實際爲空,也可能會引發Queue.Empty。如果當前進程無法獲取訪問隊列的鎖,則無論隊列中實際有多少項,都會引發Queue.Empty。

就讓我們來看看在多/ queues.py的Queue.get()代碼:

126 if not self._rlock.acquire(block, timeout): 
127  raise Empty 

注意到有隊列實際上是如何滿是提高異常之前沒有檢查。由於你有太多的信息被排隊,我懷疑有幾次Queue.Empty被提出,它實際上是由生產者在排隊時持有鎖造成的,導致你的工人嘗試訪問隊列失敗。

你可以用一個小的改變了代碼檢查:

except Queue.Empty: 
    print queue.qsize() # returns the approximate number of elements in the queue 

由於the documentation說,這個數字並不完全可靠。但是,由於您正在處理隊列中的大量項目,因此它應該足夠接近以告訴您隊列是否接近0或10,000。