
ansible+celery doesn't work #3859

Closed · chunxiao365 opened this issue Feb 22, 2017 · 3 comments

@chunxiao365

Checklist

  • I have included the output of celery -A proj report in the issue:

    software -> celery:4.0.2 (latentcall) kombu:4.0.2 py:3.6.0
                billiard:3.5.0.2 py-amqp:2.1.4
    platform -> system:Darwin arch:64bit imp:CPython
    loader   -> celery.loaders.app.AppLoader
    settings -> transport:amqp results:django-db

  • I have verified that the issue exists against the master branch of Celery.

Steps to reproduce

1. deploy_tomcat2.py

```python
from .AnsibleApi import CallApi


def django_process(jira_num):
    server = '10.10.51.30'
    name = 'abc'
    port = 11011
    code = 'efs'
    jdk = '1.12.13'
    jvm = 'xxxx'

    if str.isdigit(jira_num):
        # import pdb
        # pdb.set_trace()
        call = CallApi(server, name, port, code, jdk, jvm)
        return call.run_task()
```

2. AnsibleApi.py

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
from .Logger import Logger
from django.conf import settings
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase

Log = Logger('/tmp/auto_deploy_tomcat.log', logging.INFO)


class ResultCallback(CallbackBase):
    def __init__(self, *args, **kwargs):
        super(ResultCallback, self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        self.host_unreachable[result._host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        self.host_ok[result._host.get_name()] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        self.host_failed[result._host.get_name()] = result


class CallApi(object):
    user = settings.SSH_USER
    ssh_private_key_file = settings.SSH_PRIVATE_KEY_FILE
    results_callback = ResultCallback()
    Options = namedtuple('Options',
                         ['connection', 'module_path', 'private_key_file', 'forks',
                          'become', 'become_method', 'become_user', 'check'])

    def __init__(self, ip, name, port, code, jdk, jvm):
        self.ip = ip
        self.name = name
        self.port = port
        self.code = code
        self.jdk = jdk
        self.jvm = jvm
        self.results_callback = ResultCallback()
        self.results_raw = {}

    def _gen_user_task(self):
        tasks = []
        deploy_script = 'autodeploy/abc.sh'
        dst_script = '/tmp/abc.sh'
        cargs = dict(src=deploy_script, dest=dst_script, owner=self.user, group=self.user, mode='0755')
        args = "%s %s %d %s %s '%s'" % (dst_script, self.name, self.port, self.code, self.jdk, self.jvm)
        tasks.append(dict(action=dict(module='copy', args=cargs), register='shell_out'))
        tasks.append(dict(action=dict(module='debug', args=dict(msg='{{shell_out}}'))))
        # tasks.append(dict(action=dict(module='command', args=args)))
        # tasks.append(dict(action=dict(module='command', args=args), register='result'))
        # tasks.append(dict(action=dict(module='debug', args=dict(msg='{{result.stdout}}'))))
        self.tasks = tasks

    def _set_option(self):
        self._gen_user_task()

        self.variable_manager = VariableManager()
        self.loader = DataLoader()
        self.options = self.Options(connection='smart', module_path=None,
                                    private_key_file=self.ssh_private_key_file, forks=None,
                                    become=True, become_method='sudo', become_user='root', check=False)
        self.inventory = Inventory(loader=self.loader, variable_manager=self.variable_manager,
                                   host_list=[self.ip])
        self.variable_manager.set_inventory(self.inventory)

        play_source = dict(
            name="auto deploy tomcat",
            hosts=self.ip,
            remote_user=self.user,
            gather_facts='no',
            tasks=self.tasks
        )
        self.play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader)

    def run_task(self):
        self.results_raw = {'success': {}, 'failed': {}, 'unreachable': {}}
        tqm = None
        from celery.contrib import rdb
        rdb.set_trace()  # remote debugger breakpoint left in for troubleshooting
        self._set_option()
        try:
            tqm = TaskQueueManager(
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                options=self.options,
                passwords=None,
                stdout_callback=self.results_callback,
            )
            result = tqm.run(self.play)
        finally:
            if tqm is not None:
                tqm.cleanup()

        for host, result in self.results_callback.host_ok.items():
            self.results_raw['success'][host] = result._result

        for host, result in self.results_callback.host_failed.items():
            self.results_raw['failed'][host] = result._result

        for host, result in self.results_callback.host_unreachable.items():
            self.results_raw['unreachable'][host] = result._result

        Log.info("result is :%s" % self.results_raw)
        return self.results_raw
```

3. tasks.py

```python
from __future__ import absolute_import, unicode_literals

from celery import shared_task

from .deploy_tomcat2 import django_process
# from celery.contrib import rdb


@shared_task
def deploy(num):
    # return 'hello world {0}'.format(num)
    # rdb.set_trace()
    return django_process(num)
```

Expected behavior

In the Django console (python manage.py shell) I can call deploy('123'), and the code calls AnsibleApi and copies abc.sh to the destination server successfully.
But it does not work when I use deploy.apply_async(args=['1334'], queue='queue.ops.deploy', routing_key='ops.deploy').

I have been searching for a few days and still have not worked it out.
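
To make the two code paths above concrete, here is a minimal sketch; the import path for tasks.py is an assumption, since the project layout is not shown in this issue:

```python
# Hypothetical import path -- adjust to wherever tasks.py lives in the project.
from proj.tasks import deploy

# Works: django_process() and the Ansible run execute synchronously
# in the current Django shell process.
print(deploy('123'))

# Reproduces the problem: the task body runs inside a prefork worker
# child process instead, and comes back with empty results.
deploy.apply_async(args=['1334'],
                   queue='queue.ops.deploy',
                   routing_key='ops.deploy')
```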

Actual behavior

AnsibleApi returns an empty dict: {'success': {}, 'failed': {}, 'unreachable': {}}

@chunxiao365 (Author)

There are two ways to solve this problem; both disable the assert:

1. Where celery starts, set export PYTHONOPTIMIZE=1, or start celery with the parameter -O OPTIMIZATION (see the sketch below).
2. Disable the assert in the Python multiprocessing package, in process.py line 102:
   assert not _current_process._config.get('daemon'), 'daemonic processes are not allowed to have children'
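
A minimal sketch of option 1, which is the approach reported later in this thread to work; the app name proj and the queue name are taken from this issue, and the -Q flag is only an assumption that the worker should also consume the queue the task is routed to:

```bash
# PYTHONOPTIMIZE=1 makes Python drop assert statements, so multiprocessing's
# "daemonic processes are not allowed to have children" check no longer fires
# when Ansible forks inside the prefork worker child.
export PYTHONOPTIMIZE=1
celery -A proj worker -l info -Q queue.ops.deploy

# or, equivalently, for a single run only:
PYTHONOPTIMIZE=1 celery -A proj worker -l info -Q queue.ops.deploy
```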

@auvipy closed this as completed Dec 19, 2017
@xuhuigithub

> There are two ways to solve this problem; both disable the assert:
> 1. Where celery starts, set export PYTHONOPTIMIZE=1, or start celery with the parameter -O OPTIMIZATION.
> 2. Disable the assert in the Python multiprocessing package, in process.py line 102:
>    assert not _current_process._config.get('daemon'), 'daemonic processes are not allowed to have children'

@Xuexiang825 Sorry, it doesn't work. I am using Python 3.6.3, ansible 2.4.2.0, and celery 4.1.0. My playbook executor displays the message but does nothing. Also, can you tell me more about the -O OPTIMIZATION parameter?

@xuhuigithub

@Xuexiang825 I just read your answer in this thread and saw that you are also Chinese, so I'll use Chinese. Setting the export variable works for me! But the -O parameter does not seem to have any effect; celery worker -A celery_worker.celery --loglevel=info -O OPTIMIZATION is my start command. Nothing looks wrong with it, right?

@celery locked this issue as off-topic and limited conversation to collaborators Apr 6, 2018
3 participants