2013-02-22 16 views
2

我正在使用rabbitmq的最新pika庫(0.9.9+)。我對rabbitmq和pika的用法如下:Amazon ec2中pika-rabbitmq的心跳間隔很好

  1. 作爲工人,我有很長時間的運行任務(約5分鐘)。這些任務從rabbitmq收到請求。請求很少發生,即請求之間有很長的空閒時間。
  2. 我以前面臨的問題與空閒連接(由於空閒連接引起的連接關閉)有關。所以,我已經啓用了pika的心跳。
  3. 現在選擇心跳是一個問題。 Pika似乎是一個單線程庫,心跳接收和確認恰好在請求時間段之間完成。
  4. 因此,如果心跳間隔小於回調函數用於執行其長時間運算的時間,服務器將不會收到任何心跳確認並關閉連接。
  5. 因此,我假設最小心跳間隔應該是阻塞連接中回調函數的最大計算時間。

什麼可以很好的心跳值Amazon EC2上,以防止它關閉空閒連接?

此外,一些建議使用rabbitmq keepalive(或libkeepalive)來維護tcp連接。我認爲在tcp層管理心跳會好得多,因爲應用程序不需要管理它們。這是真的嗎?與RMQ心跳相比,Keepalive是一種好方法嗎?

我已經看到一些建議使用多個線程和隊列進行長時間運行的任務。但是這是長期運行任務的唯一選擇嗎?在這種情況下必須使用另一個隊列是非常令人失望的。

預先感謝您。我想我已經詳細解釋了這個問題。讓我知道我是否可以提供更多細節。

回答

3

如果你不依賴於使用鼠兔,這thread幫助我達到你想要做的海帶用什麼:

#!/usr/bin/env python 
import time, logging, weakref, eventlet 
from kombu import Connection, Exchange, Queue 
from kombu.utils.debug import setup_logging 
from kombu.common import eventloop 
from eventlet import spawn_after 

eventlet.monkey_patch() 

log_format = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) ' 
       '-35s %(lineno) -5d: %(message)s') 
logging.basicConfig(level=logging.INFO, format=log_format) 
logger = logging.getLogger('job_worker') 
logger.setLevel(logging.INFO) 


def long_running_function(body): 
    time.sleep(300) 

def job_worker(body, message): 
    long_running_function(body) 
    message.ack() 

def monitor_heartbeats(connection, rate=2): 
    """Function to send heartbeat checks to RabbitMQ. This keeps the 
     connection alive over long-running processes.""" 
    if not connection.heartbeat: 
     logger.info("No heartbeat set for connection: %s" % connection.heartbeat) 
     return 
    interval = connection.heartbeat 
    cref = weakref.ref(connection) 
    logger.info("Starting heartbeat monitor.") 

    def heartbeat_check(): 
     conn = cref() 
     if conn is not None and conn.connected: 
      conn.heartbeat_check(rate=rate) 
      logger.info("Ran heartbeat check.") 
      spawn_after(interval, heartbeat_check) 
    return spawn_after(interval, heartbeat_check) 

def main(): 
    setup_logging(loglevel='INFO') 

    # process for heartbeat monitor 
    p = None 

    try: 
     with Connection('amqp://guest:[email protected]:5672//', heartbeat=300) as conn: 
      conn.ensure_connection() 
      monitor_heartbeats(conn) 
      queue = Queue('job_queue', 
          Exchange('job_queue', type='direct'), 
          routing_key='job_queue') 
      logger.info("Starting worker.") 
      with conn.Consumer(queue, callbacks=[job_worker]) as consumer: 
       consumer.qos(prefetch_count=1) 
       for _ in eventloop(conn, timeout=1, ignore_timeouts=True): 
        pass 
    except KeyboardInterrupt: 
     logger.info("Worker was shut down.") 

if __name__ == "__main__": 
    main() 

我剝奪了我的域特定的代碼,但本質上,這是我的框架使用。