2017-02-27 67 views
0

在 Python 中通過 Celery 調用 Ansible API 時返回 None,我已經搜索了好幾天。不經過 Celery、直接調用 deploy 函數時一切正常,但通過 Celery 執行時,我的代碼調用 Ansible API 卻返回 None。(django + celery + ansibleApi 返回 None)

重現步驟。

1.tasks.py

from celery import shared_task 
from .deploy_tomcat2 import django_process 


@shared_task
def deploy(jira_num):
    """Celery task that forwards ``jira_num`` to ``django_process``.

    The argument is passed through unchanged and whatever
    ``django_process`` returns is returned as-is.
    """
    outcome = django_process(jira_num)
    return outcome

2.deploy_tomcat2.py

from .AnsibleApi import CallApi 

def django_process(jira_num):
    """Run the deployment through ``CallApi`` when ``jira_num`` is numeric.

    Args:
        jira_num: Ticket number, either a string of digits (e.g. ``'150'``)
            or a non-negative ``int``.

    Returns:
        Whatever ``CallApi.run_task()`` returns when ``jira_num`` consists
        solely of digits; ``None`` for any other value.
    """
    # Convert with str() first: the previous unbound ``str.isdigit(jira_num)``
    # raised TypeError for every non-str argument instead of rejecting it.
    if not str(jira_num).isdigit():
        return None

    # NOTE(review): the deployment target and its parameters are hard-coded.
    server = '10.10.10.30'
    name = 'abc'
    port = 11011
    code = 'efs'
    jdk = '1.12.13'
    jvm = 'xxxx'

    call = CallApi(server, name, port, code, jdk, jvm)
    return call.run_task()

3.AnsibleApi.py

#!/usr/bin/env python 

import logging 
from .Logger import Logger 
from django.conf import settings 
from collections import namedtuple 
from ansible.parsing.dataloader import DataLoader 
from ansible.vars import VariableManager 
from ansible.inventory import Inventory 
from ansible.playbook.play import Play 
from ansible.executor.task_queue_manager import TaskQueueManager 
from ansible.plugins.callback import CallbackBase 

# Module-level logger used by CallApi.run_task to record the collected results.
# Presumably the arguments are the log file path and the minimum level --
# TODO confirm against the project-local Logger class.
Log = Logger('/tmp/auto_deploy_tomcat.log',logging.INFO)


class ResultCallback(CallbackBase):
    """Callback that keeps the most recent result per host, split by outcome.

    Each ``v2_runner_on_*`` hook stores the result object it receives in the
    matching dict, keyed by ``result._host.get_name()``; a later result for
    the same host in the same category overwrites the earlier one.
    """

    def __init__(self, *args, **kwargs):
        super(ResultCallback, self).__init__(*args, **kwargs)
        # host name -> result object, one mapping per outcome category
        self.host_ok, self.host_unreachable, self.host_failed = {}, {}, {}

    def v2_runner_on_unreachable(self, result):
        host_name = result._host.get_name()
        self.host_unreachable[host_name] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        host_name = result._host.get_name()
        self.host_ok[host_name] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        host_name = result._host.get_name()
        self.host_failed[host_name] = result


