2013-11-20 78 views
1

我目前正在開發一個基於django-tenants-schema的Django應用程序。您不需要查看模塊的實際代碼,但其想法是它具有用於定義應用程序租戶使用哪個模式的當前數據庫連接的全局設置,例如,Django芹菜任務保持全局狀態

tenant = tenants_schema.get_tenant() 

和設置

tenants_schema.set_tenant(xxx) 

對於的任務的一些我希望他們記住的實例化過程中選擇當前全球租戶,例如理論上:

class AbstractTask(Task): 
    ''' 
    Run this method before returning the task future 
    ''' 
    def before_submit(self): 
     self.run_args['tenant'] = tenants_schema.get_tenant() 

    ''' 
    This method is run before related .run() task method 
    ''' 
    def before_run(self): 
     tenants_schema.set_tenant(self.run_args['tenant']) 

在芹菜中是否有一種優雅的方式呢?

回答

0

我不確定你這裏的意思是before_submit在任務被客戶端調用之前執行嗎?

在這種情況下,我寧願使用與在這裏的講話:

from contextlib import contextmanager 

@contextmanager 
def set_tenant_db(tenant): 
    prev_tenant = tenants_schema.get_tenant() 
    try: 
     tenants_scheme.set_tenant(tenant) 
     yield 
    finally: 
     tenants_schema.set_tenant(prev_tenant) 


@app.task 
def tenant_task(tenant=None): 
    with set_tenant_db(tenant): 
     do_actions_here() 


tenant_task.delay(tenant=tenants_scheme.get_tenant()) 

當然,你可以創建一個基本的任務自動執行此操作, 可以應用上下文Task.__call__例如,但我m不確定 如果您可以明確使用with語句,那麼可以節省很多。

+0

''before_submit()''必須在task.apply_async()','task()'或當前進程或其他任何啓動任務的其他情況下調用。正如我所看到的那樣:它只是一個過濾器,用於任務'* args,** kwargs',它添加了一個新參數,以便將它傳遞給任務運行器進程。 我需要的是在基本任務類中保存當前全局變量_implicitly_,然後在要運行任務的進程中將其恢復_implicitly_。 –

1

芹菜(截至3.1)有signals你可以掛鉤做到這一點。你可以改變這種被傳入的kwargs,並在另一邊,撤消你改變他們給予實際任務之前:

from celery import shared_task 
from celery.signals import before_task_publish, task_prerun, task_postrun 
from threading import local 

current_tenant = local() 

@before_task_publish.connect 
def add_tenant_to_task(body=None, **unused): 
    body['kwargs']['tenant_middleware.tenant'] = getattr(current_tenant, 'id', None) 
    print 'sending tenant: {t}'.format(t=current_tenant.id) 


@task_prerun.connect 
def extract_tenant_from_task(kwargs=None, **unused): 
    tenant_id = kwargs.pop('tenant_middleware.tenant', None) 
    current_tenant.id = tenant_id 
    print 'current_tenant.id set to {t}'.format(t=tenant_id) 


@task_postrun.connect 
def cleanup_tenant(**kwargs): 
    current_tenant.id = None 
    print 'cleaned current_tenant.id' 


@shared_task 
def get_current_tenant(): 
    # Here is where you would do work that relied on current_tenant.id being set. 
    import time 
    time.sleep(1) 
    return current_tenant.id 

如果你運行的任務(未顯示從工人登錄) :

In [1]: current_tenant.id = 1234; ct = get_current_tenant.delay(); current_tenant.id = 5678; ct.get() 
sending tenant: 1234 
Out[1]: 1234 

In [2]: current_tenant.id 
Out[2]: 5678 

如果沒有發送消息的信號不叫(當你直接調用任務功能,無需delay()apply_async())。如果要過濾任務名稱,則可在before_task_publish信號處理程序中以body['task']的形式提供,和task_postrun處理程序中提供了task對象。

我是芹菜新手,所以我不能確定這是否是在Celery中做「中間件」類型的東西的「幸運」方式,但我認爲它對我很有用。