2016-03-24 32 views
1

我想與只能接受單個TCP連接(內存約束)的設備進行接口,因此只需爲每個工作線程啓動連接都不是一個選項,因爲它具有正常客戶端 - 服務器情況,如數據庫連接。在Celery任務中使用多處理併發機制

我已經使用了多重經理字典就是線程之間全局訪問嘗試,格式爲:

clients{(address, port): (connection_obj, multiprocessing.Manager.RLock)}

而像這樣一個任務:

from celery import shared_task 
from .celery import manager, clients 

@shared_task 
def send_command(controller, commandname, args): 
    """Send a command to the controller.""" 
    # Create client connection if one does not exist. 
    conn = None 
    addr, port = controller 
    if controller not in clients: 
     conn = Client(addr, port) 
     conn.connect() 
     lock = manager.RLock() 
     clients[controller] = (conn, lock,) 
     print("New controller connection to %s:%s" % (addr, port,)) 
    else: 
     conn, lock = clients[controller] 

    try: 
     f = getattr(conn, commandname) # See if connection.commandname() exists. 
    except Exception: 
     raise Exception("command: %s not known." % (commandname)) 

    with lock: 
     res = f(*args) 
     return res 

但是任務會出現序列化錯誤,例如:

_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed

即使我沒有使用不可序列化的值調用任務,並且任務沒有嘗試返回一個不可序列化的值Celery似乎着迷於試圖序列化這個全局對象?

我缺少什麼?你將如何在Celery任務中使用線程安全的客戶端設備連接並在線程之間訪問?示例代碼?

+0

我不確定這是否適合您的情況,但我只記得閱讀'multiprocessing.reduction',它應該允許在進程之間共享套接字連接。 [見這篇博文的例子](http://foobarnbaz.com/2011/08/30/developing-scalable-services-with-python/)。 – antikantian

+0

客戶端不使用原始套接字,它是具有協議的Twisted連接對象。使用原始套接字或重新構建fd中的Twisted連接對象並不重要。 –

+0

我最終研究瞭如何在現有套接字周圍包裝一個Twisted協議,但是在我的情況下它不起作用,因爲Celery消費者作爲*工作主進程的獨立子進程*無法訪問需要的文件描述符(存儲在Redis中),並建立unix管道的共享FD的糾紛是太多hackery。 與我的情況有關的問題是設備是內存受限,並且不能只是有多個連接...所以我決定只有一個工作人員與一個消費者和一個設備。不太好! –

回答

0
... 
self._send_bytes(ForkingPickler.dumps(obj)) 
File "/usr/lib64/python3.4/multiprocessing/reduction.py", line 50, in dumps 
cls(buf, protocol).dump(obj) 
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed 

環顧網絡後,我意識到我可能錯過了追蹤中重要的東西。看了回溯後,我意識到這不是Celery試圖醃製連接對象,而是Multiprocessing.reduction。減量用於連續化並在另一側重新構建。

我有幾個替代方法來解決這個問題 - 但是他們沒有真正做我最初想要的,就是借用客戶端庫連接對象並使用它,這對於多處理和prefork是不可能的。

+0

啊,我想我的回答有點倉促,因爲你想傳遞相同的連接對象。 Mutliprocessing和prefork通常不會很好地處理進程之間的連接和I/O。你通常想建立一個連接post-fork。您是否考慮從prefork切換到eventlet或gevent以實現併發性,然後實現連接池? – antikantian

0

如何使用Redis實現分佈式鎖管理器? Redis python客戶端具有內置的鎖定功能。另請參閱redis.io上的this doc。即使您使用RabbitMQ或其他經紀商,Redis也非常輕巧。

例如,作爲裝飾:

from functools import wraps 

def device_lock(block=True): 
    def decorator(func): 
     @wraps(func) 
     def wrapper(*args, **kwargs): 
      return_value = None 
      have_lock = False 
      lock = redisconn.lock('locks.device', timeout=2, sleep=0.01) 
      try: 
       have_lock = lock.acquire(blocking=block) 
       if have_lock: 
        return_value = func(*args, **kwargs) 
      finally: 
       if have_lock: 
        lock.release() 
      return return_value 
     return wrapper 
    return decorator 

@shared_task 
@device_lock 
def send_command(controller, commandname, args): 
    """Send a command to the controller.""" 
    ... 

您還可以使用this approach從芹菜任務食譜:

from celery import task 
from celery.utils.log import get_task_logger 
from django.core.cache import cache 
from hashlib import md5 
from djangofeeds.models import Feed 

logger = get_task_logger(__name__) 

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes 

@task(bind=True) 
def import_feed(self, feed_url): 
    # The cache key consists of the task name and the MD5 digest 
    # of the feed URL. 
    feed_url_hexdigest = md5(feed_url).hexdigest() 
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest) 

    # cache.add fails if the key already exists 
    acquire_lock = lambda: cache.add(lock_id, 'true', LOCK_EXPIRE) 
    # memcache delete is very slow, but we have to use it to take 
    # advantage of using add() for atomic locking 
    release_lock = lambda: cache.delete(lock_id) 

    logger.debug('Importing feed: %s', feed_url) 
    if acquire_lock(): 
     try: 
      feed = Feed.objects.import_feed(feed_url) 
     finally: 
      release_lock() 
     return feed.url 

    logger.debug(
     'Feed %s is already being imported by another worker', feed_url) 
+0

我知道這些解決方案 - 但是我沒有使用它的原因是因爲它沒有做我想要的,只是在進程之間共享實際的連接對象並使用已經打開的連接。我試圖避免每次運行任務時斷開連接並重新連接。 我可以保持連接對象爲全局的,如果我用一個單獨的線程運行worker並重用它。我正在考慮爲這些客戶使用一批單一流程的員工。否則,如果我確實選擇僅在每次發送消息時連接,那麼我將使用Redis進行鎖定。在其他解決方案... –

0

你試圖使用GEVENT或eventlet芹菜工人,而不是過程和線程?在這種情況下,您將能夠使用全局變量或threading.local()來共享連接對象。

+0

我正在使用eventlet鎖定。我可以更努力地找出原因,但是由於阻止IO性質,我嘗試做的事情對於eventlet/gevent的事件循環本質是不合適的。 –