我想與只能接受單個TCP連接(內存約束)的設備進行接口,因此只需爲每個工作線程啓動連接都不是一個選項,因爲它具有正常客戶端 - 服務器情況,如數據庫連接。在Celery任務中使用多處理併發機制
我已經使用了多重經理字典就是線程之間全局訪問嘗試,格式爲:
clients{(address, port): (connection_obj, multiprocessing.Manager.RLock)}
而像這樣一個任務:
from celery import shared_task
from .celery import manager, clients
@shared_task
def send_command(controller, commandname, args):
"""Send a command to the controller."""
# Create client connection if one does not exist.
conn = None
addr, port = controller
if controller not in clients:
conn = Client(addr, port)
conn.connect()
lock = manager.RLock()
clients[controller] = (conn, lock,)
print("New controller connection to %s:%s" % (addr, port,))
else:
conn, lock = clients[controller]
try:
f = getattr(conn, commandname) # See if connection.commandname() exists.
except Exception:
raise Exception("command: %s not known." % (commandname))
with lock:
res = f(*args)
return res
但是任務會出現序列化錯誤,例如:
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed
即使我沒有使用不可序列化的值調用任務,並且任務沒有嘗試返回一個不可序列化的值Celery似乎着迷於試圖序列化這個全局對象?
我缺少什麼?你將如何在Celery任務中使用線程安全的客戶端設備連接並在線程之間訪問?示例代碼?
我不確定這是否適合您的情況,但我只記得閱讀'multiprocessing.reduction',它應該允許在進程之間共享套接字連接。 [見這篇博文的例子](http://foobarnbaz.com/2011/08/30/developing-scalable-services-with-python/)。 – antikantian
客戶端不使用原始套接字,它是具有協議的Twisted連接對象。使用原始套接字或重新構建fd中的Twisted連接對象並不重要。 –
我最終研究瞭如何在現有套接字周圍包裝一個Twisted協議,但是在我的情況下它不起作用,因爲Celery消費者作爲*工作主進程的獨立子進程*無法訪問需要的文件描述符(存儲在Redis中),並建立unix管道的共享FD的糾紛是太多hackery。 與我的情況有關的問題是設備是內存受限,並且不能只是有多個連接...所以我決定只有一個工作人員與一個消費者和一個設備。不太好! –