2014-11-22 52 views
0

我有3個芹菜節拍實例在我的VPS上運行(使用不同的settings.py)。其中三個實例由三個具有相同代碼的網站使用。 該任務基本上發送電子郵件給幾百註冊用戶(使用sendgrid)。芹菜任務重複問題

我的問題是,我的任務運行時使用ETA方法運行3次如下。

sdate = datetime.datetime.strptime(request.POST['schedule_date'],'%d-%m-%Y %H:%M') 
       tz=get_current_timezone() 
    celery_scheduled_campaign.apply_async(eta=tz.localize(sdate), 
        kwargs={'schedule_id': schedule.id }) 

但運行正常(僅一次)使用.delay方法時。

celery_sendmail_task.delay(pro_campaign,unsubscribe_url,ecm_host) 

settings_one.py

... 
BROKER_URL = 'redis://localhost:6379/0' 
... 

settings_two.py

... 
BROKER_URL = 'redis://localhost:6379/1' 
... 

settings_three.py

... 
BROKER_URL = 'redis://localhost:6379/2' 
... 

task.py

from celery import task 
from bulkmailer import send_email 
from models import CampaignSchedule, SendgridEmailQuota 
import logging 
logger = logging.getLogger("ecm_console") 
#import pdb 
#import time 
#from django.core.mail import EmailMultiAlternatives 

@task.task(ignore_result=True) 
def celery_sendmail_task(obj,unsubscribe_url,host): 
    #time.sleep(10) 
    send_email(obj,unsubscribe_url,host) 
    obj.status=True 
    if obj.campaign_opt=='S': 
     obj.campaign_opt='R' 
    obj.save() 

@task.task(ignore_result=True) 
def sendgrid_quota_reset(): 
    try: 
     quota = SendgridEmailQuota.objects.get(pk=1) 
     quota.used=0 
     quota.save() 
     logger.info("Success : sendgrid_quota_reset job ") 
    except Exception, e: 
     logger.error("Critical Error : sendgrid_quota_reset: {0} ".format(e)) 

@task.task(ignore_result=True) 
def celery_scheduled_campaign(schedule_id): 
    try: 
     obj = CampaignSchedule.objects.get(pk=schedule_id) 
     send_email(obj.campaign, obj.unsub_url, obj.ecm_host) 
     obj.campaign.status = True 
     obj.campaign.save() 
    except Exception, e: 
     logger.error("Critical Error : celery_scheduled_campaign: {0} ".format(e)) 

用於運行芹菜命令

蟒manage.py芹菜工人-B -c 2 --loglevel =信息--settings = ecm.settings_one

蟒蛇manage.py芹菜工人-B -c 2 --loglevel =信息--settings = ecm.settings_two

蟒蛇管理。 PY芹菜工人-B -c 2 --loglevel =信息--settings = ecm.settings_three

版本

芹菜== 3.0.21 Django的芹菜== 3.0.21 Python 2.7版。 3

EDIT 1 芹菜日誌顯示任務

[2014-11-24 22:09:32,521: INFO/MainProcess] Celerybeat: Shutting down... 
[2014-11-24 22:09:32,557: WARNING/MainProcess] Restoring 1 unacknowledged message(s). 
[2014-11-24 22:09:40,495: INFO/Beat] Celerybeat: Starting... 
[2014-11-24 22:09:40,540: WARNING/MainProcess] [email protected] ready. 
[2014-11-24 22:09:40,547: INFO/MainProcess] consumer: Connected to redis://localhost:6379/3. 
[2014-11-24 22:09:40,614: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00] 

^^氏後自動添加幾個小時s是我從前端添加任務的地方。下面的任務是獲取自動添加

[2014-11-24 23:09:53,039: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00] 

週期性任務沒有ETA正常VV

[2014-11-25 00:01:00,044: INFO/Beat] Scheduler: Sending due task ecm_sendgrid_sync (ecm_sendgridapi.tasks.ecm_sendgridapi_dbsync) 
[2014-11-25 00:01:00,052: INFO/MainProcess] Got task from broker: ecm_sendgridapi.tasks.ecm_sendgridapi_dbsync[37c94a3a-f6c2-433c-81a3-ae351c7018f8] 
[2014-11-25 00:01:02,262: INFO/MainProcess] Success : update job 
[2014-11-25 00:01:02,265: INFO/MainProcess] Task ecm_sendgridapi.tasks.ecm_sendgridapi_dbsync[37c94a3a-f6c2-433c-81a3-ae351c7018f8] succeeded in 2.18759179115s: None 

再次任務,得到ETA自動添加運行。注意哈希是相同的。

[2014-11-25 00:10:12,190: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00] 
[2014-11-25 01:10:26,029: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00] 
[2014-11-25 02:10:39,025: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00] 
[2014-11-25 03:10:50,063: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00] 
[2014-11-25 04:00:00,007: INFO/Beat] Scheduler: Sending due task celery.backend_cleanup (celery.backend_cleanup) 
[2014-11-25 04:00:00,064: INFO/MainProcess] Got task from broker: celery.backend_cleanup[35a4db80-008e-49c9-9735-2dc1df5e0ecc] expires:[2014-11-25 16:00:00.008296+04:00] 
[2014-11-25 04:00:01,533: INFO/MainProcess] Task celery.backend_cleanup[35a4db80-008e-49c9-9735-2dc1df5e0ecc] succeeded in 1.01458001137s: None 
[2014-11-25 04:11:03,062: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00] 
[2014-11-25 05:11:15,073: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00] 
[2014-11-25 06:11:26,101: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00] 
[2014-11-25 07:11:38,324: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00] 
[2014-11-25 08:11:53,097: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00] 

這可能是舊版本中的錯誤。我也懷疑我的VPS,內存不足(使用400 +/489)

+1

可能是因爲所有(3)任務的消息都去同一個端口號?這是導致tripple任務運行? – doniyor 2014-11-22 14:06:04

+0

@ doniyor如何檢查? – 2014-11-23 15:24:26

+0

只是改變端口號,看看它的工作原理,讓我知道如果它確實;) – doniyor 2014-11-23 16:02:18

回答

0

最後做了一個修復。增加了鎖定機制以確保任務只執行一次。更多詳情here

task.py

# ... 
import redis 
@task.task(ignore_result=True) 
def celery_scheduled_campaign(schedule_id): 
    LOCK_EXPIRE = 60 * 30 # Lock expires in 30 minutes 
    obj = campaign.objects.get(pk=schedule_id) 
    my_lock = redis.Redis().lock(obj.campaign_uuid,timeout=LOCK_EXPIRE) 
    if my_lock.acquire(blocking=False) and obj.is_complete == False: 
     #... 
     # Task to run 
     #... 
     obj.is_complete = True 
     my_lock.release() 

models.py

# ... 
import uuid 
class campaign(models.Model): 
    # ... 
    campaign_uuid = models.CharField(editable=False, max_length=100) 
    is_complete = models.BooleanField(default=False) 
    # ... 
    def save(self, *args, **kwargs): 
      if not self.id: 
       self.campaign_uuid = str(uuid.uuid4()) 
      super(campaign, self).save(*args, **kwargs) 
1

確保所有3條消息都不會導致同一個端口導致同一端口上有多個芹菜實例。