2011-10-25 37 views
1

我創建一個任務(通過繼承celery.task.Task),創建一個連接到Twitter的流媒體API。對於Twitter API調用,我正在使用tweepy。正如我從芹菜文檔中讀到的,「任務沒有爲每個請求實例化,而是作爲全局實例在任務註冊表中註冊。」我期待每當我調用apply_async(或延遲)任務時,我將訪問最初實例化但未發生的任務。而是創建自定義任務類的新實例。我需要能夠訪問原始的自定義任務,因爲這是我可以終止由tweepy API調用創建的原始連接的唯一方法。芹菜創建幾個任務的任務

下面是一些一段代碼,如果這將有助於:

from celery import registry 
from celery.task import Task 

class FollowAllTwitterIDs(Task): 
    def __init__(self): 
     # requirements for creation of the customstream 
     # goes here. The CustomStream class is a subclass 
     # of tweepy.streaming.Stream class 

     self._customstream = CustomStream(*args, **kwargs) 

    @property 
    def customstream(self): 
     if self._customstream: 
      # terminate existing connection to Twitter 
      self._customstream.running = False 
     self._customstream = CustomStream(*args, **kwargs) 

    def run(self): 
     self._to_follow_ids = function_that_gets_list_of_ids_to_be_followed() 

     self.customstream.filter(follow=self._to_follow_ids, async=False) 
follow_all_twitterids = registry.tasks[FollowAllTwitterIDs.name] 

而對於Django的視圖

def connect_to_twitter(request): 
    if request.method == 'POST': 
     do_stuff_here() 
     . 
     . 
     . 

     follow_all_twitterids.apply_async(args=[], kwargs={}) 

    return 

任何幫助,將不勝感激。 :d

編輯:

用於問題的附加上下文,所述CustomStream對象創建每當所述過濾器()方法被調用的httplib.HTTPSConnection實例。無論何時嘗試創建一個連接,都需要關閉此連接。通過將customstream.running設置爲False來關閉連接。

回答

0

的任務應該只被實例化一次,如果你認爲這是不是出於某種原因, 我建議你添加一個

打印(「實例」) 進口回溯 traceback.print_stack()

Task.__init__方法,所以你可以知道這將發生什麼。

我覺得你的任務,可以更好地這樣表示:

from celery.task import Task, task 

class TwitterTask(Task): 
    _stream = None 
    abstract = True 

    def __call__(self, *args, **kwargs): 
     try: 
      return super(TwitterTask, self).__call__(stream, *args, **kwargs) 
     finally: 
      if self._stream: 
       self._stream.running = False 

    @property 
    def stream(self): 
     if self._stream is None: 
      self._stream = CustomStream() 
     return self._stream 

@task(base=TwitterTask) 
def follow_all_ids(): 
    ids = get_list_of_ids_to_follow() 
    follow_all_ids.stream.filter(follow=ids, async=false) 
+0

感謝您的答覆。我試着實現上述,只是想知道celery.utils.cached_property用在哪裏? – Christian

+0

它只是錯誤地添加在那裏:)我會刪除它 – asksol