class CallApi(object):
    """Build and run a single-host Ansible play that copies the deploy script.

    The play copies ``autodeploy/tomcat_deploy.sh`` to
    ``/tmp/tomcat_deploy.sh`` on the target host and prints the registered
    copy result with a ``debug`` task. Per-host outcomes are collected by a
    ``ResultCallback`` and returned by :meth:`run_task`.

    NOTE(review): TaskQueueManager presumably spawns worker processes. When
    this runs inside a daemonic pool worker, multiprocessing may refuse with
    'daemonic processes are not allowed to have children', which would leave
    every result dict empty -- confirm how the Celery worker pool is set up.
    """

    user = settings.SSH_USER
    ssh_private_key_file = settings.SSH_PRIVATE_KEY_FILE
    # Class-level default only; __init__ gives every instance its own
    # callback, so collected results are not shared between instances.
    results_callback = ResultCallback()
    Options = namedtuple(
        'Options',
        ['connection', 'module_path', 'private_key_file', 'forks', 'become',
         'become_method', 'become_user', 'check'])

    def __init__(self, ip, name, port, code, jdk, jvm):
        """Store the target host and the deployment parameters.

        Args:
            ip: Host the play runs against (used as inventory and ``hosts``).
            name, port, code, jdk, jvm: Deployment parameters; stored on the
                instance but not consumed by the tasks currently generated.
        """
        self.ip = ip
        self.name = name
        self.port = port
        self.code = code
        self.jdk = jdk
        self.jvm = jvm
        self.results_callback = ResultCallback()
        self.results_raw = {}

    def _gen_user_task(self):
        """Populate ``self.tasks`` with the copy task and the debug task."""
        deploy_script = 'autodeploy/tomcat_deploy.sh'
        dst_script = '/tmp/tomcat_deploy.sh'
        copy_args = dict(src=deploy_script, dest=dst_script, owner=self.user,
                         group=self.user, mode='0755')
        # NOTE: executing the copied script with the deployment parameters is
        # not wired in yet; only the copy and its debug output are scheduled.
        self.tasks = [
            dict(action=dict(module='copy', args=copy_args), register='shell_out'),
            dict(action=dict(module='debug', args=dict(msg='{{shell_out}}'))),
        ]

    def _set_option(self):
        """Create the loader, inventory, options and play used by run_task."""
        self._gen_user_task()

        self.variable_manager = VariableManager()
        self.loader = DataLoader()
        self.options = self.Options(
            connection='smart',
            module_path=None,
            private_key_file=self.ssh_private_key_file,
            forks=None,
            become=True,
            become_method='sudo',
            become_user='root',
            check=False,
        )
        self.inventory = Inventory(
            loader=self.loader,
            variable_manager=self.variable_manager,
            host_list=[self.ip],
        )
        self.variable_manager.set_inventory(self.inventory)

        play_source = dict(
            name="auto deploy tomcat",
            hosts=self.ip,
            remote_user=self.user,
            gather_facts='no',
            tasks=self.tasks,
        )
        self.play = Play().load(
            play_source,
            variable_manager=self.variable_manager,
            loader=self.loader,
        )

    def run_task(self):
        """Run the play and return the per-host results grouped by outcome.

        Returns:
            dict: ``{'success': {...}, 'failed': {...}, 'unreachable': {...}}``
            where each inner dict maps a host name to the ``_result`` of the
            result object the callback stored for that host.

        Any exception raised while building or running the task queue manager
        propagates to the caller after ``cleanup()`` has been attempted.
        """
        # Reset first so a failing run never exposes results of an earlier one.
        self.results_raw = {'success': {}, 'failed': {}, 'unreachable': {}}
        self._set_option()

        tqm = None
        try:
            tqm = TaskQueueManager(
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                options=self.options,
                passwords=None,
                stdout_callback=self.results_callback,
            )
            tqm.run(self.play)
        finally:
            # tqm stays None when the constructor itself raised.
            if tqm is not None:
                tqm.cleanup()

        collected = self.results_callback
        self.results_raw['success'] = {
            host: res._result for host, res in collected.host_ok.items()}
        self.results_raw['failed'] = {
            host: res._result for host, res in collected.host_failed.items()}
        self.results_raw['unreachable'] = {
            host: res._result for host, res in collected.host_unreachable.items()}

        Log.info("result is :%s" % self.results_raw)
        return self.results_raw

4.啓動 Celery worker

celery -A jira worker -Q queue.ops.deploy -n "deploy.%h" -l info 

5.produce msg:

deploy.apply_async(args=['150'], queue='queue.ops.deploy', routing_key='ops.deploy') 

回答

0

代碼看起來沒問題。
唯一的疑問是:deploy 任務真的返回了 None 嗎?
如果你能貼出 Celery worker 的日誌會更好。

+0

`[2017-02-27 16:46:08,554: INFO/MainProcess] Received task: autodeploy.tasks.deploy[a963d1f3-cc1b-48da-9701-28297f7b68a5]` `[2017-02-27 16:46:08,786: INFO/PoolWorker-2] result is :{'success': {}, 'failed': {}, 'unreachable': {}}` `[2017-02-27 16:46:08,808: INFO/PoolWorker-2] Task autodeploy.tasks.deploy[a963d1f3-cc1b-48da-9701-28297f7b68a5] succeeded in 0.18285173299955204s: {'success': {}, 'failed': {}, 'unreachable': {}}` –

+0

看來你找到了問題:'任務不允許啓動子進程'。不過不太建議取消這個限制,你可以參考一下[這個討論](https://github.com/celery/celery/issues/1709),好像需要自己控制(1. 換成線程;2. daemon = False;其他沒仔細看 :))。'TaskQueueManager' 我沒用過,沒法幫你... – Cheney

+0

這個技能還沒有get,晚點再研究一下。 –

0

有兩種方法可以解決這個問題,都是禁用該斷言: 1. 在啓動 Celery 的環境中設置 export PYTHONOPTIMIZE=1,或者使用 -O(優化)參數啓動; 2. 註釋掉 Python multiprocessing 包中 process.py 第 102 行的斷言,然後再啓動 Celery:

assert not _current_process._config.get('daemon'), \ 
       'daemonic processes are not allowed to have children'