2016-06-07 52 views
2

我正在使用Celery調度一些長期任務的Django項目。 Django和Celery都運行在完全獨立的進程中,並需要一種協調對數據庫訪問的方式。我想使用Python的multiprocessing.RLock類(或等價物),因爲我需要該鎖是可重入的。在多個獨立進程中使用Python RLocks

我的問題是,我如何提供訪問單獨進程的RLock?

我發現的兩個最佳解決方案(posix_ipc modulefcntl)僅限於基於Unix的系統,我們希望避免限制自己。

是否有跨平臺的方式來共享流程之間的鎖而不需要共同的祖先進程?

+1

這不是一個直接的答案,但除非你需要*硬*具有較強的順序性鎖定,你可能想看看在諸如[0MQ](http://zeromq.org/)的消息傳遞系統中。一個偉大的消息系統,幾乎可以運行任何東西,並且可以綁定任何語言。 –

+0

對於* 0MQ *爲+1,以便在用各種語言編寫的進程之間進行通信並且延遲很大。我並不習慣Celery和它可能已經涉及(或限制)的東西,但也許你也可以考慮使用['redis'](http://redis.io/topics/distlock),已經有一些python綁定圍繞這種功能(https://pypi.python.org/pypi/python-redis-lock,https://github.com/glasslion/redlock,https://github.com/SPSCommerce/redlock-py,等等) – mgc

+0

你確實意識到這個要求「沒有共同的祖先過程」意味着你不能使用'multiprocessing',對吧? – Louis

回答

0

我結束了使用RabbitMQ作爲創建分佈式鎖的方法。關於如何做到這一點的細節可以在RabbitMQ的博客上找到:https://www.rabbitmq.com/blog/2014/02/19/distributed-semaphores-with-rabbitmq/

簡而言之,您將爲該鎖創建一個RabbitMQ隊列並向其發送一條消息。要獲取鎖定,請在隊列上運行basic_get(非阻塞)或basic_consume(阻塞)。這會從隊列中移除消息,從而阻止其他線程獲取鎖定。一旦你的工作完成,發送一個否定的確認將導致RabbitMQ重新發送消息,允許下一個線程繼續。

不幸的是,這不允許重入鎖。

上面引用的鏈接爲Java代碼提供瞭如何去做這件事。搞清楚如何將它翻譯成Python/Pika令人討厭,我認爲我應該在這裏發佈一些示例代碼。

爲了生產鎖:

import pika 

with pika.BlockingConnection(pika.ConnectionParameters('localhost')) as connection: 
    channel = connection.channel() 
    channel.queue_declare(queue="LockQueue") 
    channel.basic_publish(exchange='', routing_key='LockQueue', body='Lock') 
    channel.close() 

獲取鎖:

import pika 
import time 

def callback(ch, method, properties, body): 
    print("Got lock") 

    for i in range(5, 0, -1): 
     print("Tick {}".format(i)) 
     time.sleep(1) 

    print("Releasing lock") 
    ch.basic_nack(delivery_tag=method.delivery_tag) 
    ch.close() # Close the channel to continue on with normal processing. Without this, `callback` will continue to request the lock. 

with pika.BlockingConnection(pika.ConnectionParameters('localhost')) as connection: 
    channel = connection.channel() 

    channel.queue_declare(queue='LockQueue') 
    channel.basic_qos(prefetch_count=1) 
    channel.basic_consume(callback, queue='LockQueue') 

    print("Waiting for lock") 
    channel.start_consuming() 
    print("Task completed")