我有一個django應用程序調用查詢集上的異步任務(使用芹菜)。該任務接受查詢集並根據其中的對象執行一大堆可能需要很長時間的操作。對象可以在查詢集之間共享,因此用戶可以在包含已經運行的對象的查詢集上提交任務,並且該新任務應該只應對尚未運行的對象執行,但等待所有對象完成在它返回之前。在異步任務和redis的django中的線程安全
我的解釋是有點混亂,所以想象一下下面的代碼:
from time import sleep
import redis
from celery.task import Task
from someapp.models import InterestingModel
from someapp.longtime import i_take_a_while
class LongRunningTask(Task):
def run(self, process_id, *args, **kwargs):
_queryset = InterestingModel.objects.filter(process__id=process_id)
r = redis.Redis()
p = r.pipeline()
run_check_sets = ('run_check', 'objects_already_running')
# There must be a better way to do this:
for o in _queryset.values_list('pk', flat=True):
p.sadd('run_check')
p.sdiff(run_check_sets) # Objects that need to be run
p.sunion(run_check_sets) # Objects that we need to wait for
p.sunionstore('objects_already_running',run_check_sets)
p.delete('run_check')
redis_result = p.execute()
objects_to_run = redis_result[-3]
objects_to_wait_for = redis_result[-2]
if objects_to_run:
i_take_a_while(objects_to_run)
p = r.pipeline()
for o in objects_to_run:
p.srem('objects_already_running', o)
p.execute()
while objects_to_wait_for:
p = r.pipeline()
for o in objects_to_wait_for:
p.sismember('objects_already_running',o)
redis_result = p.execute()
objects_to_wait_for = [objects_to_wait_for[i] for i, member in enumerate(redis_result) if member]
# Probably need to add some sort of timeout here or in redis
sleep(30)
我非常新的Redis,所以我的主要問題是,是否有操縱的Redis來達到同樣的一個更有效的方法結果。更廣泛地說,我想知道Redis是否有必要/正確的方法來處理這個問題。似乎應該有一種更好的方式來將Django模型與Redis進行交互。最後,我想知道這個代碼實際上是否是線程安全的。任何人都可以在我的邏輯中打出任何洞?
任何評論表示讚賞。
我真的結束了這麼做! – Bacon 2011-04-26 15:50:59