我創建一個任務(通過繼承celery.task.Task),創建一個連接到Twitter的流媒體API。對於Twitter API調用,我正在使用tweepy。正如我從芹菜文檔中讀到的,「任務沒有爲每個請求實例化,而是作爲全局實例在任務註冊表中註冊。」我期待每當我調用apply_async(或延遲)任務時,我將訪問最初實例化但未發生的任務。而是創建自定義任務類的新實例。我需要能夠訪問原始的自定義任務,因爲這是我可以終止由tweepy API調用創建的原始連接的唯一方法。芹菜創建幾個任務的任務
下面是一些一段代碼,如果這將有助於:
from celery import registry
from celery.task import Task
class FollowAllTwitterIDs(Task):
def __init__(self):
# requirements for creation of the customstream
# goes here. The CustomStream class is a subclass
# of tweepy.streaming.Stream class
self._customstream = CustomStream(*args, **kwargs)
@property
def customstream(self):
if self._customstream:
# terminate existing connection to Twitter
self._customstream.running = False
self._customstream = CustomStream(*args, **kwargs)
def run(self):
self._to_follow_ids = function_that_gets_list_of_ids_to_be_followed()
self.customstream.filter(follow=self._to_follow_ids, async=False)
follow_all_twitterids = registry.tasks[FollowAllTwitterIDs.name]
而對於Django的視圖
def connect_to_twitter(request):
if request.method == 'POST':
do_stuff_here()
.
.
.
follow_all_twitterids.apply_async(args=[], kwargs={})
return
任何幫助,將不勝感激。 :d
編輯:
用於問題的附加上下文,所述CustomStream對象創建每當所述過濾器()方法被調用的httplib.HTTPSConnection實例。無論何時嘗試創建一個連接,都需要關閉此連接。通過將customstream.running設置爲False來關閉連接。
感謝您的答覆。我試着實現上述,只是想知道celery.utils.cached_property用在哪裏? – Christian
它只是錯誤地添加在那裏:)我會刪除它 – asksol