我有一個基本的生產者/消費者腳本在gevent中運行。它開始是把東西放到一個gevent.queue.Queue
幾個生產函數,而取出來再排隊的一個消費函數:顯式開關()與gevent
from __future__ import print_function
import time
import gevent
import gevent.queue
import gevent.monkey
q = gevent.queue.Queue()
# define and spawn a consumer
def consumer():
while True:
item = q.get(block=True)
print('consumer got {}'.format(item))
consumer_greenlet = gevent.spawn(consumer)
# define and spawn a few producers
def producer(ID):
while True:
print("producer {} about to put".format(ID))
q.put('something from {}'.format(ID))
time.sleep(0.1)
# consumer_greenlet.switch()
producer_greenlets = [gevent.spawn(producer, i) for i in range(5)]
# wait indefinitely
gevent.monkey.patch_all()
print("about to join")
consumer_greenlet.join()
,如果我讓GEVENT隱式處理調度它工作正常(例如,通過呼叫時間。睡眠或其他一些gevent.monkey.patch()
編輯功能),但是當我切換到消費者明確(更換time.sleep
與註釋掉switch
調用),GEVENT提出了一個AssertionError:
Traceback (most recent call last):
File "/my/virtualenvs/venv/local/lib/python2.7/site-packages/gevent/greenlet.py", line 327, in run
result = self._run(*self.args, **self.kwargs)
File "switch_test.py", line 14, in consumer
item = q.get(block=True)
File "/my/virtualenvs/venv/lib/python2.7/site-packages/gevent/queue.py", line 201, in get
assert result is waiter, 'Invalid switch into Queue.get: %r' % (result,)
AssertionError: Invalid switch into Queue.get:()
<Greenlet at 0x7fde6fa6c870: consumer> failed with AssertionError
我想聘請明確的開關,因爲在生產我有很多刺激用戶,gevent的調度不會爲消費者分配足夠的運行時,並且隊列變得越來越長(這很糟糕)。另外,任何有關如何配置或修改gevent的調度程序的見解都非常感謝。
這是關於Python 2.7.2,gevent 1.0.1和greenlet 0.4.5的。
也許你可以看看隊列的大小,使生產停頓,如果它是具有一定規模的?這聽起來像是潛在的問題是生產者(如果不加控制的話)比消費者可以處理更多的生產方式?你如何使用gevent來做這件事,而不是線程或多重處理? –
我實際上已經實現了你所建議的等待,但是它導致了生產者方面的很多等待導致系統吞吐量降低(並且這個系統確實需要快速完成工作......)。消費者當然可以處理負載,生產者加載和分析URL,所有消費者都會將結果寫入數據庫。我使用gevent是因爲它減少了線程引入的同步頭痛問題,並且由於一次生成幾十個工作進程後多處理會消耗大量內存。 – Simon
我想我在這裏錯過了一些東西。如果生產者能夠生產超過消費者能夠處理的產品,那麼你唯一的選擇就是增加更多的消費者,或者減少生產者生產的數量(這就是等待的)。我不確定減少系統吞吐量的意義 - 不是限制消費者可以如何快速處理隊列的瓶頸?線程引入gevent的同步問題不是什麼? –