2016-06-16 80 views
3

對於服務器自動化,我們正在嘗試開發一種工具,該工具可以處理和執行不同服務器上的大量任務。我們將任務和服務器主機名發送到隊列中。隊列然後從請求者消耗,這會將信息提供給可靠的api。爲了實現這一點,我們可以一次執行多個任務,我們使用線程。Pika:即使最後一條消息未得到確認,也可以使用下一條消息

現在,我們就完蛋了該消息的確認......

我們迄今所做的:
requester.py消耗隊列,然後啓動一個線程,其中ansible任務運行。結果然後發送到另一個隊列。所以每條新消息都會創建一個新線程。任務完成,線程死亡。
但現在來了困難的部分。我們必須使這些信息持續存在,以防我們的服務器死亡。所以每一條消息都應該得到確認之後從負責人的結果發回。

現在我們的問題是,當我們嘗試確認線程本身中的消息時,沒有更多的「同時」工作完成,因爲pika的consume等待確認。那麼我們如何實現,即consume是否會消耗消息而不等待確認?或者我們如何解決或改進我們的小程序?

requester.py

#!/bin/python 

    from worker import * 
    import ansible.inventory 
    import ansible.runner 
    import threading 

    class Requester(Worker): 
     def __init__(self): 
      Worker.__init__(self) 
      self.connection(self.selfhost, self.from_db) 
      self.receive(self.from_db) 

     def send(self, result, ch, method): 
      self.channel.basic_publish(exchange='', 
            routing_key=self.to_db, 
            body=result, 
            properties=pika.BasicProperties(
                delivery_mode=2, 
            )) 

      print "[x] Sent \n" + result 
      ch.basic_ack(delivery_tag = method.delivery_tag) 

     def callAnsible(self, cmd, ch, method): 
      #call ansible api pre 2.0 

      result = json.dumps(result, sort_keys=True, indent=4, separators=(',', ': ')) 
      self.send(result, ch, method) 

     def callback(self, ch, method, properties, body): 
      print(" [x] Received by requester %r" % body) 
      t = threading.Thread(target=self.callAnsible, args=(body,ch,method,)) 
      t.start() 

worker.py

import pika 
    import ConfigParser 
    import json 
    import os 

    class Worker(object): 
     def __init__(self): 
      #read some config files 

     def callback(self, ch, method, properties, body): 
      raise Exception("Call method in subclass") 

     def receive(self, queue): 
      self.channel.basic_qos(prefetch_count=1) 
      self.channel.basic_consume(self.callback,queue=queue) 
      self.channel.start_consuming() 

     def connection(self,server,queue): 
      self.connection = pika.BlockingConnection(pika.ConnectionParameters(
       host=server, 
       credentials=self.credentials)) 
      self.channel = self.connection.channel() 
      self.channel.queue_declare(queue=queue, durable=True) 

我們正在與Python 2.7和鼠兔0.10.0工作。

是的,我們注意到在pika常見問題解答中:http://pika.readthedocs.io/en/0.10.0/faq.html
pika不是線程安全的。

回答

3

禁用自動確認並將預取計數設置爲大於1的值,具體取決於您希望消費者使用多少條消息。如何設置預取 channel.basic_qos(prefetch_count=1),找到here

+0

太棒了!謝謝!我怎樣才能解鎖這個預取計數。這做到了所有的魔法。 – Rumpli

+0

@Rumpli我已經添加了這個答案。現在我要在這裏自己受到傷害,但是因爲你是新來者,我會在短時間內解釋提升和接受的答案:如果答案對你有幫助,可以投票贊成。如果它解決了您的問題,請立即投訴並接受。在這裏,你只接受沒有upvote,但你沒有嘗試它,如果它的工作:)也許只是upvote現在,並且一旦你確認接受。有人請糾正我,如果我沒有解釋這個投票/接受正確。 – cantSleepNow

+0

感謝您的解釋。我嘗試了這一點,並將'channel.basic_qos(prefetch_count = 1)'設置爲更多然後'1',它在當時做了多個任務。而我試圖upvote你的答案,但只要我沒有15聲望,它不會顯示它...... :( – Rumpli

相關問題