Save the command line arguments into a global context

* Once cli args are parsed, they're constant.  So, save the parsed args
  into the global context for everyone else to use them from now on.
* Port cli scripts to use the CLIARGS in the context
* Refactor call to parse cli args into the run() method
* Fix unittests for changes to the internals of CLI arg parsing
* Port callback plugins to use context.CLIARGS
  * Got rid of the private self._options attribute
  * Use context.CLIARGS in the individual callback plugins instead.
  * Also output positional arguments in default and unixy plugins
  * Code has been simplified since we're now dealing with a dict rather
    than Optparse.Value
pull/50537/head
Toshio Kuratomi 6 years ago
parent c18da65089
commit afdbb0d9d5

@ -29,6 +29,7 @@ import shutil
import sys
import traceback
from ansible import context
from ansible.errors import AnsibleError, AnsibleOptionsError, AnsibleParserError
from ansible.module_utils._text import to_text
@ -106,7 +107,6 @@ if __name__ == '__main__':
exit_code = 6
else:
cli = mycli(args)
cli.parse()
exit_code = cli.run()
except AnsibleOptionsError as e:
@ -134,9 +134,9 @@ if __name__ == '__main__':
# Show raw stacktraces in debug mode, It also allow pdb to
# enter post mortem mode.
raise
have_cli_options = cli is not None and cli.options is not None
have_cli_options = bool(context.CLIARGS)
display.error("Unexpected Exception, this is probably a bug: %s" % to_text(e), wrap_text=False)
if not have_cli_options or have_cli_options and cli.options.verbosity > 2:
if not have_cli_options or have_cli_options and context.CLIARGS['verbosity'] > 2:
log_only = False
if hasattr(e, 'orig_exc'):
display.vvv('\nexception type: %s' % to_text(type(e.orig_exc)))

@ -0,0 +1,6 @@
---
minor_changes:
- Refactored the CLI code to parse the CLI arguments and then save them into
a non-mutatable global singleton. This should make it easier to modify.
- Removed the private ``_options`` attribute of ``CallbackBase``. See the porting
guide if you need access to the command line arguments in a callback plugin.

@ -148,6 +148,21 @@ Plugins
* Order of enabled inventory plugins (:ref:`INVENTORY_ENABLED`) has been updated, :ref:`auto <auto_inventory>` is now before :ref:`yaml <yaml_inventory>` and :ref:`ini <ini_inventory>`.
* The private ``_options`` attribute has been removed from the ``CallbackBase`` class of callback
plugins. If you have a third-party callback plugin which needs to access the command line arguments,
use code like the following instead of trying to use ``self._options``:
.. code-block:: python
from ansible import context
[...]
tags = context.CLIARGS['tags']
``context.CLIARGS`` is a read-only dictionary so normal dictionary retrieval methods like
``CLIARGS.get('tags')`` and ``CLIARGS['tags']`` work as expected but you won't be able to modify
the cli arguments at all.
Porting custom scripts
======================

@ -1,20 +1,7 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
# (c) 2016, Toshio Kuratomi <tkuratomi@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Copyright: (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
# Copyright: (c) 2016, Toshio Kuratomi <tkuratomi@ansible.com>
# Copyright: (c) 2018, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
@ -34,6 +21,7 @@ from abc import ABCMeta, abstractmethod
import ansible
from ansible import constants as C
from ansible import context
from ansible.errors import AnsibleOptionsError, AnsibleError
from ansible.inventory.manager import InventoryManager
from ansible.module_utils.six import with_metaclass, string_types
@ -46,6 +34,7 @@ from ansible.utils.vars import load_extra_vars, load_options_vars
from ansible.vars.manager import VariableManager
from ansible.parsing.vault import PromptVaultSecret, get_file_vault_secret
display = Display()
@ -93,6 +82,156 @@ class InvalidOptsParser(SortedOptParser):
pass
def base_parser(usage="", output_opts=False, runas_opts=False, meta_opts=False, runtask_opts=False,
vault_opts=False, module_opts=False, async_opts=False, connect_opts=False,
subset_opts=False, check_opts=False, inventory_opts=False, epilog=None,
fork_opts=False, runas_prompt_opts=False, desc=None, basedir_opts=False,
vault_rekey_opts=False):
"""
Create an options parser for most ansible scripts
"""
# base opts
parser = SortedOptParser(usage, version=CLI.version("%prog"), description=desc, epilog=epilog)
parser.remove_option('--version')
version_help = "show program's version number, config file location, configured module search path," \
" module location, executable location and exit"
parser.add_option('--version', action="version", help=version_help)
parser.add_option('-v', '--verbose', dest='verbosity', default=C.DEFAULT_VERBOSITY, action="count",
help="verbose mode (-vvv for more, -vvvv to enable connection debugging)")
if inventory_opts:
parser.add_option('-i', '--inventory', '--inventory-file', dest='inventory', action="append",
help="specify inventory host path or comma separated host list. --inventory-file is deprecated")
parser.add_option('--list-hosts', dest='listhosts', action='store_true',
help='outputs a list of matching hosts; does not execute anything else')
parser.add_option('-l', '--limit', default=C.DEFAULT_SUBSET, dest='subset',
help='further limit selected hosts to an additional pattern')
if module_opts:
parser.add_option('-M', '--module-path', dest='module_path', default=None,
help="prepend colon-separated path(s) to module library (default=%s)" % C.DEFAULT_MODULE_PATH,
action="callback", callback=CLI.unfrack_paths, type='str')
if runtask_opts:
parser.add_option('-e', '--extra-vars', dest="extra_vars", action="append",
help="set additional variables as key=value or YAML/JSON, if filename prepend with @", default=[])
if fork_opts:
parser.add_option('-f', '--forks', dest='forks', default=C.DEFAULT_FORKS, type='int',
help="specify number of parallel processes to use (default=%s)" % C.DEFAULT_FORKS)
if vault_opts:
parser.add_option('--ask-vault-pass', default=C.DEFAULT_ASK_VAULT_PASS, dest='ask_vault_pass', action='store_true',
help='ask for vault password')
parser.add_option('--vault-password-file', default=[], dest='vault_password_files',
help="vault password file", action="callback", callback=CLI.unfrack_paths, type='string')
parser.add_option('--vault-id', default=[], dest='vault_ids', action='append', type='string',
help='the vault identity to use')
if vault_rekey_opts:
parser.add_option('--new-vault-password-file', default=None, dest='new_vault_password_file',
help="new vault password file for rekey", action="callback", callback=CLI.unfrack_path, type='string')
parser.add_option('--new-vault-id', default=None, dest='new_vault_id', type='string',
help='the new vault identity to use for rekey')
if subset_opts:
parser.add_option('-t', '--tags', dest='tags', default=C.TAGS_RUN, action='append',
help="only run plays and tasks tagged with these values")
parser.add_option('--skip-tags', dest='skip_tags', default=C.TAGS_SKIP, action='append',
help="only run plays and tasks whose tags do not match these values")
if output_opts:
parser.add_option('-o', '--one-line', dest='one_line', action='store_true',
help='condense output')
parser.add_option('-t', '--tree', dest='tree', default=None,
help='log output to this directory')
if connect_opts:
connect_group = optparse.OptionGroup(parser, "Connection Options", "control as whom and how to connect to hosts")
connect_group.add_option('-k', '--ask-pass', default=C.DEFAULT_ASK_PASS, dest='ask_pass', action='store_true',
help='ask for connection password')
connect_group.add_option('--private-key', '--key-file', default=C.DEFAULT_PRIVATE_KEY_FILE, dest='private_key_file',
help='use this file to authenticate the connection', action="callback", callback=CLI.unfrack_path, type='string')
connect_group.add_option('-u', '--user', default=C.DEFAULT_REMOTE_USER, dest='remote_user',
help='connect as this user (default=%s)' % C.DEFAULT_REMOTE_USER)
connect_group.add_option('-c', '--connection', dest='connection', default=C.DEFAULT_TRANSPORT,
help="connection type to use (default=%s)" % C.DEFAULT_TRANSPORT)
connect_group.add_option('-T', '--timeout', default=C.DEFAULT_TIMEOUT, type='int', dest='timeout',
help="override the connection timeout in seconds (default=%s)" % C.DEFAULT_TIMEOUT)
connect_group.add_option('--ssh-common-args', default='', dest='ssh_common_args',
help="specify common arguments to pass to sftp/scp/ssh (e.g. ProxyCommand)")
connect_group.add_option('--sftp-extra-args', default='', dest='sftp_extra_args',
help="specify extra arguments to pass to sftp only (e.g. -f, -l)")
connect_group.add_option('--scp-extra-args', default='', dest='scp_extra_args',
help="specify extra arguments to pass to scp only (e.g. -l)")
connect_group.add_option('--ssh-extra-args', default='', dest='ssh_extra_args',
help="specify extra arguments to pass to ssh only (e.g. -R)")
parser.add_option_group(connect_group)
runas_group = None
rg = optparse.OptionGroup(parser, "Privilege Escalation Options", "control how and which user you become as on target hosts")
if runas_opts:
runas_group = rg
# priv user defaults to root later on to enable detecting when this option was given here
runas_group.add_option("-s", "--sudo", default=C.DEFAULT_SUDO, action="store_true", dest='sudo',
help="run operations with sudo (nopasswd) (deprecated, use become)")
runas_group.add_option('-U', '--sudo-user', dest='sudo_user', default=None,
help='desired sudo user (default=root) (deprecated, use become)')
runas_group.add_option('-S', '--su', default=C.DEFAULT_SU, action='store_true',
help='run operations with su (deprecated, use become)')
runas_group.add_option('-R', '--su-user', default=None,
help='run operations with su as this user (default=%s) (deprecated, use become)' % C.DEFAULT_SU_USER)
# consolidated privilege escalation (become)
runas_group.add_option("-b", "--become", default=C.DEFAULT_BECOME, action="store_true", dest='become',
help="run operations with become (does not imply password prompting)")
runas_group.add_option('--become-method', dest='become_method', default=C.DEFAULT_BECOME_METHOD, type='choice', choices=C.BECOME_METHODS,
help="privilege escalation method to use (default=%s), valid choices: [ %s ]" %
(C.DEFAULT_BECOME_METHOD, ' | '.join(C.BECOME_METHODS)))
runas_group.add_option('--become-user', default=None, dest='become_user', type='string',
help='run operations as this user (default=%s)' % C.DEFAULT_BECOME_USER)
if runas_opts or runas_prompt_opts:
if not runas_group:
runas_group = rg
runas_group.add_option('--ask-sudo-pass', default=C.DEFAULT_ASK_SUDO_PASS, dest='ask_sudo_pass', action='store_true',
help='ask for sudo password (deprecated, use become)')
runas_group.add_option('--ask-su-pass', default=C.DEFAULT_ASK_SU_PASS, dest='ask_su_pass', action='store_true',
help='ask for su password (deprecated, use become)')
runas_group.add_option('-K', '--ask-become-pass', default=False, dest='become_ask_pass', action='store_true',
help='ask for privilege escalation password')
if runas_group:
parser.add_option_group(runas_group)
if async_opts:
parser.add_option('-P', '--poll', default=C.DEFAULT_POLL_INTERVAL, type='int', dest='poll_interval',
help="set the poll interval if using -B (default=%s)" % C.DEFAULT_POLL_INTERVAL)
parser.add_option('-B', '--background', dest='seconds', type='int', default=0,
help='run asynchronously, failing after X seconds (default=N/A)')
if check_opts:
parser.add_option("-C", "--check", default=False, dest='check', action='store_true',
help="don't make any changes; instead, try to predict some of the changes that may occur")
parser.add_option('--syntax-check', dest='syntax', action='store_true',
help="perform a syntax check on the playbook, but do not execute it")
parser.add_option("-D", "--diff", default=C.DIFF_ALWAYS, dest='diff', action='store_true',
help="when changing (small) files and templates, show the differences in those files; works great with --check")
if meta_opts:
parser.add_option('--force-handlers', default=C.DEFAULT_FORCE_HANDLERS, dest='force_handlers', action='store_true',
help="run handlers even if a task fails")
parser.add_option('--flush-cache', dest='flush_cache', action='store_true',
help="clear the fact cache for every host in inventory")
if basedir_opts:
parser.add_option('--playbook-dir', default=None, dest='basedir', action='store',
help="Since this tool does not use playbooks, use this as a subsitute playbook directory."
"This sets the relative path for many features including roles/ group_vars/ etc.")
return parser
class CLI(with_metaclass(ABCMeta, object)):
''' code behind bin/ansible* programs '''
@ -117,7 +256,6 @@ class CLI(with_metaclass(ABCMeta, object)):
"""
self.args = args
self.options = None
self.parser = None
self.action = None
self.callback = callback
@ -158,6 +296,7 @@ class CLI(with_metaclass(ABCMeta, object)):
Subclasses must implement this method. It does the actual work of
running an Ansible command.
"""
self.parse()
display.vv(to_text(self.parser.get_version()))
@ -308,15 +447,15 @@ class CLI(with_metaclass(ABCMeta, object)):
def ask_passwords(self):
''' prompt for connection and become passwords if needed '''
op = self.options
op = context.CLIARGS
sshpass = None
becomepass = None
become_prompt = ''
become_prompt_method = "BECOME" if C.AGNOSTIC_BECOME_PROMPT else op.become_method.upper()
become_prompt_method = "BECOME" if C.AGNOSTIC_BECOME_PROMPT else op['become_method'].upper()
try:
if op.ask_pass:
if op['ask_pass']:
sshpass = getpass.getpass(prompt="SSH password: ")
become_prompt = "%s password[defaults to SSH password]: " % become_prompt_method
if sshpass:
@ -324,9 +463,9 @@ class CLI(with_metaclass(ABCMeta, object)):
else:
become_prompt = "%s password: " % become_prompt_method
if op.become_ask_pass:
if op['become_ask_pass']:
becomepass = getpass.getpass(prompt=become_prompt)
if op.ask_pass and becomepass == '':
if op['ask_pass'] and becomepass == '':
becomepass = sshpass
if becomepass:
becomepass = to_bytes(becomepass)
@ -335,43 +474,46 @@ class CLI(with_metaclass(ABCMeta, object)):
return (sshpass, becomepass)
def normalize_become_options(self):
''' this keeps backwards compatibility with sudo/su self.options '''
self.options.become_ask_pass = self.options.become_ask_pass or self.options.ask_sudo_pass or self.options.ask_su_pass or C.DEFAULT_BECOME_ASK_PASS
self.options.become_user = self.options.become_user or self.options.sudo_user or self.options.su_user or C.DEFAULT_BECOME_USER
@staticmethod
def normalize_become_options(options):
''' this keeps backwards compatibility with sudo/su command line options '''
if not options.become_ask_pass:
options.become_ask_pass = options.ask_sudo_pass or options.ask_su_pass or C.DEFAULT_BECOME_ASK_PASS
if not options.become_user:
options.become_user = options.sudo_user or options.su_user or C.DEFAULT_BECOME_USER
def _dep(which):
display.deprecated('The %s command line option has been deprecated in favor of the "become" command line arguments' % which, '2.9')
if self.options.become:
if options.become:
pass
elif self.options.sudo:
self.options.become = True
self.options.become_method = 'sudo'
elif options.sudo:
options.become = True
options.become_method = 'sudo'
_dep('sudo')
elif self.options.su:
self.options.become = True
self.options.become_method = 'su'
elif options.su:
options.become = True
options.become_method = 'su'
_dep('su')
# other deprecations:
if self.options.ask_sudo_pass or self.options.sudo_user:
if options.ask_sudo_pass or options.sudo_user:
_dep('sudo')
if self.options.ask_su_pass or self.options.su_user:
if options.ask_su_pass or options.su_user:
_dep('su')
def validate_conflicts(self, vault_opts=False, runas_opts=False, fork_opts=False, vault_rekey_opts=False):
''' check for conflicting options '''
return options
op = self.options
def validate_conflicts(self, op, vault_opts=False, runas_opts=False, fork_opts=False, vault_rekey_opts=False):
''' check for conflicting options '''
if vault_opts:
# Check for vault related conflicts
if (op.ask_vault_pass and op.vault_password_files):
if op.ask_vault_pass and op.vault_password_files:
self.parser.error("--ask-vault-pass and --vault-password-file are mutually exclusive")
if vault_rekey_opts:
if (op.new_vault_id and op.new_vault_password_file):
if op.new_vault_id and op.new_vault_password_file:
self.parser.error("--new-vault-password-file and --new-vault-id are mutually exclusive")
if runas_opts:
@ -380,13 +522,17 @@ class CLI(with_metaclass(ABCMeta, object)):
(op.su or op.su_user) and (op.become or op.become_user) or
(op.sudo or op.sudo_user) and (op.become or op.become_user)):
self.parser.error("Sudo arguments ('--sudo', '--sudo-user', and '--ask-sudo-pass') and su arguments ('--su', '--su-user', and '--ask-su-pass') "
"and become arguments ('--become', '--become-user', and '--ask-become-pass') are exclusive of each other")
self.parser.error("Sudo arguments ('--sudo', '--sudo-user', and '--ask-sudo-pass')"
" and su arguments ('--su', '--su-user', and '--ask-su-pass')"
" and become arguments ('--become', '--become-user', and"
" '--ask-become-pass') are exclusive of each other")
if fork_opts:
if op.forks < 1:
self.parser.error("The number of processes (--forks) must be >= 1")
return op
@staticmethod
def unfrack_paths(option, opt, value, parser):
paths = getattr(parser.values, option.dest)
@ -409,208 +555,104 @@ class CLI(with_metaclass(ABCMeta, object)):
else:
setattr(parser.values, option.dest, value)
@staticmethod
def base_parser(usage="", output_opts=False, runas_opts=False, meta_opts=False, runtask_opts=False, vault_opts=False, module_opts=False,
async_opts=False, connect_opts=False, subset_opts=False, check_opts=False, inventory_opts=False, epilog=None, fork_opts=False,
runas_prompt_opts=False, desc=None, basedir_opts=False, vault_rekey_opts=False):
''' create an options parser for most ansible scripts '''
# base opts
parser = SortedOptParser(usage, version=CLI.version("%prog"), description=desc, epilog=epilog)
parser.remove_option('--version')
version_help = "show program's version number, config file location, configured module search path," \
" module location, executable location and exit"
parser.add_option('--version', action="version", help=version_help)
parser.add_option('-v', '--verbose', dest='verbosity', default=C.DEFAULT_VERBOSITY, action="count",
help="verbose mode (-vvv for more, -vvvv to enable connection debugging)")
if inventory_opts:
parser.add_option('-i', '--inventory', '--inventory-file', dest='inventory', action="append",
help="specify inventory host path or comma separated host list. --inventory-file is deprecated")
parser.add_option('--list-hosts', dest='listhosts', action='store_true',
help='outputs a list of matching hosts; does not execute anything else')
parser.add_option('-l', '--limit', default=C.DEFAULT_SUBSET, dest='subset',
help='further limit selected hosts to an additional pattern')
if module_opts:
parser.add_option('-M', '--module-path', dest='module_path', default=None,
help="prepend colon-separated path(s) to module library (default=%s)" % C.DEFAULT_MODULE_PATH,
action="callback", callback=CLI.unfrack_paths, type='str')
if runtask_opts:
parser.add_option('-e', '--extra-vars', dest="extra_vars", action="append",
help="set additional variables as key=value or YAML/JSON, if filename prepend with @", default=[])
if fork_opts:
parser.add_option('-f', '--forks', dest='forks', default=C.DEFAULT_FORKS, type='int',
help="specify number of parallel processes to use (default=%s)" % C.DEFAULT_FORKS)
if vault_opts:
parser.add_option('--ask-vault-pass', default=C.DEFAULT_ASK_VAULT_PASS, dest='ask_vault_pass', action='store_true',
help='ask for vault password')
parser.add_option('--vault-password-file', default=[], dest='vault_password_files',
help="vault password file", action="callback", callback=CLI.unfrack_paths, type='string')
parser.add_option('--vault-id', default=[], dest='vault_ids', action='append', type='string',
help='the vault identity to use')
if vault_rekey_opts:
parser.add_option('--new-vault-password-file', default=None, dest='new_vault_password_file',
help="new vault password file for rekey", action="callback", callback=CLI.unfrack_path, type='string')
parser.add_option('--new-vault-id', default=None, dest='new_vault_id', type='string',
help='the new vault identity to use for rekey')
if subset_opts:
parser.add_option('-t', '--tags', dest='tags', default=C.TAGS_RUN, action='append',
help="only run plays and tasks tagged with these values")
parser.add_option('--skip-tags', dest='skip_tags', default=C.TAGS_SKIP, action='append',
help="only run plays and tasks whose tags do not match these values")
if output_opts:
parser.add_option('-o', '--one-line', dest='one_line', action='store_true',
help='condense output')
parser.add_option('-t', '--tree', dest='tree', default=None,
help='log output to this directory')
if connect_opts:
connect_group = optparse.OptionGroup(parser, "Connection Options", "control as whom and how to connect to hosts")
connect_group.add_option('-k', '--ask-pass', default=C.DEFAULT_ASK_PASS, dest='ask_pass', action='store_true',
help='ask for connection password')
connect_group.add_option('--private-key', '--key-file', default=C.DEFAULT_PRIVATE_KEY_FILE, dest='private_key_file',
help='use this file to authenticate the connection', action="callback", callback=CLI.unfrack_path, type='string')
connect_group.add_option('-u', '--user', default=C.DEFAULT_REMOTE_USER, dest='remote_user',
help='connect as this user (default=%s)' % C.DEFAULT_REMOTE_USER)
connect_group.add_option('-c', '--connection', dest='connection', default=C.DEFAULT_TRANSPORT,
help="connection type to use (default=%s)" % C.DEFAULT_TRANSPORT)
connect_group.add_option('-T', '--timeout', default=C.DEFAULT_TIMEOUT, type='int', dest='timeout',
help="override the connection timeout in seconds (default=%s)" % C.DEFAULT_TIMEOUT)
connect_group.add_option('--ssh-common-args', default='', dest='ssh_common_args',
help="specify common arguments to pass to sftp/scp/ssh (e.g. ProxyCommand)")
connect_group.add_option('--sftp-extra-args', default='', dest='sftp_extra_args',
help="specify extra arguments to pass to sftp only (e.g. -f, -l)")
connect_group.add_option('--scp-extra-args', default='', dest='scp_extra_args',
help="specify extra arguments to pass to scp only (e.g. -l)")
connect_group.add_option('--ssh-extra-args', default='', dest='ssh_extra_args',
help="specify extra arguments to pass to ssh only (e.g. -R)")
parser.add_option_group(connect_group)
runas_group = None
rg = optparse.OptionGroup(parser, "Privilege Escalation Options", "control how and which user you become as on target hosts")
if runas_opts:
runas_group = rg
# priv user defaults to root later on to enable detecting when this option was given here
runas_group.add_option("-s", "--sudo", default=C.DEFAULT_SUDO, action="store_true", dest='sudo',
help="run operations with sudo (nopasswd) (deprecated, use become)")
runas_group.add_option('-U', '--sudo-user', dest='sudo_user', default=None,
help='desired sudo user (default=root) (deprecated, use become)')
runas_group.add_option('-S', '--su', default=C.DEFAULT_SU, action='store_true',
help='run operations with su (deprecated, use become)')
runas_group.add_option('-R', '--su-user', default=None,
help='run operations with su as this user (default=%s) (deprecated, use become)' % C.DEFAULT_SU_USER)
# consolidated privilege escalation (become)
runas_group.add_option("-b", "--become", default=C.DEFAULT_BECOME, action="store_true", dest='become',
help="run operations with become (does not imply password prompting)")
runas_group.add_option('--become-method', dest='become_method', default=C.DEFAULT_BECOME_METHOD, type='choice', choices=C.BECOME_METHODS,
help="privilege escalation method to use (default=%s), valid choices: [ %s ]" %
(C.DEFAULT_BECOME_METHOD, ' | '.join(C.BECOME_METHODS)))
runas_group.add_option('--become-user', default=None, dest='become_user', type='string',
help='run operations as this user (default=%s)' % C.DEFAULT_BECOME_USER)
if runas_opts or runas_prompt_opts:
if not runas_group:
runas_group = rg
runas_group.add_option('--ask-sudo-pass', default=C.DEFAULT_ASK_SUDO_PASS, dest='ask_sudo_pass', action='store_true',
help='ask for sudo password (deprecated, use become)')
runas_group.add_option('--ask-su-pass', default=C.DEFAULT_ASK_SU_PASS, dest='ask_su_pass', action='store_true',
help='ask for su password (deprecated, use become)')
runas_group.add_option('-K', '--ask-become-pass', default=False, dest='become_ask_pass', action='store_true',
help='ask for privilege escalation password')
if runas_group:
parser.add_option_group(runas_group)
if async_opts:
parser.add_option('-P', '--poll', default=C.DEFAULT_POLL_INTERVAL, type='int', dest='poll_interval',
help="set the poll interval if using -B (default=%s)" % C.DEFAULT_POLL_INTERVAL)
parser.add_option('-B', '--background', dest='seconds', type='int', default=0,
help='run asynchronously, failing after X seconds (default=N/A)')
if check_opts:
parser.add_option("-C", "--check", default=False, dest='check', action='store_true',
help="don't make any changes; instead, try to predict some of the changes that may occur")
parser.add_option('--syntax-check', dest='syntax', action='store_true',
help="perform a syntax check on the playbook, but do not execute it")
parser.add_option("-D", "--diff", default=C.DIFF_ALWAYS, dest='diff', action='store_true',
help="when changing (small) files and templates, show the differences in those files; works great with --check")
if meta_opts:
parser.add_option('--force-handlers', default=C.DEFAULT_FORCE_HANDLERS, dest='force_handlers', action='store_true',
help="run handlers even if a task fails")
parser.add_option('--flush-cache', dest='flush_cache', action='store_true',
help="clear the fact cache for every host in inventory")
if basedir_opts:
parser.add_option('--playbook-dir', default=None, dest='basedir', action='store',
help="Since this tool does not use playbooks, use this as a subsitute playbook directory."
"This sets the relative path for many features including roles/ group_vars/ etc.")
return parser
@abstractmethod
def parse(self):
"""Parse the command line args
def init_parser(self, usage="", output_opts=False, runas_opts=False, meta_opts=False,
runtask_opts=False, vault_opts=False, module_opts=False, async_opts=False,
connect_opts=False, subset_opts=False, check_opts=False, inventory_opts=False,
epilog=None, fork_opts=False, runas_prompt_opts=False, desc=None,
basedir_opts=False, vault_rekey_opts=False):
"""
Create an options parser for most ansible scripts
This method parses the command line arguments. It uses the parser
stored in the self.parser attribute and saves the args and options in
self.args and self.options respectively.
Subclasses need to implement this method. They will usually call the base class's
init_parser to create a basic version and then add their own options on top of that.
Subclasses need to implement this method. They will usually create
a base_parser, add their own options to the base_parser, and then call
this method to do the actual parsing. An implementation will look
something like this::
An implementation will look something like this::
def parse(self):
parser = super(MyCLI, self).base_parser(usage="My Ansible CLI", inventory_opts=True)
parser.add_option('--my-option', dest='my_option', action='store')
self.parser = parser
super(MyCLI, self).parse()
# If some additional transformations are needed for the
# arguments and options, do it here.
def init_parser(self):
self.parser = super(MyCLI, self).init__parser(usage="My Ansible CLI", inventory_opts=True)
self.parser.add_option('--my-option', dest='my_option', action='store')
return self.parser
"""
self.parser = base_parser(usage=usage, output_opts=output_opts, runas_opts=runas_opts,
meta_opts=meta_opts, runtask_opts=runtask_opts,
vault_opts=vault_opts, module_opts=module_opts,
async_opts=async_opts, connect_opts=connect_opts,
subset_opts=subset_opts, check_opts=check_opts,
inventory_opts=inventory_opts, epilog=epilog, fork_opts=fork_opts,
runas_prompt_opts=runas_prompt_opts, desc=desc,
basedir_opts=basedir_opts, vault_rekey_opts=vault_rekey_opts)
return self.parser
self.options, self.args = self.parser.parse_args(self.args[1:])
@abstractmethod
def post_process_args(self, options, args):
"""Process the command line args
Subclasses need to implement this method. This method validates and transforms the command
line arguments. It can be used to check whether conflicting values were given, whether filenames
exist, etc.
An implementation will look something like this::
def post_process_args(self, options, args):
options, args = super(MyCLI, self).post_process_args(options, args)
if options.addition and options.subtraction:
raise AnsibleOptionsError('Only one of --addition and --subtraction can be specified')
if isinstance(options.listofhosts, string_types):
options.listofhosts = string_types.split(',')
return options, args
"""
# process tags
if hasattr(self.options, 'tags') and not self.options.tags:
if hasattr(options, 'tags') and not options.tags:
# optparse defaults does not do what's expected
self.options.tags = ['all']
if hasattr(self.options, 'tags') and self.options.tags:
options.tags = ['all']
if hasattr(options, 'tags') and options.tags:
tags = set()
for tag_set in self.options.tags:
for tag_set in options.tags:
for tag in tag_set.split(u','):
tags.add(tag.strip())
self.options.tags = list(tags)
options.tags = list(tags)
# process skip_tags
if hasattr(self.options, 'skip_tags') and self.options.skip_tags:
if hasattr(options, 'skip_tags') and options.skip_tags:
skip_tags = set()
for tag_set in self.options.skip_tags:
for tag_set in options.skip_tags:
for tag in tag_set.split(u','):
skip_tags.add(tag.strip())
self.options.skip_tags = list(skip_tags)
options.skip_tags = list(skip_tags)
# process inventory options except for CLIs that require their own processing
if hasattr(self.options, 'inventory') and not self.SKIP_INVENTORY_DEFAULTS:
if hasattr(options, 'inventory') and not self.SKIP_INVENTORY_DEFAULTS:
if self.options.inventory:
if options.inventory:
# should always be list
if isinstance(self.options.inventory, string_types):
self.options.inventory = [self.options.inventory]
if isinstance(options.inventory, string_types):
options.inventory = [options.inventory]
# Ensure full paths when needed
self.options.inventory = [unfrackpath(opt, follow=False) if ',' not in opt else opt for opt in self.options.inventory]
options.inventory = [unfrackpath(opt, follow=False) if ',' not in opt else opt for opt in options.inventory]
else:
self.options.inventory = C.DEFAULT_HOST_LIST
options.inventory = C.DEFAULT_HOST_LIST
return options, args
def parse(self):
"""Parse the command line args
This method parses the command line arguments. It uses the parser
stored in the self.parser attribute and saves the args and options in
context.CLIARGS.
Subclasses need to implement two helper methods, init_parser() and post_process_args() which
are called from this function before and after parsing the arguments.
"""
self.init_parser()
options, args = self.parser.parse_args(self.args[1:])
options, args = self.post_process_args(options, args)
options.args = args
context._init_global_context(options)
@staticmethod
def version(prog):
@ -763,42 +805,45 @@ class CLI(with_metaclass(ABCMeta, object)):
return t
@staticmethod
def _play_prereqs(options):
def _play_prereqs():
options = context.CLIARGS
# all needs loader
loader = DataLoader()
basedir = getattr(options, 'basedir', False)
basedir = options.get('basedir', False)
if basedir:
loader.set_basedir(basedir)
vault_ids = options.vault_ids
vault_ids = list(options['vault_ids'])
default_vault_ids = C.DEFAULT_VAULT_IDENTITY_LIST
vault_ids = default_vault_ids + vault_ids
vault_secrets = CLI.setup_vault_secrets(loader,
vault_ids=vault_ids,
vault_password_files=options.vault_password_files,
ask_vault_pass=options.ask_vault_pass,
vault_password_files=list(options['vault_password_files']),
ask_vault_pass=options['ask_vault_pass'],
auto_prompt=False)
loader.set_vault_secrets(vault_secrets)
# create the inventory, and filter it based on the subset specified (if any)
inventory = InventoryManager(loader=loader, sources=options.inventory)
inventory = InventoryManager(loader=loader, sources=options['inventory'])
# create the variable manager, which will be shared throughout
# the code, ensuring a consistent view of global variables
variable_manager = VariableManager(loader=loader, inventory=inventory)
if hasattr(options, 'basedir'):
if options.basedir:
# If the basedir is specified as the empty string then it results in cwd being used. This
# is not a safe location to load vars from
if options.get('basedir', False) is not False:
if basedir:
variable_manager.safe_basedir = True
else:
variable_manager.safe_basedir = True
# load vars from cli options
variable_manager.extra_vars = load_extra_vars(loader=loader, options=options)
variable_manager.options_vars = load_options_vars(options, CLI.version_info(gitinfo=False))
variable_manager.extra_vars = load_extra_vars(loader=loader)
variable_manager.options_vars = load_options_vars(CLI.version_info(gitinfo=False))
return loader, inventory, variable_manager

@ -1,24 +1,12 @@
# (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Copyright: (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>
# Copyright: (c) 2018, Toshio Kuratomi <tkuratomi@ansible.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible import constants as C
from ansible import context
from ansible.cli import CLI
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.executor.task_queue_manager import TaskQueueManager
@ -37,10 +25,9 @@ class AdHocCLI(CLI):
this command allows you to define and run a single task 'playbook' against a set of hosts
'''
def parse(self):
def init_parser(self):
''' create an options parser for bin/ansible '''
self.parser = CLI.base_parser(
self.parser = super(AdHocCLI, self).init_parser(
usage='%prog <host-pattern> [options]',
runas_opts=True,
inventory_opts=True,
@ -63,24 +50,32 @@ class AdHocCLI(CLI):
self.parser.add_option('-m', '--module-name', dest='module_name',
help="module name to execute (default=%s)" % C.DEFAULT_MODULE_NAME,
default=C.DEFAULT_MODULE_NAME)
return self.parser
def post_process_args(self, options, args):
'''Post process and validate options for bin/ansible '''
super(AdHocCLI, self).parse()
options, args = super(AdHocCLI, self).post_process_args(options, args)
if len(self.args) < 1:
if len(args) < 1:
raise AnsibleOptionsError("Missing target hosts")
elif len(self.args) > 1:
elif len(args) > 1:
raise AnsibleOptionsError("Extraneous options or arguments")
display.verbosity = self.options.verbosity
self.validate_conflicts(runas_opts=True, vault_opts=True, fork_opts=True)
display.verbosity = options.verbosity
self.validate_conflicts(options, runas_opts=True, vault_opts=True, fork_opts=True)
options = self.normalize_become_options(options)
return options, args
def _play_ds(self, pattern, async_val, poll):
check_raw = self.options.module_name in ('command', 'win_command', 'shell', 'win_shell', 'script', 'raw')
check_raw = context.CLIARGS['module_name'] in ('command', 'win_command', 'shell', 'win_shell', 'script', 'raw')
mytask = {'action': {'module': self.options.module_name, 'args': parse_kv(self.options.module_args, check_raw=check_raw)}}
mytask = {'action': {'module': context.CLIARGS['module_name'], 'args': parse_kv(context.CLIARGS['module_args'], check_raw=check_raw)}}
# avoid adding to tasks that don't support it, unless set, then give user an error
if self.options.module_name not in ('include_role', 'include_tasks') or any(frozenset((async_val, poll))):
if context.CLIARGS['module_name'] not in ('include_role', 'include_tasks') or any(frozenset((async_val, poll))):
mytask['async_val'] = async_val
mytask['poll'] = poll
@ -96,46 +91,46 @@ class AdHocCLI(CLI):
super(AdHocCLI, self).run()
# only thing left should be host pattern
pattern = to_text(self.args[0], errors='surrogate_or_strict')
pattern = to_text(context.CLIARGS['args'][0], errors='surrogate_or_strict')
sshpass = None
becomepass = None
self.normalize_become_options()
(sshpass, becomepass) = self.ask_passwords()
passwords = {'conn_pass': sshpass, 'become_pass': becomepass}
# dynamically load any plugins
get_all_plugin_loaders()
loader, inventory, variable_manager = self._play_prereqs(self.options)
loader, inventory, variable_manager = self._play_prereqs()
try:
hosts = CLI.get_host_list(inventory, self.options.subset, pattern)
hosts = self.get_host_list(inventory, context.CLIARGS['subset'], pattern)
except AnsibleError:
if self.options.subset:
if context.CLIARGS['subset']:
raise
else:
hosts = []
display.warning("No hosts matched, nothing to do")
if self.options.listhosts:
if context.CLIARGS['listhosts']:
display.display(' hosts (%d):' % len(hosts))
for host in hosts:
display.display(' %s' % host)
return 0
if self.options.module_name in C.MODULE_REQUIRE_ARGS and not self.options.module_args:
err = "No argument passed to %s module" % self.options.module_name
if context.CLIARGS['module_name'] in C.MODULE_REQUIRE_ARGS and not context.CLIARGS['module_args']:
err = "No argument passed to %s module" % context.CLIARGS['module_name']
if pattern.endswith(".yml"):
err = err + ' (did you mean to run ansible-playbook?)'
raise AnsibleOptionsError(err)
# Avoid modules that don't work with ad-hoc
if self.options.module_name in ('import_playbook',):
raise AnsibleOptionsError("'%s' is not a valid action for ad-hoc commands" % self.options.module_name)
if context.CLIARGS['module_name'] in ('import_playbook',):
raise AnsibleOptionsError("'%s' is not a valid action for ad-hoc commands"
% context.CLIARGS['module_name'])
play_ds = self._play_ds(pattern, self.options.seconds, self.options.poll_interval)
play_ds = self._play_ds(pattern, context.CLIARGS['seconds'], context.CLIARGS['poll_interval'])
play = Play().load(play_ds, variable_manager=variable_manager, loader=loader)
# used in start callback
@ -145,7 +140,7 @@ class AdHocCLI(CLI):
if self.callback:
cb = self.callback
elif self.options.one_line:
elif context.CLIARGS['one_line']:
cb = 'oneline'
# Respect custom 'stdout_callback' only with enabled 'bin_ansible_callbacks'
elif C.DEFAULT_LOAD_CALLBACK_PLUGINS and C.DEFAULT_STDOUT_CALLBACK != 'default':
@ -154,9 +149,9 @@ class AdHocCLI(CLI):
cb = 'minimal'
run_tree = False
if self.options.tree:
if context.CLIARGS['tree']:
C.DEFAULT_CALLBACK_WHITELIST.append('tree')
C.TREE_DIR = self.options.tree
C.TREE_DIR = context.CLIARGS['tree']
run_tree = True
# now create a task queue manager to execute the play
@ -166,11 +161,11 @@ class AdHocCLI(CLI):
inventory=inventory,
variable_manager=variable_manager,
loader=loader,
options=self.options,
passwords=passwords,
stdout_callback=cb,
run_additional_callbacks=C.DEFAULT_LOAD_CALLBACK_PLUGINS,
run_tree=run_tree,
forks=context.CLIARGS['forks'],
)
self._tqm.send_callback('v2_playbook_on_start', playbook)

@ -1,4 +1,4 @@
# Copyright: (c) 2017, Ansible Project
# Copyright: (c) 2017-2018, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
@ -10,6 +10,7 @@ import subprocess
import sys
import yaml
from ansible import context
from ansible.cli import CLI
from ansible.config.manager import ConfigManager, Setting, find_ini_config_file
from ansible.errors import AnsibleError, AnsibleOptionsError
@ -33,9 +34,9 @@ class ConfigCLI(CLI):
self.config = None
super(ConfigCLI, self).__init__(args, callback)
def parse(self):
def init_parser(self):
self.parser = CLI.base_parser(
self.parser = super(ConfigCLI, self).init_parser(
usage="usage: %%prog [%s] [--help] [options] [ansible.cfg]" % "|".join(sorted(self.VALID_ACTIONS)),
epilog="\nSee '%s <command> --help' for more information on a specific command.\n\n" % os.path.basename(sys.argv[0]),
desc="View, edit, and manage ansible configuration.",
@ -56,15 +57,20 @@ class ConfigCLI(CLI):
elif self.action == "search":
self.parser.set_usage("usage: %prog update [options] [-c ansible.cfg] <search term>")
self.options, self.args = self.parser.parse_args()
display.verbosity = self.options.verbosity
return self.parser
def post_process_args(self, options, args):
super(ConfigCLI, self).post_process_args(options, args)
display.verbosity = options.verbosity
return options, args
def run(self):
super(ConfigCLI, self).run()
if self.options.config_file:
self.config_file = unfrackpath(self.options.config_file, follow=False)
if context.CLIARGS['config_file']:
self.config_file = unfrackpath(context.CLIARGS['config_file'], follow=False)
self.config = ConfigManager(self.config_file)
else:
self.config = ConfigManager()
@ -96,10 +102,10 @@ class ConfigCLI(CLI):
raise AnsibleError("Option not implemented yet")
# pylint: disable=unreachable
if self.options.setting is None:
if context.CLIARGS['setting'] is None:
raise AnsibleOptionsError("update option requires a setting to update")
(entry, value) = self.options.setting.split('=')
(entry, value) = context.CLIARGS['setting'].split('=')
if '.' in entry:
(section, option) = entry.split('.')
else:
@ -164,7 +170,7 @@ class ConfigCLI(CLI):
else:
color = 'green'
msg = "%s(%s) = %s" % (setting, 'default', defaults[setting].get('default'))
if not self.options.only_changed or color == 'yellow':
if not context.CLIARGS['only_changed'] or color == 'yellow':
text.append(stringc(msg, color))
self.pager(to_text('\n'.join(text), errors='surrogate_or_strict'))

@ -1,19 +1,7 @@
# (c) 2014, Nandor Sivok <dominis@haxor.hu>
# (c) 2016, Redhat Inc
#
# ansible-console is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ansible-console is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright: (c) 2014, Nandor Sivok <dominis@haxor.hu>
# Copyright: (c) 2016, Redhat Inc
# Copyright: (c) 2018, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
@ -37,6 +25,7 @@ import os
import sys
from ansible import constants as C
from ansible import context
from ansible.cli import CLI
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.module_utils._text import to_native, to_text
@ -75,10 +64,21 @@ class ConsoleCLI(CLI, cmd.Cmd):
self.passwords = dict()
self.modules = None
self.cwd = '*'
# Defaults for these are set from the CLI in run()
self.remote_user = None
self.become = None
self.become_user = None
self.become_method = None
self.check_mode = None
self.diff = None
self.forks = None
cmd.Cmd.__init__(self)
def parse(self):
self.parser = CLI.base_parser(
def init_parser(self):
super(ConsoleCLI, self).init_parser(
usage='%prog [<host-pattern>] [options]',
runas_opts=True,
inventory_opts=True,
@ -96,12 +96,14 @@ class ConsoleCLI(CLI, cmd.Cmd):
self.parser.add_option('--step', dest='step', action='store_true',
help="one-step-at-a-time: confirm each task before running")
self.parser.set_defaults(cwd='*')
return self.parser
super(ConsoleCLI, self).parse()
display.verbosity = self.options.verbosity
self.validate_conflicts(runas_opts=True, vault_opts=True, fork_opts=True)
def post_process_args(self, options, args):
options, args = super(ConsoleCLI, self).post_process_args(options, args)
display.verbosity = options.verbosity
options = self.normalize_become_options(options)
self.validate_conflicts(options, runas_opts=True, vault_opts=True, fork_opts=True)
return options, args
def get_names(self):
return dir(self)
@ -113,10 +115,10 @@ class ConsoleCLI(CLI, cmd.Cmd):
self.do_exit(self)
def set_prompt(self):
login_user = self.options.remote_user or getpass.getuser()
self.selected = self.inventory.list_hosts(self.options.cwd)
prompt = "%s@%s (%d)[f:%s]" % (login_user, self.options.cwd, len(self.selected), self.options.forks)
if self.options.become and self.options.become_user in [None, 'root']:
login_user = self.remote_user or getpass.getuser()
self.selected = self.inventory.list_hosts(self.cwd)
prompt = "%s@%s (%d)[f:%s]" % (login_user, self.cwd, len(self.selected), self.forks)
if self.become and self.become_user in [None, 'root']:
prompt += "# "
color = C.COLOR_ERROR
else:
@ -126,8 +128,8 @@ class ConsoleCLI(CLI, cmd.Cmd):
def list_modules(self):
modules = set()
if self.options.module_path:
for path in self.options.module_path:
if context.CLIARGS['module_path']:
for path in context.CLIARGS['module_path']:
if path:
module_loader.add_directory(path)
@ -165,7 +167,7 @@ class ConsoleCLI(CLI, cmd.Cmd):
if arg.startswith("#"):
return False
if not self.options.cwd:
if not self.cwd:
display.error("No host found")
return False
@ -180,16 +182,20 @@ class ConsoleCLI(CLI, cmd.Cmd):
module = 'shell'
module_args = arg
self.options.module_name = module
result = None
try:
check_raw = self.options.module_name in ('command', 'shell', 'script', 'raw')
check_raw = module in ('command', 'shell', 'script', 'raw')
play_ds = dict(
name="Ansible Shell",
hosts=self.options.cwd,
hosts=self.cwd,
gather_facts='no',
tasks=[dict(action=dict(module=module, args=parse_kv(module_args, check_raw=check_raw)))]
tasks=[dict(action=dict(module=module, args=parse_kv(module_args, check_raw=check_raw)))],
remote_user=self.remote_user,
become=self.become,
become_user=self.become_user,
become_method=self.become_method,
check_mode=self.check_mode,
diff=self.diff,
)
play = Play().load(play_ds, variable_manager=self.variable_manager, loader=self.loader)
except Exception as e:
@ -205,11 +211,11 @@ class ConsoleCLI(CLI, cmd.Cmd):
inventory=self.inventory,
variable_manager=self.variable_manager,
loader=self.loader,
options=self.options,
passwords=self.passwords,
stdout_callback=cb,
run_additional_callbacks=C.DEFAULT_LOAD_CALLBACK_PLUGINS,
run_tree=False,
forks=self.forks,
)
result = self._tqm.run(play)
@ -252,7 +258,13 @@ class ConsoleCLI(CLI, cmd.Cmd):
if not arg:
display.display('Usage: forks <number>')
return
self.options.forks = int(arg)
forks = int(arg)
if forks <= 0:
display.display('forks must be greater than or equal to 1')
return
self.forks = forks
self.set_prompt()
do_serial = do_forks
@ -275,11 +287,11 @@ class ConsoleCLI(CLI, cmd.Cmd):
cd webservers:dbservers:&staging:!phoenix
"""
if not arg:
self.options.cwd = '*'
self.cwd = '*'
elif arg in '/*':
self.options.cwd = 'all'
self.cwd = 'all'
elif self.inventory.get_hosts(arg):
self.options.cwd = arg
self.cwd = arg
else:
display.display("no host matched")
@ -297,8 +309,8 @@ class ConsoleCLI(CLI, cmd.Cmd):
def do_become(self, arg):
"""Toggle whether plays run with become"""
if arg:
self.options.become = boolean(arg, strict=False)
display.v("become changed to %s" % self.options.become)
self.become = boolean(arg, strict=False)
display.v("become changed to %s" % self.become)
self.set_prompt()
else:
display.display("Please specify become value, e.g. `become yes`")
@ -306,7 +318,7 @@ class ConsoleCLI(CLI, cmd.Cmd):
def do_remote_user(self, arg):
"""Given a username, set the remote user plays are run by"""
if arg:
self.options.remote_user = arg
self.remote_user = arg
self.set_prompt()
else:
display.display("Please specify a remote user, e.g. `remote_user root`")
@ -314,33 +326,33 @@ class ConsoleCLI(CLI, cmd.Cmd):
def do_become_user(self, arg):
"""Given a username, set the user that plays are run by when using become"""
if arg:
self.options.become_user = arg
self.become_user = arg
else:
display.display("Please specify a user, e.g. `become_user jenkins`")
display.v("Current user is %s" % self.options.become_user)
display.v("Current user is %s" % self.become_user)
self.set_prompt()
def do_become_method(self, arg):
"""Given a become_method, set the privilege escalation method when using become"""
if arg:
self.options.become_method = arg
display.v("become_method changed to %s" % self.options.become_method)
self.become_method = arg
display.v("become_method changed to %s" % self.become_method)
else:
display.display("Please specify a become_method, e.g. `become_method su`")
def do_check(self, arg):
"""Toggle whether plays run with check mode"""
if arg:
self.options.check = boolean(arg, strict=False)
display.v("check mode changed to %s" % self.options.check)
self.check_mode = boolean(arg, strict=False)
display.v("check mode changed to %s" % self.check_mode)
else:
display.display("Please specify check mode value, e.g. `check yes`")
def do_diff(self, arg):
"""Toggle whether plays run with diff"""
if arg:
self.options.diff = boolean(arg, strict=False)
display.v("diff mode changed to %s" % self.options.diff)
self.diff = boolean(arg, strict=False)
display.v("diff mode changed to %s" % self.diff)
else:
display.display("Please specify a diff value , e.g. `diff yes`")
@ -370,10 +382,10 @@ class ConsoleCLI(CLI, cmd.Cmd):
mline = line.partition(' ')[2]
offs = len(mline) - len(text)
if self.options.cwd in ('all', '*', '\\'):
if self.cwd in ('all', '*', '\\'):
completions = self.hosts + self.groups
else:
completions = [x.name for x in self.inventory.list_hosts(self.options.cwd)]
completions = [x.name for x in self.inventory.list_hosts(self.cwd)]
return [to_native(s)[offs:] for s in completions if to_native(s).startswith(to_native(mline))]
@ -398,11 +410,20 @@ class ConsoleCLI(CLI, cmd.Cmd):
becomepass = None
# hosts
if len(self.args) != 1:
if len(context.CLIARGS['args']) != 1:
self.pattern = 'all'
else:
self.pattern = self.args[0]
self.options.cwd = self.pattern
self.pattern = context.CLIARGS['args'][0]
self.cwd = self.pattern
# Defaults from the command line
self.remote_user = context.CLIARGS['remote_user']
self.become = context.CLIARGS['become']
self.become_user = context.CLIARGS['become_user']
self.become_method = context.CLIARGS['become_method']
self.check_mode = context.CLIARGS['check']
self.diff = context.CLIARGS['diff']
self.forks = context.CLIARGS['forks']
# dynamically add modules as commands
self.modules = self.list_modules()
@ -410,13 +431,12 @@ class ConsoleCLI(CLI, cmd.Cmd):
setattr(self, 'do_' + module, lambda arg, module=module: self.default(module + ' ' + arg))
setattr(self, 'help_' + module, lambda module=module: self.helpdefault(module))
self.normalize_become_options()
(sshpass, becomepass) = self.ask_passwords()
self.passwords = {'conn_pass': sshpass, 'become_pass': becomepass}
self.loader, self.inventory, self.variable_manager = self._play_prereqs(self.options)
self.loader, self.inventory, self.variable_manager = self._play_prereqs()
hosts = CLI.get_host_list(self.inventory, self.options.subset, self.pattern)
hosts = self.get_host_list(self.inventory, context.CLIARGS['subset'], self.pattern)
self.groups = self.inventory.list_groups()
self.hosts = [x.name for x in hosts]

@ -1,4 +1,5 @@
# Copyright: (c) 2014, James Tanner <tanner.jc@gmail.com>
# Copyright: (c) 2018, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
@ -14,6 +15,7 @@ import yaml
import ansible.plugins.loader as plugin_loader
from ansible import constants as C
from ansible import context
from ansible.cli import CLI
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.module_utils._text import to_native
@ -43,9 +45,9 @@ class DocCLI(CLI):
super(DocCLI, self).__init__(args)
self.plugin_list = set()
def parse(self):
def init_parser(self):
self.parser = CLI.base_parser(
self.parser = super(DocCLI, self).init_parser(
usage='usage: %prog [-l|-F|-s] [options] [-t <plugin type> ] [plugin]',
module_opts=True,
desc="plugin documentation tool",
@ -66,18 +68,27 @@ class DocCLI(CLI):
help='Choose which plugin type (defaults to "module"). '
'Available plugin types are : {0}'.format(C.DOCUMENTABLE_PLUGINS),
choices=C.DOCUMENTABLE_PLUGINS)
super(DocCLI, self).parse()
return self.parser
if [self.options.all_plugins, self.options.json_dump, self.options.list_dir, self.options.list_files, self.options.show_snippet].count(True) > 1:
def post_process_args(self, options, args):
if [options.all_plugins, options.json_dump, options.list_dir, options.list_files, options.show_snippet].count(True) > 1:
raise AnsibleOptionsError("Only one of -l, -F, -s, -j or -a can be used at the same time.")
display.verbosity = self.options.verbosity
display.verbosity = options.verbosity
# process all plugins of type
if options.all_plugins:
args = self.get_all_plugins_of_type(options['type'])
if options.module_path:
display.warning('Ignoring "--module-path/-M" option as "--all/-a" only displays builtins')
return options, args
def run(self):
super(DocCLI, self).run()
plugin_type = self.options.type
plugin_type = context.CLIARGS['type']
if plugin_type in C.DOCUMENTABLE_PLUGINS:
loader = getattr(plugin_loader, '%s_loader' % plugin_type)
@ -85,17 +96,17 @@ class DocCLI(CLI):
raise AnsibleOptionsError("Unknown or undocumentable plugin type: %s" % plugin_type)
# add to plugin path from command line
if self.options.module_path:
for path in self.options.module_path:
if context.CLIARGS['module_path']:
for path in context.CLIARGS['module_path']:
if path:
loader.add_directory(path)
# save only top level paths for errors
search_paths = DocCLI.print_paths(loader)
search_paths = self.print_paths(loader)
loader._paths = None # reset so we can use subdirs below
# list plugins names and filepath for type
if self.options.list_files:
if context.CLIARGS['list_files']:
paths = loader._get_paths()
for path in paths:
self.plugin_list.update(self.find_plugins(path, plugin_type))
@ -105,7 +116,7 @@ class DocCLI(CLI):
return 0
# list plugins for type
if self.options.list_dir:
if context.CLIARGS['list_dir']:
paths = loader._get_paths()
for path in paths:
self.plugin_list.update(self.find_plugins(path, plugin_type))
@ -113,14 +124,8 @@ class DocCLI(CLI):
self.pager(self.get_plugin_list_text(loader))
return 0
# process all plugins of type
if self.options.all_plugins:
self.args = self.get_all_plugins_of_type(plugin_type)
if self.options.module_path:
display.warning('Ignoring "--module-path/-M" option as "--all/-a" only displays builtins')
# dump plugin desc/metadata as JSON
if self.options.json_dump:
if context.CLIARGS['json_dump']:
plugin_data = {}
plugin_names = self.get_all_plugins_of_type(plugin_type)
for plugin_name in plugin_names:
@ -132,12 +137,12 @@ class DocCLI(CLI):
return 0
if len(self.args) == 0:
if len(context.CLIARGS['args']) == 0:
raise AnsibleOptionsError("Incorrect options passed")
# process command line list
text = ''
for plugin in self.args:
for plugin in context.CLIARGS['args']:
textret = self.format_plugin_doc(plugin, loader, plugin_type, search_paths)
if textret:
@ -165,7 +170,7 @@ class DocCLI(CLI):
raise AnsibleError("unable to load {0} plugin named {1} ".format(plugin_type, plugin_name))
try:
doc, __, __, metadata = get_docstring(filename, fragment_loader, verbose=(self.options.verbosity > 0))
doc, __, __, metadata = get_docstring(filename, fragment_loader, verbose=(context.CLIARGS['verbosity'] > 0))
except Exception:
display.vvv(traceback.format_exc())
raise AnsibleError(
@ -215,7 +220,7 @@ class DocCLI(CLI):
try:
doc, plainexamples, returndocs, metadata = get_docstring(filename, fragment_loader,
verbose=(self.options.verbosity > 0))
verbose=(context.CLIARGS['verbosity'] > 0))
except Exception:
display.vvv(traceback.format_exc())
display.error(
@ -242,7 +247,7 @@ class DocCLI(CLI):
if 'docuri' in doc:
doc['docuri'] = doc[plugin_type].replace('_', '-')
if self.options.show_snippet and plugin_type == 'module':
if context.CLIARGS['show_snippet'] and plugin_type == 'module':
text += self.get_snippet_text(doc)
else:
text += self.get_man_text(doc)
@ -516,13 +521,13 @@ class DocCLI(CLI):
def get_man_text(self, doc):
self.IGNORE = self.IGNORE + (self.options.type,)
self.IGNORE = self.IGNORE + (context.CLIARGS['type'],)
opt_indent = " "
text = []
pad = display.columns * 0.20
limit = max(display.columns - int(pad), 70)
text.append("> %s (%s)\n" % (doc.get(self.options.type, doc.get('plugin_type')).upper(), doc.pop('filename')))
text.append("> %s (%s)\n" % (doc.get(context.CLIARGS['type'], doc.get('plugin_type')).upper(), doc.pop('filename')))
if isinstance(doc['description'], list):
desc = " ".join(doc.pop('description'))

@ -1,23 +1,6 @@
########################################################################
#
# (C) 2013, James Cammarata <jcammarata@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
########################################################################
# Copyright: (c) 2013, James Cammarata <jcammarata@ansible.com>
# Copyright: (c) 2018, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
@ -31,6 +14,7 @@ import yaml
from jinja2 import Environment, FileSystemLoader
from ansible import context
import ansible.constants as C
from ansible.cli import CLI
from ansible.errors import AnsibleError, AnsibleOptionsError
@ -131,10 +115,10 @@ class GalaxyCLI(CLI):
if self.action in ("init", "install"):
self.parser.add_option('-f', '--force', dest='force', action='store_true', default=False, help='Force overwriting an existing role')
def parse(self):
def init_parser(self):
''' create an options parser for bin/ansible '''
self.parser = CLI.base_parser(
self.parser = super(GalaxyCLI, self).init_parser(
usage="usage: %%prog [%s] [--help] [options] ..." % "|".join(sorted(self.VALID_ACTIONS)),
epilog="\nSee '%s <command> --help' for more information on a specific command.\n\n" % os.path.basename(sys.argv[0]),
desc="Perform various Role related operations.",
@ -146,15 +130,19 @@ class GalaxyCLI(CLI):
help='Ignore SSL certificate validation errors.')
self.set_action()
super(GalaxyCLI, self).parse()
return self.parser
display.verbosity = self.options.verbosity
self.galaxy = Galaxy(self.options)
def post_process_args(self, options, args):
options, args = super(GalaxyCLI, self).post_process_args(options, args)
display.verbosity = options.verbosity
return options, args
def run(self):
super(GalaxyCLI, self).run()
self.galaxy = Galaxy()
self.api = GalaxyAPI(self.galaxy)
self.execute()
@ -163,7 +151,7 @@ class GalaxyCLI(CLI):
Exits with the specified return code unless the
option --ignore-errors was specified
"""
if not self.options.ignore_errors:
if not context.CLIARGS['ignore_errors']:
raise AnsibleError('- you can use --ignore-errors to skip failed roles and finish processing the list.')
def _display_role_info(self, role_info):
@ -196,11 +184,11 @@ class GalaxyCLI(CLI):
creates the skeleton framework of a role that complies with the galaxy metadata format.
"""
init_path = self.options.init_path
force = self.options.force
role_skeleton = self.options.role_skeleton
init_path = context.CLIARGS['init_path']
force = context.CLIARGS['force']
role_skeleton = context.CLIARGS['role_skeleton']
role_name = self.args.pop(0).strip() if self.args else None
role_name = context.CLIARGS['args'][0].strip() if context.CLIARGS['args'] else None
if not role_name:
raise AnsibleOptionsError("- no role name specified for init")
role_path = os.path.join(init_path, role_name)
@ -221,7 +209,7 @@ class GalaxyCLI(CLI):
license='license (GPLv2, CC-BY, etc)',
issue_tracker_url='http://example.com/issue/tracker',
min_ansible_version='2.4',
role_type=self.options.role_type
role_type=context.CLIARGS['role_type']
)
# create role directory
@ -268,14 +256,14 @@ class GalaxyCLI(CLI):
prints out detailed information about an installed role as well as info available from the galaxy API.
"""
if len(self.args) == 0:
if not context.CLIARGS['args']:
# the user needs to specify a role
raise AnsibleOptionsError("- you must specify a user/role name")
roles_path = self.options.roles_path
roles_path = context.CLIARGS['roles_path']
data = ''
for role in self.args:
for role in context.CLIARGS['args']:
role_info = {'path': roles_path}
gr = GalaxyRole(self.galaxy, role)
@ -288,7 +276,7 @@ class GalaxyCLI(CLI):
role_info.update(install_info)
remote_data = False
if not self.options.offline:
if not context.CLIARGS['offline']:
remote_data = self.api.lookup_role_by_name(role, False)
if remote_data:
@ -315,14 +303,14 @@ class GalaxyCLI(CLI):
uses the args list of roles to be installed, unless -f was specified. The list of roles
can be a name (which will be downloaded via the galaxy API and github), or it can be a local .tar.gz file.
"""
role_file = self.options.role_file
role_file = context.CLIARGS['role_file']
if len(self.args) == 0 and role_file is None:
if not context.CLIARGS['args'] and role_file is None:
# the user needs to specify one of either --role-file or specify a single user/role name
raise AnsibleOptionsError("- you must specify a user/role name or a roles file")
no_deps = self.options.no_deps
force = self.options.force
no_deps = context.CLIARGS['no_deps']
force = context.CLIARGS['force']
roles_left = []
if role_file:
@ -362,13 +350,13 @@ class GalaxyCLI(CLI):
else:
# roles were specified directly, so we'll just go out grab them
# (and their dependencies, unless the user doesn't want us to).
for rname in self.args:
for rname in context.CLIARGS['args']:
role = RoleRequirement.role_yaml_parse(rname.strip())
roles_left.append(GalaxyRole(self.galaxy, **role))
for role in roles_left:
# only process roles in roles files when names matches if given
if role_file and self.args and role.name not in self.args:
if role_file and context.CLIARGS['args'] and role.name not in context.CLIARGS['args']:
display.vvv('Skipping role %s' % role.name)
continue
@ -437,10 +425,10 @@ class GalaxyCLI(CLI):
removes the list of roles passed as arguments from the local system.
"""
if len(self.args) == 0:
if not context.CLIARGS['args']:
raise AnsibleOptionsError('- you must specify at least one role to remove.')
for role_name in self.args:
for role_name in context.CLIARGS['args']:
role = GalaxyRole(self.galaxy, role_name)
try:
if role.remove():
@ -457,7 +445,7 @@ class GalaxyCLI(CLI):
lists the roles installed on the local system or matches a single role passed as an argument.
"""
if len(self.args) > 1:
if len(context.CLIARGS['args']) > 1:
raise AnsibleOptionsError("- please specify only one role to list, or specify no roles to see a full list")
def _display_role(gr):
@ -469,9 +457,9 @@ class GalaxyCLI(CLI):
version = "(unknown version)"
display.display("- %s, %s" % (gr.name, version))
if len(self.args) == 1:
if context.CLIARGS['args']:
# show the requested role, if it exists
name = self.args.pop()
name = context.CLIARGS['args'][0]
gr = GalaxyRole(self.galaxy, name)
if gr.metadata:
display.display('# %s' % os.path.dirname(gr.path))
@ -480,7 +468,7 @@ class GalaxyCLI(CLI):
display.display("- the role %s was not found" % name)
else:
# show all valid roles in the roles_path directory
roles_path = self.options.roles_path
roles_path = context.CLIARGS['roles_path']
path_found = False
warnings = []
for path in roles_path:
@ -509,17 +497,14 @@ class GalaxyCLI(CLI):
page_size = 1000
search = None
if len(self.args):
terms = []
for i in range(len(self.args)):
terms.append(self.args.pop())
search = '+'.join(terms[::-1])
if context.CLIARGS['args']:
search = '+'.join(context.CLIARGS['args'])
if not search and not self.options.platforms and not self.options.galaxy_tags and not self.options.author:
if not search and not context.CLIARGS['platforms'] and not context.CLIARGS['galaxy_tags'] and not context.CLIARGS['author']:
raise AnsibleError("Invalid query. At least one search term, platform, galaxy tag or author must be provided.")
response = self.api.search_roles(search, platforms=self.options.platforms,
tags=self.options.galaxy_tags, author=self.options.author, page_size=page_size)
response = self.api.search_roles(search, platforms=context.CLIARGS['platforms'],
tags=context.CLIARGS['galaxy_tags'], author=context.CLIARGS['author'], page_size=page_size)
if response['count'] == 0:
display.display("No roles match your search.", color=C.COLOR_ERROR)
@ -553,18 +538,18 @@ class GalaxyCLI(CLI):
verify user's identify via Github and retrieve an auth token from Ansible Galaxy.
"""
# Authenticate with github and retrieve a token
if self.options.token is None:
if context.CLIARGS['token'] is None:
if C.GALAXY_TOKEN:
github_token = C.GALAXY_TOKEN
else:
login = GalaxyLogin(self.galaxy)
github_token = login.create_github_token()
else:
github_token = self.options.token
github_token = context.CLIARGS['token']
galaxy_response = self.api.authenticate(github_token)
if self.options.token is None and C.GALAXY_TOKEN is None:
if context.CLIARGS['token'] is None and C.GALAXY_TOKEN is None:
# Remove the token we created
login.remove_github_token()
@ -586,17 +571,19 @@ class GalaxyCLI(CLI):
'FAILED': C.COLOR_ERROR,
}
if len(self.args) < 2:
if len(context.CLIARGS['args']) < 2:
raise AnsibleError("Expected a github_username and github_repository. Use --help.")
github_repo = to_text(self.args.pop(), errors='surrogate_or_strict')
github_user = to_text(self.args.pop(), errors='surrogate_or_strict')
github_user = to_text(context.CLIARGS['args'][0], errors='surrogate_or_strict')
github_repo = to_text(context.CLIARGS['args'][1], errors='surrogate_or_strict')
if self.options.check_status:
if context.CLIARGS['check_status']:
task = self.api.get_import_task(github_user=github_user, github_repo=github_repo)
else:
# Submit an import request
task = self.api.create_import_task(github_user, github_repo, reference=self.options.reference, role_name=self.options.role_name)
task = self.api.create_import_task(github_user, github_repo,
reference=context.CLIARGS['reference'],
role_name=context.CLIARGS['role_name'])
if len(task) > 1:
# found multiple roles associated with github_user/github_repo
@ -610,11 +597,11 @@ class GalaxyCLI(CLI):
return 0
# found a single role as expected
display.display("Successfully submitted import request %d" % task[0]['id'])
if not self.options.wait:
if not context.CLIARGS['wait']:
display.display("Role name: %s" % task[0]['summary_fields']['role']['name'])
display.display("Repo: %s/%s" % (task[0]['github_user'], task[0]['github_repo']))
if self.options.check_status or self.options.wait:
if context.CLIARGS['check_status'] or context.CLIARGS['wait']:
# Get the status of the import
msg_list = []
finished = False
@ -634,7 +621,7 @@ class GalaxyCLI(CLI):
def execute_setup(self):
""" Setup an integration from Github or Travis for Ansible Galaxy roles"""
if self.options.setup_list:
if context.CLIARGS['setup_list']:
# List existing integration secrets
secrets = self.api.list_secrets()
if len(secrets) == 0:
@ -648,19 +635,19 @@ class GalaxyCLI(CLI):
secret['github_repo']), color=C.COLOR_OK)
return 0
if self.options.remove_id:
if context.CLIARGS['remove_id']:
# Remove a secret
self.api.remove_secret(self.options.remove_id)
self.api.remove_secret(context.CLIARGS['remove_id'])
display.display("Secret removed. Integrations using this secret will not longer work.", color=C.COLOR_OK)
return 0
if len(self.args) < 4:
if len(context.CLIARGS['args']) < 4:
raise AnsibleError("Missing one or more arguments. Expecting: source github_user github_repo secret")
secret = self.args.pop()
github_repo = self.args.pop()
github_user = self.args.pop()
source = self.args.pop()
source = context.CLIARGS['args'][0]
github_user = context.CLIARGS['args'][1]
github_repo = context.CLIARGS['args'][2]
secret = context.CLIARGS['args'][3]
resp = self.api.add_secret(source, github_user, github_repo, secret)
display.display("Added integration for %s %s/%s" % (resp['source'], resp['github_user'], resp['github_repo']))
@ -670,11 +657,11 @@ class GalaxyCLI(CLI):
def execute_delete(self):
""" Delete a role from Ansible Galaxy. """
if len(self.args) < 2:
if len(context.CLIARGS['args']) < 2:
raise AnsibleError("Missing one or more arguments. Expected: github_user github_repo")
github_repo = self.args.pop()
github_user = self.args.pop()
github_user = context.CLIARGS['args'][0]
github_repo = context.CLIARGS['args'][1]
resp = self.api.delete_role(github_user, github_repo)
if len(resp['deleted_roles']) > 1:

@ -1,18 +1,6 @@
# (c) 2017, Brian Coca <bcoca@ansible.com>
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright: (c) 2017, Brian Coca <bcoca@ansible.com>
# Copyright: (c) 2018, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
@ -21,6 +9,7 @@ import optparse
from operator import attrgetter
from ansible import constants as C
from ansible import context
from ansible.cli import CLI
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.inventory.host import Host
@ -66,9 +55,9 @@ class InventoryCLI(CLI):
self._new_api = True
def parse(self):
def init_parser(self):
self.parser = CLI.base_parser(
self.parser = super(InventoryCLI, self).init_parser(
usage='usage: %prog [options] [host|group]',
epilog='Show Ansible inventory information, by default it uses the inventory script JSON format',
inventory_opts=True,
@ -103,15 +92,15 @@ class InventoryCLI(CLI):
# self.parser.add_option("--ignore-vars-plugins", action="store_true", default=False, dest='ignore_vars_plugins',
# help="When doing an --list, skip vars data from vars plugins, by default, this would include group_vars/ and host_vars/")
super(InventoryCLI, self).parse()
return self.parser
display.verbosity = self.options.verbosity
self.validate_conflicts(vault_opts=True)
def post_process_args(self, options, args):
display.verbosity = options.verbosity
self.validate_conflicts(options, vault_opts=True)
# there can be only one! and, at least, one!
used = 0
for opt in (self.options.list, self.options.host, self.options.graph):
for opt in (options.list, options.host, options.graph):
if opt:
used += 1
if used == 0:
@ -120,22 +109,23 @@ class InventoryCLI(CLI):
raise AnsibleOptionsError("Conflicting options used, only one of --host, --graph or --list can be used at the same time.")
# set host pattern to default if not supplied
if len(self.args) > 0:
self.options.pattern = self.args[0]
if len(args) > 0:
options.pattern = args[0]
else:
self.options.pattern = 'all'
options.pattern = 'all'
return options, args
def run(self):
super(InventoryCLI, self).run()
results = None
# Initialize needed objects
self.loader, self.inventory, self.vm = self._play_prereqs(self.options)
self.loader, self.inventory, self.vm = self._play_prereqs()
if self.options.host:
hosts = self.inventory.get_hosts(self.options.host)
results = None
if context.CLIARGS['host']:
hosts = self.inventory.get_hosts(context.CLIARGS['host'])
if len(hosts) != 1:
raise AnsibleOptionsError("You must pass a single valid host to --host parameter")
@ -145,13 +135,13 @@ class InventoryCLI(CLI):
# FIXME: should we template first?
results = self.dump(myvars)
elif self.options.graph:
elif context.CLIARGS['graph']:
results = self.inventory_graph()
elif self.options.list:
elif context.CLIARGS['list']:
top = self._get_group('all')
if self.options.yaml:
if context.CLIARGS['yaml']:
results = self.yaml_inventory(top)
elif self.options.toml:
elif context.CLIARGS['toml']:
results = self.toml_inventory(top)
else:
results = self.json_inventory(top)
@ -166,11 +156,11 @@ class InventoryCLI(CLI):
def dump(self, stuff):
if self.options.yaml:
if context.CLIARGS['yaml']:
import yaml
from ansible.parsing.yaml.dumper import AnsibleDumper
results = yaml.dump(stuff, Dumper=AnsibleDumper, default_flow_style=False)
elif self.options.toml:
elif context.CLIARGS['toml']:
from ansible.plugins.inventory.toml import toml_dumps, HAS_TOML
if not HAS_TOML:
raise AnsibleError(
@ -227,7 +217,7 @@ class InventoryCLI(CLI):
def _get_host_variables(self, host):
if self.options.export:
if context.CLIARGS['export']:
hostvars = host.get_vars()
# FIXME: add switch to skip vars plugins
@ -264,7 +254,7 @@ class InventoryCLI(CLI):
def _show_vars(self, dump, depth):
result = []
self._remove_internal(dump)
if self.options.show_vars:
if context.CLIARGS['show_vars']:
for (name, val) in sorted(dump.items()):
result.append(self._graph_name('{%s = %s}' % (name, val), depth))
return result
@ -292,7 +282,7 @@ class InventoryCLI(CLI):
def inventory_graph(self):
start_at = self._get_group(self.options.pattern)
start_at = self._get_group(context.CLIARGS['pattern'])
if start_at:
return '\n'.join(self._graph_group(start_at))
else:
@ -313,7 +303,7 @@ class InventoryCLI(CLI):
if subgroup.name not in seen:
results.update(format_group(subgroup))
seen.add(subgroup.name)
if self.options.export:
if context.CLIARGS['export']:
results[group.name]['vars'] = self._get_group_variables(group)
self._remove_empty(results[group.name])
@ -362,8 +352,7 @@ class InventoryCLI(CLI):
self._remove_internal(myvars)
results[group.name]['hosts'][h.name] = myvars
if self.options.export:
if context.CLIARGS['export']:
gvars = self._get_group_variables(group)
if gvars:
results[group.name]['vars'] = gvars
@ -403,7 +392,7 @@ class InventoryCLI(CLI):
except KeyError:
results[group.name]['hosts'] = {host.name: host_vars}
if self.options.export:
if context.CLIARGS['export']:
results[group.name]['vars'] = self._get_group_variables(group)
self._remove_empty(results[group.name])

@ -21,6 +21,7 @@ __metaclass__ = type
import os
import stat
from ansible import context
from ansible.cli import CLI
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.executor.playbook_executor import PlaybookExecutor
@ -35,10 +36,10 @@ class PlaybookCLI(CLI):
''' the tool to run *Ansible playbooks*, which are a configuration and multinode deployment system.
See the project home page (https://docs.ansible.com) for more information. '''
def parse(self):
def init_parser(self):
# create parser for CLI options
parser = CLI.base_parser(
super(PlaybookCLI, self).init_parser(
usage="%prog [options] playbook.yml [playbook2 ...]",
connect_opts=True,
meta_opts=True,
@ -54,49 +55,55 @@ class PlaybookCLI(CLI):
)
# ansible playbook specific opts
parser.add_option('--list-tasks', dest='listtasks', action='store_true',
self.parser.add_option('--list-tasks', dest='listtasks', action='store_true',
help="list all tasks that would be executed")
parser.add_option('--list-tags', dest='listtags', action='store_true',
self.parser.add_option('--list-tags', dest='listtags', action='store_true',
help="list all available tags")
parser.add_option('--step', dest='step', action='store_true',
self.parser.add_option('--step', dest='step', action='store_true',
help="one-step-at-a-time: confirm each task before running")
parser.add_option('--start-at-task', dest='start_at_task',
self.parser.add_option('--start-at-task', dest='start_at_task',
help="start the playbook at the task matching this name")
self.parser = parser
super(PlaybookCLI, self).parse()
return self.parser
if len(self.args) == 0:
def post_process_args(self, options, args):
options, args = super(PlaybookCLI, self).post_process_args(options, args)
if len(args) == 0:
raise AnsibleOptionsError("You must specify a playbook file to run")
display.verbosity = self.options.verbosity
self.validate_conflicts(runas_opts=True, vault_opts=True, fork_opts=True)
display.verbosity = options.verbosity
self.validate_conflicts(options, runas_opts=True, vault_opts=True, fork_opts=True)
options = self.normalize_become_options(options)
return options, args
def run(self):
super(PlaybookCLI, self).run()
# Note: slightly wrong, this is written so that implicit localhost
# Manage passwords
# manages passwords
sshpass = None
becomepass = None
passwords = {}
# initial error check, to make sure all specified playbooks are accessible
# before we start running anything through the playbook executor
for playbook in self.args:
for playbook in context.CLIARGS['args']:
if not os.path.exists(playbook):
raise AnsibleError("the playbook: %s could not be found" % playbook)
if not (os.path.isfile(playbook) or stat.S_ISFIFO(os.stat(playbook).st_mode)):
raise AnsibleError("the playbook: %s does not appear to be a file" % playbook)
# don't deal with privilege escalation or passwords when we don't need to
if not self.options.listhosts and not self.options.listtasks and not self.options.listtags and not self.options.syntax:
self.normalize_become_options()
if not (context.CLIARGS['listhosts'] or context.CLIARGS['listtasks'] or
context.CLIARGS['listtags'] or context.CLIARGS['syntax']):
(sshpass, becomepass) = self.ask_passwords()
passwords = {'conn_pass': sshpass, 'become_pass': becomepass}
loader, inventory, variable_manager = self._play_prereqs(self.options)
loader, inventory, variable_manager = self._play_prereqs()
# (which is not returned in list_hosts()) is taken into account for
# warning if inventory is empty. But it can't be taken into account for
@ -104,14 +111,15 @@ class PlaybookCLI(CLI):
# limit if only implicit localhost was in inventory to start with.
#
# Fix this when we rewrite inventory by making localhost a real host (and thus show up in list_hosts())
hosts = CLI.get_host_list(inventory, self.options.subset)
hosts = super(PlaybookCLI, self).get_host_list(inventory, context.CLIARGS['subset'])
# flush fact cache if requested
if self.options.flush_cache:
if context.CLIARGS['flush_cache']:
self._flush_cache(inventory, variable_manager)
# create the playbook executor, which manages running the plays via a task queue manager
pbex = PlaybookExecutor(playbooks=self.args, inventory=inventory, variable_manager=variable_manager, loader=loader, options=self.options,
pbex = PlaybookExecutor(playbooks=context.CLIARGS['args'], inventory=inventory,
variable_manager=variable_manager, loader=loader,
passwords=passwords)
results = pbex.run()
@ -131,7 +139,7 @@ class PlaybookCLI(CLI):
mytags = set(play.tags)
msg += '\tTAGS: [%s]' % (','.join(mytags))
if self.options.listhosts:
if context.CLIARGS['listhosts']:
playhosts = set(inventory.get_hosts(play.hosts))
msg += "\n pattern: %s\n hosts (%d):" % (play.hosts, len(playhosts))
for host in playhosts:
@ -140,9 +148,9 @@ class PlaybookCLI(CLI):
display.display(msg)
all_tags = set()
if self.options.listtags or self.options.listtasks:
if context.CLIARGS['listtags'] or context.CLIARGS['listtasks']:
taskmsg = ''
if self.options.listtasks:
if context.CLIARGS['listtasks']:
taskmsg = ' tasks:\n'
def _process_block(b):
@ -155,7 +163,7 @@ class PlaybookCLI(CLI):
continue
all_tags.update(task.tags)
if self.options.listtasks:
if context.CLIARGS['listtasks']:
cur_tags = list(mytags.union(set(task.tags)))
cur_tags.sort()
if task.name:
@ -167,14 +175,14 @@ class PlaybookCLI(CLI):
return taskmsg
all_vars = variable_manager.get_vars(play=play)
play_context = PlayContext(play=play, options=self.options)
play_context = PlayContext(play=play)
for block in play.compile():
block = block.filter_tagged_tasks(play_context, all_vars)
if not block.has_tasks():
continue
taskmsg += _process_block(block)
if self.options.listtags:
if context.CLIARGS['listtags']:
cur_tags = list(mytags.union(all_tags))
cur_tags.sort()
taskmsg += " TASK TAGS: [%s]\n" % ', '.join(cur_tags)

@ -1,19 +1,6 @@
# (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Copyright: (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>
# Copyright: (c) 2018, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
@ -27,8 +14,9 @@ import socket
import sys
import time
from ansible.cli import CLI
from ansible import constants as C
from ansible import context
from ansible.cli import CLI
from ansible.errors import AnsibleOptionsError
from ansible.module_utils._text import to_native, to_text
from ansible.plugins.loader import module_loader
@ -68,8 +56,8 @@ class PullCLI(CLI):
def _get_inv_cli(self):
inv_opts = ''
if getattr(self.options, 'inventory'):
for inv in self.options.inventory:
if context.CLIARGS.get('inventory', False):
for inv in context.CLIARGS['inventory']:
if isinstance(inv, list):
inv_opts += " -i '%s' " % ','.join(inv)
elif ',' in inv or os.path.exists(inv):
@ -77,10 +65,10 @@ class PullCLI(CLI):
return inv_opts
def parse(self):
def init_parser(self):
''' create an options parser for bin/ansible '''
self.parser = CLI.base_parser(
self.parser = super(PullCLI, self).init_parser(
usage='%prog -U <repository> [options] [<playbook.yml>]',
connect_opts=True,
vault_opts=True,
@ -126,32 +114,37 @@ class PullCLI(CLI):
self.parser.add_option("--diff", default=C.DIFF_ALWAYS, dest='diff', action='store_true',
help="when changing (small) files and templates, show the differences in those files; works great with --check")
super(PullCLI, self).parse()
return self.parser
if not self.options.dest:
def post_process_args(self, options, args):
options, args = super(PullCLI, self).post_process_args(options, args)
if not options.dest:
hostname = socket.getfqdn()
# use a hostname dependent directory, in case of $HOME on nfs
self.options.dest = os.path.join('~/.ansible/pull', hostname)
self.options.dest = os.path.expandvars(os.path.expanduser(self.options.dest))
options.dest = os.path.join('~/.ansible/pull', hostname)
options.dest = os.path.expandvars(os.path.expanduser(options.dest))
if os.path.exists(self.options.dest) and not os.path.isdir(self.options.dest):
raise AnsibleOptionsError("%s is not a valid or accessible directory." % self.options.dest)
if os.path.exists(options.dest) and not os.path.isdir(options.dest):
raise AnsibleOptionsError("%s is not a valid or accessible directory." % options.dest)
if self.options.sleep:
if options.sleep:
try:
secs = random.randint(0, int(self.options.sleep))
self.options.sleep = secs
secs = random.randint(0, int(options.sleep))
options.sleep = secs
except ValueError:
raise AnsibleOptionsError("%s is not a number." % self.options.sleep)
raise AnsibleOptionsError("%s is not a number." % options.sleep)
if not self.options.url:
if not options.url:
raise AnsibleOptionsError("URL for repository not specified, use -h for help")
if self.options.module_name not in self.SUPPORTED_REPO_MODULES:
raise AnsibleOptionsError("Unsupported repo module %s, choices are %s" % (self.options.module_name, ','.join(self.SUPPORTED_REPO_MODULES)))
if options.module_name not in self.SUPPORTED_REPO_MODULES:
raise AnsibleOptionsError("Unsupported repo module %s, choices are %s" % (options.module_name, ','.join(self.SUPPORTED_REPO_MODULES)))
display.verbosity = options.verbosity
self.validate_conflicts(options, vault_opts=True)
display.verbosity = self.options.verbosity
self.validate_conflicts(vault_opts=True)
return options, args
def run(self):
''' use Runner lib to do SSH things '''
@ -169,8 +162,8 @@ class PullCLI(CLI):
host = socket.getfqdn()
limit_opts = 'localhost,%s,127.0.0.1' % ','.join(set([host, node, host.split('.')[0], node.split('.')[0]]))
base_opts = '-c local '
if self.options.verbosity > 0:
base_opts += ' -%s' % ''.join(["v" for x in range(0, self.options.verbosity)])
if context.CLIARGS['verbosity'] > 0:
base_opts += ' -%s' % ''.join(["v" for x in range(0, context.CLIARGS['verbosity'])])
# Attempt to use the inventory passed in as an argument
# It might not yet have been downloaded so use localhost as default
@ -179,61 +172,65 @@ class PullCLI(CLI):
inv_opts = " -i localhost, "
# SCM specific options
if self.options.module_name == 'git':
repo_opts = "name=%s dest=%s" % (self.options.url, self.options.dest)
if self.options.checkout:
repo_opts += ' version=%s' % self.options.checkout
if context.CLIARGS['module_name'] == 'git':
repo_opts = "name=%s dest=%s" % (context.CLIARGS['url'], context.CLIARGS['dest'])
if context.CLIARGS['checkout']:
repo_opts += ' version=%s' % context.CLIARGS['checkout']
if self.options.accept_host_key:
if context.CLIARGS['accept_host_key']:
repo_opts += ' accept_hostkey=yes'
if self.options.private_key_file:
repo_opts += ' key_file=%s' % self.options.private_key_file
if context.CLIARGS['private_key_file']:
repo_opts += ' key_file=%s' % context.CLIARGS['private_key_file']
if self.options.verify:
if context.CLIARGS['verify']:
repo_opts += ' verify_commit=yes'
if self.options.tracksubs:
if context.CLIARGS['tracksubs']:
repo_opts += ' track_submodules=yes'
if not self.options.fullclone:
if not context.CLIARGS['fullclone']:
repo_opts += ' depth=1'
elif self.options.module_name == 'subversion':
repo_opts = "repo=%s dest=%s" % (self.options.url, self.options.dest)
if self.options.checkout:
repo_opts += ' revision=%s' % self.options.checkout
if not self.options.fullclone:
elif context.CLIARGS['module_name'] == 'subversion':
repo_opts = "repo=%s dest=%s" % (context.CLIARGS['url'], context.CLIARGS['dest'])
if context.CLIARGS['checkout']:
repo_opts += ' revision=%s' % context.CLIARGS['checkout']
if not context.CLIARGS['fullclone']:
repo_opts += ' export=yes'
elif self.options.module_name == 'hg':
repo_opts = "repo=%s dest=%s" % (self.options.url, self.options.dest)
if self.options.checkout:
repo_opts += ' revision=%s' % self.options.checkout
elif self.options.module_name == 'bzr':
repo_opts = "name=%s dest=%s" % (self.options.url, self.options.dest)
if self.options.checkout:
repo_opts += ' version=%s' % self.options.checkout
elif context.CLIARGS['module_name'] == 'hg':
repo_opts = "repo=%s dest=%s" % (context.CLIARGS['url'], context.CLIARGS['dest'])
if context.CLIARGS['checkout']:
repo_opts += ' revision=%s' % context.CLIARGS['checkout']
elif context.CLIARGS['module_name'] == 'bzr':
repo_opts = "name=%s dest=%s" % (context.CLIARGS['url'], context.CLIARGS['dest'])
if context.CLIARGS['checkout']:
repo_opts += ' version=%s' % context.CLIARGS['checkout']
else:
raise AnsibleOptionsError('Unsupported (%s) SCM module for pull, choices are: %s' % (self.options.module_name, ','.join(self.REPO_CHOICES)))
raise AnsibleOptionsError('Unsupported (%s) SCM module for pull, choices are: %s'
% (context.CLIARGS['module_name'],
','.join(self.REPO_CHOICES)))
# options common to all supported SCMS
if self.options.clean:
if context.CLIARGS['clean']:
repo_opts += ' force=yes'
path = module_loader.find_plugin(self.options.module_name)
path = module_loader.find_plugin(context.CLIARGS['module_name'])
if path is None:
raise AnsibleOptionsError(("module '%s' not found.\n" % self.options.module_name))
raise AnsibleOptionsError(("module '%s' not found.\n" % context.CLIARGS['module_name']))
bin_path = os.path.dirname(os.path.abspath(sys.argv[0]))
# hardcode local and inventory/host as this is just meant to fetch the repo
cmd = '%s/ansible %s %s -m %s -a "%s" all -l "%s"' % (bin_path, inv_opts, base_opts, self.options.module_name, repo_opts, limit_opts)
cmd = '%s/ansible %s %s -m %s -a "%s" all -l "%s"' % (bin_path, inv_opts, base_opts,
context.CLIARGS['module_name'],
repo_opts, limit_opts)
for ev in self.options.extra_vars:
for ev in context.CLIARGS['extra_vars']:
cmd += ' -e "%s"' % ev
# Nap?
if self.options.sleep:
display.display("Sleeping for %d seconds..." % self.options.sleep)
time.sleep(self.options.sleep)
if context.CLIARGS['sleep']:
display.display("Sleeping for %d seconds..." % context.CLIARGS['sleep'])
time.sleep(context.CLIARGS['sleep'])
# RUN the Checkout command
display.debug("running ansible with VCS module to checkout repo")
@ -241,45 +238,45 @@ class PullCLI(CLI):
rc, b_out, b_err = run_cmd(cmd, live=True)
if rc != 0:
if self.options.force:
if context.CLIARGS['force']:
display.warning("Unable to update repository. Continuing with (forced) run of playbook.")
else:
return rc
elif self.options.ifchanged and b'"changed": true' not in b_out:
elif context.CLIARGS['ifchanged'] and b'"changed": true' not in b_out:
display.display("Repository has not changed, quitting.")
return 0
playbook = self.select_playbook(self.options.dest)
playbook = self.select_playbook(context.CLIARGS['dest'])
if playbook is None:
raise AnsibleOptionsError("Could not find a playbook to run.")
# Build playbook command
cmd = '%s/ansible-playbook %s %s' % (bin_path, base_opts, playbook)
if self.options.vault_password_files:
for vault_password_file in self.options.vault_password_files:
if context.CLIARGS['vault_password_files']:
for vault_password_file in context.CLIARGS['vault_password_files']:
cmd += " --vault-password-file=%s" % vault_password_file
if self.options.vault_ids:
for vault_id in self.options.vault_ids:
if context.CLIARGS['vault_ids']:
for vault_id in context.CLIARGS['vault_ids']:
cmd += " --vault-id=%s" % vault_id
for ev in self.options.extra_vars:
for ev in context.CLIARGS['extra_vars']:
cmd += ' -e "%s"' % ev
if self.options.ask_sudo_pass or self.options.ask_su_pass or self.options.become_ask_pass:
if context.CLIARGS['ask_sudo_pass'] or context.CLIARGS['ask_su_pass'] or context.CLIARGS['become_ask_pass']:
cmd += ' --ask-become-pass'
if self.options.skip_tags:
cmd += ' --skip-tags "%s"' % to_native(u','.join(self.options.skip_tags))
if self.options.tags:
cmd += ' -t "%s"' % to_native(u','.join(self.options.tags))
if self.options.subset:
cmd += ' -l "%s"' % self.options.subset
if context.CLIARGS['skip_tags']:
cmd += ' --skip-tags "%s"' % to_native(u','.join(context.CLIARGS['skip_tags']))
if context.CLIARGS['tags']:
cmd += ' -t "%s"' % to_native(u','.join(context.CLIARGS['tags']))
if context.CLIARGS['subset']:
cmd += ' -l "%s"' % context.CLIARGS['subset']
else:
cmd += ' -l "%s"' % limit_opts
if self.options.check:
if context.CLIARGS['check']:
cmd += ' -C'
if self.options.diff:
if context.CLIARGS['diff']:
cmd += ' -D'
os.chdir(self.options.dest)
os.chdir(context.CLIARGS['dest'])
# redo inventory options as new files might exist now
inv_opts = self._get_inv_cli()
@ -291,12 +288,12 @@ class PullCLI(CLI):
display.debug('EXEC: %s' % cmd)
rc, b_out, b_err = run_cmd(cmd, live=True)
if self.options.purge:
if context.CLIARGS['purge']:
os.chdir('/')
try:
shutil.rmtree(self.options.dest)
shutil.rmtree(context.CLIARGS['dest'])
except Exception as e:
display.error(u"Failed to remove %s: %s" % (self.options.dest, to_text(e)))
display.error(u"Failed to remove %s: %s" % (context.CLIARGS['dest'], to_text(e)))
return rc
@ -309,8 +306,8 @@ class PullCLI(CLI):
def select_playbook(self, path):
playbook = None
if len(self.args) > 0 and self.args[0] is not None:
playbook = os.path.join(path, self.args[0])
if context.CLIARGS['args'] and context.CLIARGS['args'][0] is not None:
playbook = os.path.join(path, context.CLIARGS['args'][0])
rc = self.try_playbook(playbook)
if rc != 0:
display.warning("%s: %s" % (playbook, self.PLAYBOOK_ERRORS[rc]))

@ -1,20 +1,6 @@
# (c) 2014, James Tanner <tanner.jc@gmail.com>
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
# ansible-vault is a script that encrypts/decrypts YAML files. See
# https://docs.ansible.com/playbooks_vault.html for more details.
# Copyright: (c) 2018, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
@ -22,8 +8,9 @@ __metaclass__ = type
import os
import sys
from ansible.cli import CLI
from ansible import constants as C
from ansible import context
from ansible.cli import CLI
from ansible.errors import AnsibleOptionsError
from ansible.module_utils._text import to_text, to_bytes
from ansible.parsing.dataloader import DataLoader
@ -75,7 +62,7 @@ class VaultCLI(CLI):
if self.action in self.can_output:
self.parser.add_option('--output', default=None, dest='output_file',
help='output file name for encrypt or decrypt; use - for stdout',
action="callback", callback=CLI.unfrack_path, type='string')
action="callback", callback=self.unfrack_path, type='string')
# options specific to self.actions
if self.action == "create":
@ -109,9 +96,9 @@ class VaultCLI(CLI):
action='store', type='string',
help='the vault id used to encrypt (required if more than vault-id is provided)')
def parse(self):
def init_parser(self):
self.parser = CLI.base_parser(
self.parser = super(VaultCLI, self).init_parser(
vault_opts=True,
vault_rekey_opts=True,
usage="usage: %%prog [%s] [options] [vaultfile.yml]" % "|".join(sorted(self.VALID_ACTIONS)),
@ -121,18 +108,21 @@ class VaultCLI(CLI):
self.set_action()
super(VaultCLI, self).parse()
self.validate_conflicts(vault_opts=True, vault_rekey_opts=True)
return self.parser
def post_process_args(self, options, args):
options, args = super(VaultCLI, self).post_process_args(options, args)
self.validate_conflicts(options, vault_opts=True, vault_rekey_opts=True)
display.verbosity = self.options.verbosity
display.verbosity = options.verbosity
if self.options.vault_ids:
for vault_id in self.options.vault_ids:
if options.vault_ids:
for vault_id in options.vault_ids:
if u';' in vault_id:
raise AnsibleOptionsError("'%s' is not a valid vault id. The character ';' is not allowed in vault ids" % vault_id)
if self.action not in self.can_output:
if len(self.args) == 0:
if not args:
raise AnsibleOptionsError("Vault requires at least one filename as a parameter")
else:
# This restriction should remain in place until it's possible to
@ -140,17 +130,19 @@ class VaultCLI(CLI):
# to create an encrypted file that can't be read back in. But in
# the meanwhile, "cat a b c|ansible-vault encrypt --output x" is
# a workaround.
if self.options.output_file and len(self.args) > 1:
if options.output_file and len(args) > 1:
raise AnsibleOptionsError("At most one input file may be used with the --output option")
if self.action == 'encrypt_string':
if '-' in self.args or len(self.args) == 0 or self.options.encrypt_string_stdin_name:
if '-' in args or not args or options.encrypt_string_stdin_name:
self.encrypt_string_read_stdin = True
# TODO: prompting from stdin and reading from stdin seem mutually exclusive, but verify that.
if self.options.encrypt_string_prompt and self.encrypt_string_read_stdin:
if options.encrypt_string_prompt and self.encrypt_string_read_stdin:
raise AnsibleOptionsError('The --prompt option is not supported if also reading input from stdin')
return options, args
def run(self):
super(VaultCLI, self).run()
loader = DataLoader()
@ -158,7 +150,7 @@ class VaultCLI(CLI):
# set default restrictive umask
old_umask = os.umask(0o077)
vault_ids = self.options.vault_ids
vault_ids = list(context.CLIARGS['vault_ids'])
# there are 3 types of actions, those that just 'read' (decrypt, view) and only
# need to ask for a password once, and those that 'write' (create, encrypt) that
@ -171,26 +163,25 @@ class VaultCLI(CLI):
# TODO: instead of prompting for these before, we could let VaultEditor
# call a callback when it needs it.
if self.action in ['decrypt', 'view', 'rekey', 'edit']:
vault_secrets = self.setup_vault_secrets(loader,
vault_ids=vault_ids,
vault_password_files=self.options.vault_password_files,
ask_vault_pass=self.options.ask_vault_pass)
vault_secrets = self.setup_vault_secrets(loader, vault_ids=vault_ids,
vault_password_files=list(context.CLIARGS['vault_password_files']),
ask_vault_pass=context.CLIARGS['ask_vault_pass'])
if not vault_secrets:
raise AnsibleOptionsError("A vault password is required to use Ansible's Vault")
if self.action in ['encrypt', 'encrypt_string', 'create']:
encrypt_vault_id = None
# no --encrypt-vault-id self.options.encrypt_vault_id for 'edit'
# no --encrypt-vault-id context.CLIARGS['encrypt_vault_id'] for 'edit'
if self.action not in ['edit']:
encrypt_vault_id = self.options.encrypt_vault_id or C.DEFAULT_VAULT_ENCRYPT_IDENTITY
encrypt_vault_id = context.CLIARGS['encrypt_vault_id'] or C.DEFAULT_VAULT_ENCRYPT_IDENTITY
vault_secrets = None
vault_secrets = \
self.setup_vault_secrets(loader,
vault_ids=vault_ids,
vault_password_files=self.options.vault_password_files,
ask_vault_pass=self.options.ask_vault_pass,
vault_password_files=context.CLIARGS['vault_password_files'],
ask_vault_pass=context.CLIARGS['ask_vault_pass'],
create_new_password=True)
if len(vault_secrets) > 1 and not encrypt_vault_id:
@ -209,7 +200,7 @@ class VaultCLI(CLI):
self.encrypt_secret = encrypt_secret[1]
if self.action in ['rekey']:
encrypt_vault_id = self.options.encrypt_vault_id or C.DEFAULT_VAULT_ENCRYPT_IDENTITY
encrypt_vault_id = context.CLIARGS['encrypt_vault_id'] or C.DEFAULT_VAULT_ENCRYPT_IDENTITY
# print('encrypt_vault_id: %s' % encrypt_vault_id)
# print('default_encrypt_vault_id: %s' % default_encrypt_vault_id)
@ -218,18 +209,18 @@ class VaultCLI(CLI):
new_vault_ids = []
if encrypt_vault_id:
new_vault_ids = default_vault_ids
if self.options.new_vault_id:
new_vault_ids.append(self.options.new_vault_id)
if context.CLIARGS['new_vault_id']:
new_vault_ids.append(context.CLIARGS['new_vault_id'])
new_vault_password_files = []
if self.options.new_vault_password_file:
new_vault_password_files.append(self.options.new_vault_password_file)
if context.CLIARGS['new_vault_password_file']:
new_vault_password_files.append(context.CLIARGS['new_vault_password_file'])
new_vault_secrets = \
self.setup_vault_secrets(loader,
vault_ids=new_vault_ids,
vault_password_files=new_vault_password_files,
ask_vault_pass=self.options.ask_vault_pass,
ask_vault_pass=context.CLIARGS['ask_vault_pass'],
create_new_password=True)
if not new_vault_secrets:
@ -257,14 +248,14 @@ class VaultCLI(CLI):
def execute_encrypt(self):
''' encrypt the supplied file using the provided vault secret '''
if len(self.args) == 0 and sys.stdin.isatty():
if not context.CLIARGS['args'] and sys.stdin.isatty():
display.display("Reading plaintext input from stdin", stderr=True)
for f in self.args or ['-']:
for f in context.CLIARGS['args'] or ['-']:
# Fixme: use the correct vau
self.editor.encrypt_file(f, self.encrypt_secret,
vault_id=self.encrypt_vault_id,
output_file=self.options.output_file)
output_file=context.CLIARGS['output_file'])
if sys.stdout.isatty():
display.display("Encryption successful", stderr=True)
@ -296,10 +287,10 @@ class VaultCLI(CLI):
# remove the non-option '-' arg (used to indicate 'read from stdin') from the candidate args so
# we don't add it to the plaintext list
args = [x for x in self.args if x != '-']
args = [x for x in context.CLIARGS['args'] if x != '-']
# We can prompt and read input, or read from stdin, but not both.
if self.options.encrypt_string_prompt:
if context.CLIARGS['encrypt_string_prompt']:
msg = "String to encrypt: "
name = None
@ -332,20 +323,21 @@ class VaultCLI(CLI):
b_plaintext = to_bytes(stdin_text)
# defaults to None
name = self.options.encrypt_string_stdin_name
name = context.CLIARGS['encrypt_string_stdin_name']
b_plaintext_list.append((b_plaintext, self.FROM_STDIN, name))
# use any leftover args as strings to encrypt
# Try to match args up to --name options
if hasattr(self.options, 'encrypt_string_names') and self.options.encrypt_string_names:
name_and_text_list = list(zip(self.options.encrypt_string_names, args))
if context.CLIARGS.get('encrypt_string_names', False):
name_and_text_list = list(zip(context.CLIARGS['encrypt_string_names'], args))
# Some but not enough --name's to name each var
if len(args) > len(name_and_text_list):
# Trying to avoid ever showing the plaintext in the output, so this warning is vague to avoid that.
display.display('The number of --name options do not match the number of args.',
stderr=True)
display.display('The last named variable will be "%s". The rest will not have names.' % self.options.encrypt_string_names[-1],
display.display('The last named variable will be "%s". The rest will not have'
' names.' % context.CLIARGS['encrypt_string_names'][-1],
stderr=True)
# Add the rest of the args without specifying a name
@ -419,11 +411,11 @@ class VaultCLI(CLI):
def execute_decrypt(self):
''' decrypt the supplied file using the provided vault secret '''
if len(self.args) == 0 and sys.stdin.isatty():
if not context.CLIARGS['args'] and sys.stdin.isatty():
display.display("Reading ciphertext input from stdin", stderr=True)
for f in self.args or ['-']:
self.editor.decrypt_file(f, output_file=self.options.output_file)
for f in context.CLIARGS['args'] or ['-']:
self.editor.decrypt_file(f, output_file=context.CLIARGS['output_file'])
if sys.stdout.isatty():
display.display("Decryption successful", stderr=True)
@ -431,21 +423,21 @@ class VaultCLI(CLI):
def execute_create(self):
''' create and open a file in an editor that will be encrypted with the provided vault secret when closed'''
if len(self.args) > 1:
if len(context.CLIARGS['args']) > 1:
raise AnsibleOptionsError("ansible-vault create can take only one filename argument")
self.editor.create_file(self.args[0], self.encrypt_secret,
self.editor.create_file(context.CLIARGS['args'][0], self.encrypt_secret,
vault_id=self.encrypt_vault_id)
def execute_edit(self):
''' open and decrypt an existing vaulted file in an editor, that will be encrypted again when closed'''
for f in self.args:
for f in context.CLIARGS['args']:
self.editor.edit_file(f)
def execute_view(self):
''' open, decrypt and view an existing vaulted file using a pager using the supplied vault secret '''
for f in self.args:
for f in context.CLIARGS['args']:
# Note: vault should return byte strings because it could encrypt
# and decrypt binary files. We are responsible for changing it to
# unicode here because we are displaying it and therefore can make
@ -456,7 +448,7 @@ class VaultCLI(CLI):
def execute_rekey(self):
''' re-encrypt a vaulted file with a new secret, the previous secret is required '''
for f in self.args:
for f in context.CLIARGS['args']:
# FIXME: plumb in vault_id, use the default new_vault_secret for now
self.editor.rekey_file(f, self.new_encrypt_secret,
self.new_encrypt_vault_id)

@ -0,0 +1,53 @@
# Copyright: (c) 2018, Toshio Kuratomi <tkuratomi@ansible.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
"""
Context of the running Ansible.
In the future we *may* create Context objects to allow running multiple Ansible plays in parallel
with different contexts but that is currently out of scope as the Ansible library is just for
running the ansible command line tools.
These APIs are still in flux so do not use them unless you are willing to update them with every Ansible release
"""
from ansible import arguments
# Note: this is not the singleton version. That is only created once the program has actually
# parsed the args
CLIARGS = arguments.CLIArgs({})
class _Context:
"""
Not yet ready for Prime Time
Eventually this may allow for code which needs to run under different contexts (for instance, as
if they were run with different command line args or from different current working directories)
to exist in the same process. But at the moment, we don't need that so this code has not been
tested for suitability.
"""
def __init__(self):
global CLIARGS
self._CLIARGS = arguments.CLIArgs(CLIARGS)
@property
def CLIARGS(self):
return self._CLIARGS
@CLIARGS.setter
def CLIARGS_set(self, new_cli_args):
if not isinstance(new_cli_args, arguments.CLIArgs):
raise TypeError('CLIARGS must be of type (ansible.arguments.CLIArgs)')
self._CLIARGS = new_cli_args
def _init_global_context(cli_args):
"""Initialize the global context objects"""
global CLIARGS
CLIARGS = arguments.GlobalCLIArgs.from_options(cli_args)

@ -22,6 +22,7 @@ __metaclass__ = type
import os
from ansible import constants as C
from ansible import context
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.module_utils._text import to_native, to_text
from ansible.playbook import Playbook
@ -42,19 +43,20 @@ class PlaybookExecutor:
basis for bin/ansible-playbook operation.
'''
def __init__(self, playbooks, inventory, variable_manager, loader, options, passwords):
def __init__(self, playbooks, inventory, variable_manager, loader, passwords):
self._playbooks = playbooks
self._inventory = inventory
self._variable_manager = variable_manager
self._loader = loader
self._options = options
self.passwords = passwords
self._unreachable_hosts = dict()
if options.listhosts or options.listtasks or options.listtags or options.syntax:
if context.CLIARGS.get('listhosts') or context.CLIARGS.get('listtasks') or \
context.CLIARGS.get('listtags') or context.CLIARGS.get('syntax'):
self._tqm = None
else:
self._tqm = TaskQueueManager(inventory=inventory, variable_manager=variable_manager, loader=loader, options=options, passwords=self.passwords)
self._tqm = TaskQueueManager(inventory=inventory, variable_manager=variable_manager,
loader=loader, passwords=self.passwords)
# Note: We run this here to cache whether the default ansible ssh
# executable supports control persist. Sometime in the future we may
@ -127,7 +129,7 @@ class PlaybookExecutor:
templar = Templar(loader=self._loader, variables=all_vars)
play.post_validate(templar)
if self._options.syntax:
if context.CLIARGS['syntax']:
continue
if self._tqm is None:
@ -218,15 +220,15 @@ class PlaybookExecutor:
if self._loader:
self._loader.cleanup_all_tmp_files()
if self._options.syntax:
if context.CLIARGS['syntax']:
display.display("No issues encountered")
return result
if self._options.start_at_task and not self._tqm._start_at_done:
if context.CLIARGS['start_at_task'] and not self._tqm._start_at_done:
display.error(
"No matching task \"%s\" found. "
"Note: --start-at-task can only follow static includes."
% self._options.start_at_task
"No matching task \"%s\" found."
" Note: --start-at-task can only follow static includes."
% context.CLIARGS['start_at_task']
)
return result

@ -24,6 +24,7 @@ import os
import tempfile
from ansible import constants as C
from ansible import context
from ansible.errors import AnsibleError
from ansible.executor.play_iterator import PlayIterator
from ansible.executor.stats import AggregateStats
@ -65,25 +66,25 @@ class TaskQueueManager:
RUN_FAILED_BREAK_PLAY = 8
RUN_UNKNOWN_ERROR = 255
def __init__(self, inventory, variable_manager, loader, options, passwords, stdout_callback=None, run_additional_callbacks=True, run_tree=False):
def __init__(self, inventory, variable_manager, loader, passwords, stdout_callback=None, run_additional_callbacks=True, run_tree=False, forks=None):
self._inventory = inventory
self._variable_manager = variable_manager
self._loader = loader
self._options = options
self._stats = AggregateStats()
self.passwords = passwords
self._stdout_callback = stdout_callback
self._run_additional_callbacks = run_additional_callbacks
self._run_tree = run_tree
self._forks = forks or 5
self._callbacks_loaded = False
self._callback_plugins = []
self._start_at_done = False
# make sure any module paths (if specified) are added to the module_loader
if options.module_path:
for path in options.module_path:
if context.CLIARGS.get('module_path', False):
for path in context.CLIARGS['module_path']:
if path:
module_loader.add_directory(path)
@ -214,7 +215,7 @@ class TaskQueueManager:
loader=self._loader,
)
play_context = PlayContext(new_play, self._options, self.passwords, self._connection_lockfile.fileno())
play_context = PlayContext(new_play, self.passwords, self._connection_lockfile.fileno())
if (self._stdout_callback and
hasattr(self._stdout_callback, 'set_play_context')):
self._stdout_callback.set_play_context(play_context)
@ -239,7 +240,7 @@ class TaskQueueManager:
)
# adjust to # of workers to configured forks or size of batch, whatever is lower
self._initialize_processes(min(self._options.forks, iterator.batch_size))
self._initialize_processes(min(self._forks, iterator.batch_size))
# load the specified strategy (or the default linear one)
strategy = strategy_loader.get(new_play.strategy, self)
@ -259,7 +260,7 @@ class TaskQueueManager:
# during initialization, the PlayContext will clear the start_at_task
# field to signal that a matching task was found, so check that here
# and remember it so we don't try to skip tasks on future plays
if getattr(self._options, 'start_at_task', None) is not None and play_context.start_at_task is None:
if context.CLIARGS.get('start_at_task') is not None and play_context.start_at_task is None:
self._start_at_done = True
# and run the play using the strategy and cleanup on way out

@ -25,6 +25,7 @@ __metaclass__ = type
import os
from ansible import context
from ansible.errors import AnsibleError
from ansible.module_utils.six import string_types
@ -35,19 +36,18 @@ from ansible.module_utils.six import string_types
class Galaxy(object):
''' Keeps global galaxy info '''
def __init__(self, options):
def __init__(self):
self.options = options
# self.options.roles_path needs to be a list and will be by default
roles_path = getattr(self.options, 'roles_path', [])
# cli option handling is responsible for making roles_path a list
# roles_path needs to be a list and will be by default
roles_path = context.CLIARGS.get('roles_path', tuple())
# cli option handling is responsible for splitting roles_path
self.roles_paths = roles_path
self.roles = {}
# load data path for resource usage
this_dir, this_filename = os.path.split(__file__)
type_path = getattr(self.options, 'role_type', "default")
type_path = context.CLIARGS.get('role_type', "default")
self.DATA_PATH = os.path.join(this_dir, 'data', type_path)
@property

@ -24,6 +24,7 @@ __metaclass__ = type
import json
from ansible import context
import ansible.constants as C
from ansible.errors import AnsibleError
from ansible.galaxy.token import GalaxyToken
@ -63,7 +64,7 @@ class GalaxyAPI(object):
self.galaxy = galaxy
self.token = GalaxyToken()
self._api_server = C.GALAXY_SERVER
self._validate_certs = not galaxy.options.ignore_certs
self._validate_certs = not context.CLIARGS['ignore_certs']
self.baseurl = None
self.version = None
self.initialized = False
@ -71,8 +72,8 @@ class GalaxyAPI(object):
display.debug('Validate TLS certificates: %s' % self._validate_certs)
# set the API server
if galaxy.options.api_server != C.GALAXY_SERVER:
self._api_server = galaxy.options.api_server
if context.CLIARGS['api_server'] != C.GALAXY_SERVER:
self._api_server = context.CLIARGS['api_server']
def __auth_header(self):
token = self.token.get()

@ -31,6 +31,7 @@ import yaml
from distutils.version import LooseVersion
from shutil import rmtree
from ansible import context
from ansible.errors import AnsibleError
from ansible.module_utils._text import to_native, to_text
from ansible.module_utils.urls import open_url
@ -52,11 +53,10 @@ class GalaxyRole(object):
self._metadata = None
self._install_info = None
self._validate_certs = not galaxy.options.ignore_certs
self._validate_certs = not context.CLIARGS['ignore_certs']
display.debug('Validate TLS certificates: %s' % self._validate_certs)
self.options = galaxy.options
self.galaxy = galaxy
self.name = name
@ -196,7 +196,7 @@ class GalaxyRole(object):
if self.scm:
# create tar file from scm url
tmp_file = RoleRequirement.scm_archive_role(keep_scm_meta=self.options.keep_scm_meta, **self.spec)
tmp_file = RoleRequirement.scm_archive_role(keep_scm_meta=context.CLIARGS['keep_scm_meta'], **self.spec)
elif self.src:
if os.path.isfile(self.src):
tmp_file = self.src
@ -298,7 +298,7 @@ class GalaxyRole(object):
if os.path.exists(self.path):
if not os.path.isdir(self.path):
raise AnsibleError("the specified roles path exists and is not a directory.")
elif not getattr(self.options, "force", False):
elif not context.CLIARGS.get("force", False):
raise AnsibleError("the specified role %s appears to already exist. Use --force to replace it." % self.name)
else:
# using --force, remove the old path

@ -29,6 +29,7 @@ import string
import sys
from ansible import constants as C
from ansible import context
from ansible.errors import AnsibleError
from ansible.module_utils.six import iteritems
from ansible.module_utils.six.moves import shlex_quote
@ -186,7 +187,7 @@ class PlayContext(Base):
_gather_timeout = FieldAttribute(isa='string', default=C.DEFAULT_GATHER_TIMEOUT)
_fact_path = FieldAttribute(isa='string', default=C.DEFAULT_FACT_PATH)
def __init__(self, play=None, options=None, passwords=None, connection_lockfd=None):
def __init__(self, play=None, passwords=None, connection_lockfd=None):
super(PlayContext, self).__init__()
@ -203,8 +204,8 @@ class PlayContext(Base):
self.connection_lockfd = connection_lockfd
# set options before play to allow play to override them
if options:
self.set_options(options)
if context.CLIARGS:
self.set_options()
if play:
self.set_play(play)
@ -250,7 +251,7 @@ class PlayContext(Base):
# for flag in ('ssh_common_args', 'docker_extra_args', 'sftp_extra_args', 'scp_extra_args', 'ssh_extra_args'):
# setattr(self, flag, getattr(options, flag, ''))
def set_options(self, options):
def set_options(self):
'''
Configures this connection information instance with data from
options specified by the user on the command line. These have a
@ -258,33 +259,33 @@ class PlayContext(Base):
'''
# privilege escalation
self.become = options.become
self.become_method = options.become_method
self.become_user = options.become_user
self.become = context.CLIARGS['become']
self.become_method = context.CLIARGS['become_method']
self.become_user = context.CLIARGS['become_user']
self.check_mode = boolean(options.check, strict=False)
self.diff = boolean(options.diff, strict=False)
self.check_mode = boolean(context.CLIARGS['check'], strict=False)
self.diff = boolean(context.CLIARGS['diff'], strict=False)
# general flags (should we move out?)
# should only be 'non plugin' flags
for flag in OPTION_FLAGS:
attribute = getattr(options, flag, False)
attribute = context.CLIARGS.get(flag, False)
if attribute:
setattr(self, flag, attribute)
if hasattr(options, 'timeout') and options.timeout:
self.timeout = int(options.timeout)
if context.CLIARGS.get('timeout', False):
self.timeout = context.CLIARGS['timeout']
# get the tag info from options. We check to see if the options have
# the attribute, as it is not always added via the CLI
if hasattr(options, 'tags'):
self.only_tags.update(options.tags)
if context.CLIARGS.get('tags', False):
self.only_tags.update(context.CLIARGS['tags'])
if len(self.only_tags) == 0:
self.only_tags = set(['all'])
if hasattr(options, 'skip_tags'):
self.skip_tags.update(options.skip_tags)
if context.CLIARGS.get('skip_tags', False):
self.skip_tags.update(context.CLIARGS['skip_tags'])
def set_task_and_variable_override(self, task, variables, templar):
'''

@ -46,11 +46,6 @@ else:
global_display = Display()
try:
from __main__ import cli
except ImportError:
# using API w/o cli
cli = False
__all__ = ["CallbackBase"]
@ -72,11 +67,6 @@ class CallbackBase(AnsiblePlugin):
else:
self._display = global_display
if cli:
self._options = cli.options
else:
self._options = None
if self._display.verbosity >= 4:
name = getattr(self, 'CALLBACK_NAME', 'unnamed')
ctype = getattr(self, 'CALLBACK_TYPE', 'old')

@ -19,6 +19,7 @@ DOCUMENTATION = '''
'''
from ansible import constants as C
from ansible import context
from ansible.playbook.task_include import TaskInclude
from ansible.plugins.callback import CallbackBase
from ansible.utils.color import colorize, hostcolor
@ -370,15 +371,16 @@ class CallbackModule(CallbackBase):
from os.path import basename
self._display.banner("PLAYBOOK: %s" % basename(playbook._file_name))
# show CLI arguments
if self._display.verbosity > 3:
# show CLI options
if self._options is not None:
for option in dir(self._options):
if option.startswith('_') or option in ['read_file', 'ensure_value', 'read_module']:
continue
val = getattr(self._options, option)
if val and self._display.verbosity > 3:
self._display.display('%s: %s' % (option, val), color=C.COLOR_VERBOSE, screen_only=True)
if context.CLIARGS.get('args'):
self._display.display('Positional arguments: %s' % ' '.join(context.CLIARGS['args']),
color=C.COLOR_VERBOSE, screen_only=True)
for argument in (a for a in context.CLIARGS if a != 'args'):
val = context.CLIARGS[argument]
if val:
self._display.display('%s: %s' % (argument, val), color=C.COLOR_VERBOSE, screen_only=True)
def v2_runner_retry(self, result):
task_name = result.task_name or result._task

@ -58,11 +58,7 @@ import json
import os
import uuid
try:
from __main__ import cli
except ImportError:
cli = None
from ansible import context
from ansible.module_utils._text import to_text
from ansible.module_utils.urls import open_url
from ansible.plugins.callback import CallbackBase
@ -87,8 +83,6 @@ class CallbackModule(CallbackBase):
super(CallbackModule, self).__init__(display=display)
self._options = cli.options
if not HAS_PRETTYTABLE:
self.disabled = True
self._display.warning('The `prettytable` python module is not '
@ -145,13 +139,14 @@ class CallbackModule(CallbackBase):
title = [
'*Playbook initiated* (_%s_)' % self.guid
]
invocation_items = []
if self._options and self.show_invocation:
tags = self._options.tags
skip_tags = self._options.skip_tags
extra_vars = self._options.extra_vars
subset = self._options.subset
inventory = [os.path.abspath(i) for i in self._options.inventory]
if context.CLIARGS and self.show_invocation:
tags = context.CLIARGS['tags']
skip_tags = context.CLIARGS['skip_tags']
extra_vars = context.CLIARGS['extra_vars']
subset = context.CLIARGS['subset']
inventory = [os.path.abspath(i) for i in context.CLIARGS['inventory']]
invocation_items.append('Inventory: %s' % ', '.join(inventory))
if tags and tags != ['all']:
@ -164,7 +159,7 @@ class CallbackModule(CallbackBase):
invocation_items.append('Extra Vars: %s' %
' '.join(extra_vars))
title.append('by *%s*' % self._options.remote_user)
title.append('by *%s*' % context.CLIARGS['remote_user'])
title.append('\n\n*%s*' % self.playbook_name)
msg_items = [' '.join(title)]

@ -23,6 +23,7 @@ DOCUMENTATION = '''
from os.path import basename
from ansible import constants as C
from ansible import context
from ansible.module_utils._text import to_text
from ansible.plugins.callback import CallbackBase
from ansible.utils.color import colorize, hostcolor
@ -200,14 +201,16 @@ class CallbackModule(CallbackBase):
# TODO display whether this run is happening in check mode
self._display.display("Executing playbook %s" % basename(playbook._file_name))
# show CLI arguments
if self._display.verbosity > 3:
if self._options is not None:
for option in dir(self._options):
if option.startswith('_') or option in ['read_file', 'ensure_value', 'read_module']:
continue
val = getattr(self._options, option)
if context.CLIARGS.get('args'):
self._display.display('Positional arguments: %s' % ' '.join(context.CLIARGS['args']),
color=C.COLOR_VERBOSE, screen_only=True)
for argument in (a for a in context.CLIARGS if a != 'args'):
val = context.CLIARGS[argument]
if val:
self._display.vvvv('%s: %s' % (option, val))
self._display.vvvv('%s: %s' % (argument, val))
def v2_runner_retry(self, result):
msg = " Retrying... (%d of %d)" % (result._result['attempts'], result._result['retries'])

@ -32,6 +32,7 @@ from multiprocessing import Lock
from jinja2.exceptions import UndefinedError
from ansible import constants as C
from ansible import context
from ansible.errors import AnsibleError, AnsibleFileNotFound, AnsibleParserError, AnsibleUndefinedVariable
from ansible.executor import action_write_locks
from ansible.executor.process.worker import WorkerProcess
@ -170,9 +171,9 @@ class StrategyBase:
self._variable_manager = tqm.get_variable_manager()
self._loader = tqm.get_loader()
self._final_q = tqm._final_q
self._step = getattr(tqm._options, 'step', False)
self._diff = getattr(tqm._options, 'diff', False)
self.flush_cache = getattr(tqm._options, 'flush_cache', False)
self._step = context.CLIARGS.get('step', False)
self._diff = context.CLIARGS.get('diff', False)
self.flush_cache = context.CLIARGS.get('flush_cache', False)
# the task cache is a dictionary of tuples of (host.name, task._uuid)
# used to find the original task object of in-flight tasks and to store

@ -27,6 +27,7 @@ from json import dumps
from ansible import constants as C
from ansible import context
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.module_utils.six import iteritems, string_types
from ansible.module_utils._text import to_native, to_text
@ -119,10 +120,9 @@ def merge_hash(a, b):
return result
def load_extra_vars(loader, options):
def load_extra_vars(loader):
extra_vars = {}
if hasattr(options, 'extra_vars'):
for extra_vars_opt in options.extra_vars:
for extra_vars_opt in context.CLIARGS.get('extra_vars', tuple()):
data = None
extra_vars_opt = to_text(extra_vars_opt, errors='surrogate_or_strict')
if extra_vars_opt.startswith(u"@"):
@ -143,7 +143,7 @@ def load_extra_vars(loader, options):
return extra_vars
def load_options_vars(options, version):
def load_options_vars(version):
options_vars = {'ansible_version': version}
attrs = {'check': 'check_mode',
@ -156,7 +156,7 @@ def load_options_vars(options, version):
'verbosity': 'verbosity'}
for attr, alias in attrs.items():
opt = getattr(options, attr, None)
opt = context.CLIARGS.get(attr)
if opt is not None:
options_vars['ansible_%s' % alias] = opt

@ -5,6 +5,8 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import pytest
from ansible import context
from ansible.cli.adhoc import AdHocCLI, display
from ansible.errors import AnsibleOptionsError
@ -22,7 +24,7 @@ def test_with_command():
module_name = 'command'
adhoc_cli = AdHocCLI(args=['-m', module_name, '-vv'])
adhoc_cli.parse()
assert adhoc_cli.options.module_name == module_name
assert context.CLIARGS['module_name'] == module_name
assert display.verbosity == 2
@ -36,9 +38,8 @@ def test_with_extra_parameters():
def test_simple_command():
""" Test valid command and its run"""
adhoc_cli = AdHocCLI(['/bin/ansible', '-m', 'command', 'localhost'])
adhoc_cli = AdHocCLI(['/bin/ansible', '-m', 'command', 'localhost', '-a', 'echo "hi"'])
adhoc_cli.parse()
adhoc_cli.options.module_args = "echo 'hi'"
ret = adhoc_cli.run()
assert ret == 0
@ -63,9 +64,8 @@ def test_did_you_mean_playbook():
def test_play_ds_positive():
""" Test _play_ds"""
adhoc_cli = AdHocCLI(args=['/bin/ansible', 'localhost'])
adhoc_cli = AdHocCLI(args=['/bin/ansible', 'localhost', '-m', 'command'])
adhoc_cli.parse()
adhoc_cli.options.module_name = 'command'
ret = adhoc_cli._play_ds('command', 10, 2)
assert ret['name'] == 'Ansible Ad-Hoc'
assert ret['tasks'] == [{'action': {'module': 'command', 'args': {}}, 'async_val': 10, 'poll': 2}]
@ -73,9 +73,8 @@ def test_play_ds_positive():
def test_play_ds_with_include_role():
""" Test include_role command with poll"""
adhoc_cli = AdHocCLI(args=['/bin/ansible', 'localhost'])
adhoc_cli = AdHocCLI(args=['/bin/ansible', 'localhost', '-m', 'include_role'])
adhoc_cli.parse()
adhoc_cli.options.module_name = 'include_role'
ret = adhoc_cli._play_ds('include_role', None, 2)
assert ret['name'] == 'Ansible Ad-Hoc'
assert ret['gather_facts'] == 'no'
@ -88,5 +87,5 @@ def test_run_import_playbook():
adhoc_cli.parse()
with pytest.raises(AnsibleOptionsError) as exec_info:
adhoc_cli.run()
assert adhoc_cli.options.module_name == import_playbook
assert context.CLIARGS['module_name'] == import_playbook
assert "'%s' is not a valid action for ad-hoc commands" % import_playbook == str(exec_info.value)

@ -26,6 +26,8 @@ import tarfile
import tempfile
import yaml
from ansible import arguments
from ansible import context
from ansible.cli.galaxy import GalaxyCLI
from units.compat import unittest
from units.compat.mock import call, patch
@ -47,7 +49,6 @@ class TestGalaxy(unittest.TestCase):
# creating framework for a role
gc = GalaxyCLI(args=["ansible-galaxy", "init", "--offline", "delete_me"])
gc.parse()
gc.run()
cls.role_dir = "./delete_me"
cls.role_name = "delete_me"
@ -96,8 +97,14 @@ class TestGalaxy(unittest.TestCase):
shutil.rmtree(cls.temp_dir)
def setUp(self):
# Reset the stored command line args
arguments.GlobalCLIArgs._Singleton__instance = None
self.default_args = ['ansible-galaxy']
def tearDown(self):
# Reset the stored command line args
arguments.GlobalCLIArgs._Singleton__instance = None
def test_init(self):
galaxy_cli = GalaxyCLI(args=self.default_args)
self.assertTrue(isinstance(galaxy_cli, GalaxyCLI))
@ -120,12 +127,11 @@ class TestGalaxy(unittest.TestCase):
def test_run(self):
''' verifies that the GalaxyCLI object's api is created and that execute() is called. '''
gc = GalaxyCLI(args=["ansible-galaxy", "install", "--ignore-errors", "imaginary_role"])
gc.parse()
with patch.object(ansible.cli.CLI, "execute", return_value=None) as mock_ex:
with patch.object(ansible.cli.CLI, "run", return_value=None) as mock_run:
gc.run()
# testing
self.assertIsInstance(gc.galaxy, ansible.galaxy.Galaxy)
self.assertEqual(mock_run.call_count, 1)
self.assertTrue(isinstance(gc.api, ansible.galaxy.api.GalaxyAPI))
self.assertEqual(mock_ex.call_count, 1)
@ -133,15 +139,16 @@ class TestGalaxy(unittest.TestCase):
def test_execute_remove(self):
# installing role
gc = GalaxyCLI(args=["ansible-galaxy", "install", "-p", self.role_path, "-r", self.role_req, '--force'])
gc.parse()
gc.run()
# location where the role was installed
role_file = os.path.join(self.role_path, self.role_name)
# removing role
# Have to reset the arguments in the context object manually since we're doing the
# equivalent of running the command line program twice
arguments.GlobalCLIArgs._Singleton__instance = None
gc = GalaxyCLI(args=["ansible-galaxy", "remove", role_file, self.role_name])
gc.parse()
gc.run()
# testing role was removed
@ -151,7 +158,6 @@ class TestGalaxy(unittest.TestCase):
def test_exit_without_ignore_without_flag(self):
''' tests that GalaxyCLI exits with the error specified if the --ignore-errors flag is not used '''
gc = GalaxyCLI(args=["ansible-galaxy", "install", "--server=None", "fake_role_name"])
gc.parse()
with patch.object(ansible.utils.display.Display, "display", return_value=None) as mocked_display:
# testing that error expected is raised
self.assertRaises(AnsibleError, gc.run)
@ -161,7 +167,6 @@ class TestGalaxy(unittest.TestCase):
''' tests that GalaxyCLI exits without the error specified if the --ignore-errors flag is used '''
# testing with --ignore-errors flag
gc = GalaxyCLI(args=["ansible-galaxy", "install", "--server=None", "fake_role_name", "--ignore-errors"])
gc.parse()
with patch.object(ansible.utils.display.Display, "display", return_value=None) as mocked_display:
gc.run()
self.assertTrue(mocked_display.called_once_with("- downloading role 'fake_role_name', owned by "))
@ -172,7 +177,6 @@ class TestGalaxy(unittest.TestCase):
# checking that the common results of parse() for all possible actions have been created/called
self.assertIsInstance(galaxycli_obj.parser, ansible.cli.SortedOptParser)
self.assertIsInstance(galaxycli_obj.galaxy, ansible.galaxy.Galaxy)
formatted_call = {
'import': 'usage: %prog import [options] github_user github_repo',
'delete': 'usage: %prog delete [options] github_user github_repo',
@ -206,74 +210,74 @@ class TestGalaxy(unittest.TestCase):
''' testing the options parser when the action 'delete' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "delete"])
self.run_parse_common(gc, "delete")
self.assertEqual(gc.options.verbosity, 0)
self.assertEqual(context.CLIARGS['verbosity'], 0)
def test_parse_import(self):
''' testing the options parser when the action 'import' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "import"])
self.run_parse_common(gc, "import")
self.assertEqual(gc.options.wait, True)
self.assertEqual(gc.options.reference, None)
self.assertEqual(gc.options.check_status, False)
self.assertEqual(gc.options.verbosity, 0)
self.assertEqual(context.CLIARGS['wait'], True)
self.assertEqual(context.CLIARGS['reference'], None)
self.assertEqual(context.CLIARGS['check_status'], False)
self.assertEqual(context.CLIARGS['verbosity'], 0)
def test_parse_info(self):
''' testing the options parser when the action 'info' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "info"])
self.run_parse_common(gc, "info")
self.assertEqual(gc.options.offline, False)
self.assertEqual(context.CLIARGS['offline'], False)
def test_parse_init(self):
''' testing the options parser when the action 'init' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "init"])
self.run_parse_common(gc, "init")
self.assertEqual(gc.options.offline, False)
self.assertEqual(gc.options.force, False)
self.assertEqual(context.CLIARGS['offline'], False)
self.assertEqual(context.CLIARGS['force'], False)
def test_parse_install(self):
''' testing the options parser when the action 'install' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "install"])
self.run_parse_common(gc, "install")
self.assertEqual(gc.options.ignore_errors, False)
self.assertEqual(gc.options.no_deps, False)
self.assertEqual(gc.options.role_file, None)
self.assertEqual(gc.options.force, False)
self.assertEqual(context.CLIARGS['ignore_errors'], False)
self.assertEqual(context.CLIARGS['no_deps'], False)
self.assertEqual(context.CLIARGS['role_file'], None)
self.assertEqual(context.CLIARGS['force'], False)
def test_parse_list(self):
''' testing the options parser when the action 'list' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "list"])
self.run_parse_common(gc, "list")
self.assertEqual(gc.options.verbosity, 0)
self.assertEqual(context.CLIARGS['verbosity'], 0)
def test_parse_login(self):
''' testing the options parser when the action 'login' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "login"])
self.run_parse_common(gc, "login")
self.assertEqual(gc.options.verbosity, 0)
self.assertEqual(gc.options.token, None)
self.assertEqual(context.CLIARGS['verbosity'], 0)
self.assertEqual(context.CLIARGS['token'], None)
def test_parse_remove(self):
''' testing the options parser when the action 'remove' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "remove"])
self.run_parse_common(gc, "remove")
self.assertEqual(gc.options.verbosity, 0)
self.assertEqual(context.CLIARGS['verbosity'], 0)
def test_parse_search(self):
''' testing the options parswer when the action 'search' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "search"])
self.run_parse_common(gc, "search")
self.assertEqual(gc.options.platforms, None)
self.assertEqual(gc.options.galaxy_tags, None)
self.assertEqual(gc.options.author, None)
self.assertEqual(context.CLIARGS['platforms'], None)
self.assertEqual(context.CLIARGS['galaxy_tags'], None)
self.assertEqual(context.CLIARGS['author'], None)
def test_parse_setup(self):
''' testing the options parser when the action 'setup' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "setup"])
self.run_parse_common(gc, "setup")
self.assertEqual(gc.options.verbosity, 0)
self.assertEqual(gc.options.remove_id, None)
self.assertEqual(gc.options.setup_list, False)
self.assertEqual(context.CLIARGS['verbosity'], 0)
self.assertEqual(context.CLIARGS['remove_id'], None)
self.assertEqual(context.CLIARGS['setup_list'], False)
class ValidRoleTests(object):
@ -299,7 +303,6 @@ class ValidRoleTests(object):
# create role using default skeleton
gc = GalaxyCLI(args=['ansible-galaxy', 'init', '-c', '--offline'] + galaxy_args + ['--init-path', cls.test_dir, cls.role_name])
gc.parse()
gc.run()
cls.gc = gc
@ -466,4 +469,4 @@ class TestGalaxyInitSkeleton(unittest.TestCase, ValidRoleTests):
self.assertTrue(os.path.exists(os.path.join(self.role_dir, 'templates_extra', 'templates.txt')))
def test_skeleton_option(self):
self.assertEquals(self.role_skeleton_path, self.gc.options.role_skeleton, msg='Skeleton path was not parsed properly from the command line')
self.assertEquals(self.role_skeleton_path, context.CLIARGS['role_skeleton'], msg='Skeleton path was not parsed properly from the command line')

@ -22,6 +22,7 @@ __metaclass__ = type
from units.compat import unittest
from units.mock.loader import DictDataLoader
from ansible import context
from ansible.inventory.manager import InventoryManager
from ansible.vars.manager import VariableManager
@ -32,7 +33,7 @@ class TestPlaybookCLI(unittest.TestCase):
def test_flush_cache(self):
cli = PlaybookCLI(args=["ansible-playbook", "--flush-cache", "foobar.yml"])
cli.parse()
self.assertTrue(cli.options.flush_cache)
self.assertTrue(context.CLIARGS['flush_cache'])
variable_manager = VariableManager()
fake_loader = DictDataLoader({'foobar.yml': ""})

@ -21,6 +21,8 @@ __metaclass__ = type
from units.compat import unittest
from units.compat.mock import MagicMock
from ansible import arguments
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.playbook import Playbook
from ansible.template import Templar
@ -31,10 +33,12 @@ from units.mock.loader import DictDataLoader
class TestPlaybookExecutor(unittest.TestCase):
def setUp(self):
pass
# Reset command line args for every test
arguments.CLIArgs._Singleton__instance = None
def tearDown(self):
pass
# And cleanup after ourselves too
arguments.CLIArgs._Singleton__instance = None
def test_get_serialized_batches(self):
fake_loader = DictDataLoader({
@ -77,11 +81,6 @@ class TestPlaybookExecutor(unittest.TestCase):
mock_inventory = MagicMock()
mock_var_manager = MagicMock()
# fake out options to use the syntax CLI switch, which will ensure
# the PlaybookExecutor doesn't create a TaskQueueManager
mock_options = MagicMock()
mock_options.syntax.value = True
templar = Templar(loader=fake_loader)
pbe = PlaybookExecutor(
@ -89,7 +88,6 @@ class TestPlaybookExecutor(unittest.TestCase):
inventory=mock_inventory,
variable_manager=mock_var_manager,
loader=fake_loader,
options=mock_options,
passwords=[],
)

@ -20,6 +20,9 @@ from __future__ import (absolute_import, division, print_function)
from units.compat import unittest
from units.compat.mock import MagicMock
from ansible import arguments
from ansible import context
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.playbook import Playbook
from ansible.plugins.callback import CallbackBase
@ -32,10 +35,11 @@ class TestTaskQueueManagerCallbacks(unittest.TestCase):
inventory = MagicMock()
variable_manager = MagicMock()
loader = MagicMock()
options = MagicMock()
passwords = []
self._tqm = TaskQueueManager(inventory, variable_manager, loader, options, passwords)
# Reset the stored command line args
arguments.GlobalCLIArgs._Singleton__instance = None
self._tqm = TaskQueueManager(inventory, variable_manager, loader, passwords)
self._playbook = Playbook(loader)
# we use a MagicMock to register the result of the call we
@ -46,7 +50,8 @@ class TestTaskQueueManagerCallbacks(unittest.TestCase):
self._register = MagicMock()
def tearDown(self):
pass
# Reset the stored command line args
arguments.GlobalCLIArgs._Singleton__instance = None
def test_task_queue_manager_callbacks_v2_playbook_on_start(self):
"""

@ -11,8 +11,10 @@ import os
import pytest
from ansible import arguments
from ansible import constants as C
from ansible.cli import CLI
from ansible import context
from ansible import cli
from units.compat import unittest
from ansible.errors import AnsibleError, AnsibleParserError
from ansible.module_utils.six.moves import shlex_quote
@ -23,7 +25,7 @@ from units.mock.loader import DictDataLoader
@pytest.fixture
def parser():
parser = CLI.base_parser(runas_opts=True, meta_opts=True,
parser = cli.base_parser(runas_opts=True, meta_opts=True,
runtask_opts=True, vault_opts=True,
async_opts=True, connect_opts=True,
subset_opts=True, check_opts=True,
@ -31,9 +33,18 @@ def parser():
return parser
def test_play_context(mocker, parser):
@pytest.fixture
def reset_cli_args():
arguments.GlobalCLIArgs._Singleton__instance = None
yield
arguments.GlobalCLIArgs._Singleton__instance = None
def test_play_context(mocker, parser, reset_cli_args):
(options, args) = parser.parse_args(['-vv', '--check'])
play_context = PlayContext(options=options)
options.args = args
context._init_global_context(options)
play_context = PlayContext()
assert play_context._attributes['connection'] == C.DEFAULT_TRANSPORT
assert play_context.remote_addr is None
@ -56,7 +67,7 @@ def test_play_context(mocker, parser):
mock_play.become_user = 'mockroot'
mock_play.no_log = True
play_context = PlayContext(play=mock_play, options=options)
play_context = PlayContext(play=mock_play)
assert play_context.connection == 'mock'
assert play_context.remote_user == 'mock'
assert play_context.password == ''
@ -83,7 +94,7 @@ def test_play_context(mocker, parser):
mock_templar = mocker.MagicMock()
play_context = PlayContext(play=mock_play, options=options)
play_context = PlayContext(play=mock_play)
play_context = play_context.set_task_and_variable_override(task=mock_task, variables=all_vars, templar=mock_templar)
assert play_context.connection == 'mock_inventory'
@ -100,9 +111,11 @@ def test_play_context(mocker, parser):
assert play_context.no_log is False
def test_play_context_make_become_cmd(parser):
def test_play_context_make_become_cmd(mocker, parser, reset_cli_args):
(options, args) = parser.parse_args([])
play_context = PlayContext(options=options)
options.args = args
context._init_global_context(options)
play_context = PlayContext()
default_cmd = "/bin/foo"
default_exe = "/bin/bash"

@ -66,7 +66,6 @@ class TestStrategyBase(unittest.TestCase):
mock_tqm = MagicMock(TaskQueueManager)
mock_tqm._final_q = mock_queue
mock_tqm._options = MagicMock()
strategy_base = StrategyBase(tqm=mock_tqm)
strategy_base.cleanup()
@ -106,7 +105,6 @@ class TestStrategyBase(unittest.TestCase):
mock_tqm._failed_hosts = dict()
mock_tqm._unreachable_hosts = dict()
mock_tqm._options = MagicMock()
strategy_base = StrategyBase(tqm=mock_tqm)
mock_host = MagicMock()
@ -187,15 +185,13 @@ class TestStrategyBase(unittest.TestCase):
mock_host.has_hostkey = True
mock_inventory = MagicMock()
mock_inventory.get.return_value = mock_host
mock_options = MagicMock()
mock_options.module_path = None
tqm = TaskQueueManager(
inventory=mock_inventory,
variable_manager=mock_var_manager,
loader=fake_loader,
options=mock_options,
passwords=None,
forks=5,
)
tqm._initialize_processes(3)
tqm.hostvars = dict()
@ -520,15 +516,13 @@ class TestStrategyBase(unittest.TestCase):
mock_iterator._play = mock_play
fake_loader = DictDataLoader()
mock_options = MagicMock()
mock_options.module_path = None
tqm = TaskQueueManager(
inventory=mock_inventory,
variable_manager=mock_var_mgr,
loader=fake_loader,
options=mock_options,
passwords=None,
forks=5,
)
tqm._initialize_processes(3)
tqm._initialize_notified_handlers(mock_play)

@ -80,15 +80,12 @@ class TestStrategyLinear(unittest.TestCase):
all_vars=dict(),
)
mock_options = MagicMock()
mock_options.module_path = None
tqm = TaskQueueManager(
inventory=inventory,
variable_manager=mock_var_manager,
loader=fake_loader,
options=mock_options,
passwords=None,
forks=5,
)
tqm._initialize_processes(3)
strategy = StrategyModule(tqm)

@ -35,6 +35,17 @@ def test_make_immutable(data, expected):
assert arguments._make_immutable(data) == expected
def test_cliargs_from_dict():
old_dict = {'tags': [u'production', u'webservers'],
'check_mode': True,
'start_at_task': u'Start with くらとみ'}
expected = frozenset((('tags', (u'production', u'webservers')),
('check_mode', True),
('start_at_task', u'Start with くらとみ')))
assert frozenset(arguments.CLIArgs(old_dict).items()) == expected
def test_cliargs():
class FakeOptions:
pass
@ -47,7 +58,7 @@ def test_cliargs():
('check_mode', True),
('start_at_task', u'Start with くらとみ')))
assert frozenset(arguments.CLIArgs(options).items()) == expected
assert frozenset(arguments.CLIArgs.from_options(options).items()) == expected
@pytest.mark.skipIf(argparse is None)

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2018, Toshio Kuratomi <tkuratomi@ansible.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
from __future__ import (absolute_import, division)
__metaclass__ = type
import pytest
from ansible import context
class FakeOptions:
pass
def test_set_global_context():
options = FakeOptions()
options.tags = [u'production', u'webservers']
options.check_mode = True
options.start_at_task = u'Start with くらとみ'
expected = frozenset((('tags', (u'production', u'webservers')),
('check_mode', True),
('start_at_task', u'Start with くらとみ')))
context._init_global_context(options)
assert frozenset(context.CLIARGS.items()) == expected
Loading…
Cancel
Save