Migrate command line parsing to argparse (#50610)

* Start of migration to argparse

* various fixes and improvements

* Linting fixes

* Test fixes

* Fix vault_password_files

* Add PrependAction for argparse

* A bunch of additional tweak/fixes

* Fix ansible-config tests

* Fix man page generation

* linting fix

* More adhoc pattern fixes

* Add changelog fragment

* Add support for argcomplete

* Enable argcomplete global completion

* Rename PrependAction to PrependListAction to better describe what it does

* Add documentation for installing and configuring argcomplete

* Address rebase issues

* Fix display encoding for vault

* Fix line length

* Address rebase issues

* Handle rebase issues

* Use mutually exclusive group instead of handling manually

* Fix rebase issues

* Address rebase issue

* Update version added for argcomplete support

* -e must be given a value

* ci_complete
pull/55674/head
Matt Martz 6 years ago committed by GitHub
parent 7ee6c136fd
commit db6cc60352
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,5 +1,5 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
@ -17,7 +17,8 @@
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
########################################################
# PYTHON_ARGCOMPLETE_OK
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

@ -0,0 +1,2 @@
minor_changes:
- Command line argument parsing - Switch from deprecated optparse to argparse

@ -1,6 +1,6 @@
#!/usr/bin/env python
import optparse
import argparse
import os
import sys
@ -11,15 +11,14 @@ from ansible.utils._build_helpers import update_file_if_different
def generate_parser():
p = optparse.OptionParser(
version='%prog 1.0',
usage='usage: %prog [options]',
p = argparse.ArgumentParser(
description='Generate cli documentation from cli docstrings',
)
p.add_option("-t", "--template-file", action="store", dest="template_file", default="../templates/man.j2", help="path to jinja2 template")
p.add_option("-o", "--output-dir", action="store", dest="output_dir", default='/tmp/', help="Output directory for rst files")
p.add_option("-f", "--output-format", action="store", dest="output_format", default='man', help="Output format for docs (the default 'man' or 'rst')")
p.add_argument("-t", "--template-file", action="store", dest="template_file", default="../templates/man.j2", help="path to jinja2 template")
p.add_argument("-o", "--output-dir", action="store", dest="output_dir", default='/tmp/', help="Output directory for rst files")
p.add_argument("-f", "--output-format", action="store", dest="output_format", default='man', help="Output format for docs (the default 'man' or 'rst')")
p.add_argument('args', help='CLI module(s)', metavar='module', nargs='*')
return p
@ -57,34 +56,49 @@ def get_options(optlist):
for opt in optlist:
res = {
'desc': opt.help,
'options': opt._short_opts + opt._long_opts
'options': opt.option_strings
}
if opt.action == 'store':
if isinstance(opt, argparse._StoreAction):
res['arg'] = opt.dest.upper()
elif not res['options']:
continue
opts.append(res)
return opts
def dedupe_groups(parser):
action_groups = []
for action_group in parser._action_groups:
found = False
for a in action_groups:
if a._actions == action_group._actions:
found = True
break
if not found:
action_groups.append(action_group)
return action_groups
def get_option_groups(option_parser):
groups = []
for option_group in option_parser.option_groups:
for action_group in dedupe_groups(option_parser)[1:]:
group_info = {}
group_info['desc'] = option_group.get_description()
group_info['options'] = option_group.option_list
group_info['group_obj'] = option_group
group_info['desc'] = action_group.description
group_info['options'] = action_group._actions
group_info['group_obj'] = action_group
groups.append(group_info)
return groups
def opt_doc_list(cli):
def opt_doc_list(parser):
''' iterate over options lists '''
results = []
for option_group in cli.parser.option_groups:
results.extend(get_options(option_group.option_list))
for option_group in dedupe_groups(parser)[1:]:
results.extend(get_options(option_group._actions))
results.extend(get_options(cli.parser.option_list))
results.extend(get_options(parser._actions))
return results
@ -106,15 +120,17 @@ def opts_docs(cli_class_name, cli_module_name):
# parse the common options
try:
cli.parse()
cli.init_parser()
except Exception:
pass
cli.parser.prog = cli_name
# base/common cli info
docs = {
'cli': cli_module_name,
'cli_name': cli_name,
'usage': cli.parser.usage,
'usage': cli.parser.format_usage(),
'short_desc': cli.parser.description,
'long_desc': trim_docstring(cli.__doc__),
'actions': {},
@ -127,7 +143,7 @@ def opts_docs(cli_class_name, cli_module_name):
if hasattr(cli, extras):
docs[extras.lower()] = getattr(cli, extras)
common_opts = opt_doc_list(cli)
common_opts = opt_doc_list(cli.parser)
groups_info = get_option_groups(cli.parser)
shared_opt_names = []
for opt in common_opts:
@ -144,25 +160,11 @@ def opts_docs(cli_class_name, cli_module_name):
# force populate parser with per action options
# use class attrs not the attrs on a instance (not that it matters here...)
for action in getattr(cli_klass, 'VALID_ACTIONS', ()):
# instantiate each cli and ask its options
action_cli_klass = getattr(__import__("ansible.cli.%s" % cli_module_name,
fromlist=[cli_class_name]), cli_class_name)
# init with args with action added?
cli = action_cli_klass([])
cli.args.append(action)
try:
cli.parse()
except Exception:
pass
# FIXME/TODO: needed?
# avoid dupe errors
cli.parser.set_conflict_handler('resolve')
cli.set_action()
try:
subparser = cli.parser._subparsers._group_actions[0].choices
except AttributeError:
subparser = {}
for action, parser in subparser.items():
action_info = {'option_names': [],
'options': []}
# docs['actions'][action] = {}
@ -171,7 +173,7 @@ def opts_docs(cli_class_name, cli_module_name):
action_info['desc'] = trim_docstring(getattr(cli, 'execute_%s' % action).__doc__)
# docs['actions'][action]['desc'] = getattr(cli, 'execute_%s' % action).__doc__.strip()
action_doc_list = opt_doc_list(cli)
action_doc_list = opt_doc_list(parser)
uncommon_options = []
for action_doc in action_doc_list:
@ -196,7 +198,7 @@ def opts_docs(cli_class_name, cli_module_name):
docs['actions'][action] = action_info
docs['options'] = opt_doc_list(cli)
docs['options'] = opt_doc_list(cli.parser)
return docs
@ -204,7 +206,7 @@ if __name__ == '__main__':
parser = generate_parser()
options, args = parser.parse_args()
options = parser.parse_args()
template_file = options.template_file
template_path = os.path.expanduser(template_file)
@ -214,7 +216,7 @@ if __name__ == '__main__':
output_dir = os.path.abspath(options.output_dir)
output_format = options.output_format
cli_modules = args
cli_modules = options.args
# various cli parsing things checks sys.argv if the 'args' that are passed in are []
# so just remove any args so the cli modules dont try to parse them resulting in warnings

@ -429,6 +429,91 @@ Now let's test things with a ping command:
You can also use "sudo make install".
.. _shell_completion:
Shell Completion
````````````````
As of Ansible 2.9 shell completion of the ansible command line utilities is available and provided through an optional dependency
called ``argcomplete``. ``argcomplete`` supports bash, and limited support for zsh and tcsh
``python-argcomplete`` can be installed from EPEL on Red Hat Enterprise based distributions, and is available in the standard OS repositories for many other distributions.
For more information about installing and configuration see the `argcomplete documentation <https://argcomplete.readthedocs.io/en/latest/>_`.
Installing
++++++++++
via yum/dnf
-----------
On Fedora:
.. code-block:: bash
$ sudo dnf install python-argcomplete
On RHEL and CentOS:
.. code-block:: bash
$ sudo yum install epel-release
$ sudo yum install python-argcomplete
via apt
-------
.. code-block:: bash
$ sudo apt install python-argcomplete
via pip
-------
.. code-block:: bash
$ pip install argcomplete
Configuring
+++++++++++
There are 2 ways to configure argcomplete to allow shell completion of the Ansible command line utilities. Per command, or globally.
Globally
--------
Global completion requires bash 4.2
.. code-block:: bash
$ sudo activate-global-python-argcomplete
This will write a bash completion file to a global location, use ``--dest`` to change the location
Per Command
-----------
If you do not have bash 4.2, you must register each script independently
.. code-block:: bash
$ eval $(register-python-argcomplete ansible)
$ eval $(register-python-argcomplete ansible-config)
$ eval $(register-python-argcomplete ansible-console)
$ eval $(register-python-argcomplete ansible-doc)
$ eval $(register-python-argcomplete ansible-galaxy)
$ eval $(register-python-argcomplete ansible-inventory)
$ eval $(register-python-argcomplete ansible-playbook)
$ eval $(register-python-argcomplete ansible-pull)
$ eval $(register-python-argcomplete ansible-vault)
It would be advisable to place the above commands, into your shells profile file such as ``~/.profile`` or ``~/.bash_profile``.
Zsh or tcsh
-----------
See the `argcomplete documentation <https://argcomplete.readthedocs.io/en/latest/>_`.
.. _getting_ansible:
Ansible on GitHub

@ -7,7 +7,7 @@ Using Vault in playbooks
The "Vault" is a feature of Ansible that allows you to keep sensitive data such as passwords or keys in encrypted files, rather than as plaintext in playbooks or roles. These vault files can then be distributed or placed in source control.
To enable this feature, a command line tool, :ref:`ansible-vault` is used to edit files, and a command line flag :option:`--ask-vault-pass <ansible-vault --ask-vault-pass>`, :option:`--vault-password-file <ansible-vault --vault-password-file>` or :option:`--vault-id <ansible-playbook --vault-id>` is used. You can also modify your ``ansible.cfg`` file to specify the location of a password file or configure Ansible to always prompt for the password. These options require no command line flag usage.
To enable this feature, a command line tool, :ref:`ansible-vault` is used to edit files, and a command line flag :option:`--ask-vault-pass <ansible-vault-create --ask-vault-pass>`, :option:`--vault-password-file <ansible-vault-create --vault-password-file>` or :option:`--vault-id <ansible-playbook --vault-id>` is used. You can also modify your ``ansible.cfg`` file to specify the location of a password file or configure Ansible to always prompt for the password. These options require no command line flag usage.
For best practices advice, refer to :ref:`best_practices_for_variables_and_vaults`.

@ -344,7 +344,7 @@ passwords will be tried in the order they are specified.
In the above case, the 'dev' password will be tried first, then the 'prod' password for cases
where Ansible doesn't know which vault ID is used to encrypt something.
To add a vault ID label to the encrypted data use the :option:`--vault-id <ansible-vault --vault-id>` option
To add a vault ID label to the encrypted data use the :option:`--vault-id <ansible-vault-create --vault-id>` option
with a label when encrypting the data.
The :ref:`DEFAULT_VAULT_ID_MATCH` config option can be set so that Ansible will only use the password with

@ -38,7 +38,7 @@ Common Options
==============
{% for option in options|sort(attribute='options') %}
{% for option in options|sort(attribute='options') if option.options %}
.. option:: {% for switch in option['options'] %}{{switch}}{% if option['arg'] %} <{{option['arg']}}>{% endif %}{% if not loop.last %}, {% endif %}{% endfor %}

@ -17,8 +17,8 @@ from abc import ABCMeta, abstractmethod
from ansible import constants as C
from ansible import context
from ansible.cli.arguments import optparse_helpers as opt_help
from ansible.errors import AnsibleOptionsError, AnsibleError
from ansible.cli.arguments import option_helpers as opt_help
from ansible.errors import AnsibleError
from ansible.inventory.manager import InventoryManager
from ansible.module_utils.six import with_metaclass, string_types
from ansible.module_utils._text import to_bytes, to_text
@ -30,6 +30,12 @@ from ansible.vars.manager import VariableManager
from ansible.parsing.vault import PromptVaultSecret, get_file_vault_secret
from ansible.plugins.loader import add_all_plugin_dirs
try:
import argcomplete
HAS_ARGCOMPLETE = True
except ImportError:
HAS_ARGCOMPLETE = False
display = Display()
@ -37,8 +43,6 @@ display = Display()
class CLI(with_metaclass(ABCMeta, object)):
''' code behind bin/ansible* programs '''
VALID_ACTIONS = frozenset()
_ITALIC = re.compile(r"I\(([^)]+)\)")
_BOLD = re.compile(r"B\(([^)]+)\)")
_MODULE = re.compile(r"M\(([^)]+)\)")
@ -59,38 +63,8 @@ class CLI(with_metaclass(ABCMeta, object)):
self.args = args
self.parser = None
self.action = None
self.callback = callback
def set_action(self):
"""
Get the action the user wants to execute from the sys argv list.
"""
for i in range(0, len(self.args)):
arg = self.args[i]
if arg in self.VALID_ACTIONS:
self.action = arg
del self.args[i]
break
if not self.action:
# if we're asked for help or version, we don't need an action.
# have to use a special purpose Option Parser to figure that out as
# the standard OptionParser throws an error for unknown options and
# without knowing action, we only know of a subset of the options
# that could be legal for this command
tmp_parser = opt_help.InvalidOptsParser(self.parser)
tmp_options, tmp_args = tmp_parser.parse_args(self.args)
if not(hasattr(tmp_options, 'help') and tmp_options.help) or (hasattr(tmp_options, 'version') and tmp_options.version):
raise AnsibleOptionsError("Missing required action")
def execute(self):
"""
Actually runs a child defined method using the execute_<action> pattern
"""
fn = getattr(self, "execute_%s" % self.action)
fn()
@abstractmethod
def run(self):
"""Run the ansible command
@ -100,7 +74,7 @@ class CLI(with_metaclass(ABCMeta, object)):
"""
self.parse()
display.vv(to_text(self.parser.get_version()))
display.vv(to_text(opt_help.version(self.parser.prog)))
if C.CONFIG_FILE:
display.v(u"Using %s as config file" % to_text(C.CONFIG_FILE))
@ -277,18 +251,9 @@ class CLI(with_metaclass(ABCMeta, object)):
return (sshpass, becomepass)
def validate_conflicts(self, op, vault_opts=False, runas_opts=False, fork_opts=False, vault_rekey_opts=False):
def validate_conflicts(self, op, runas_opts=False, fork_opts=False):
''' check for conflicting options '''
if vault_opts:
# Check for vault related conflicts
if op.ask_vault_pass and op.vault_password_files:
self.parser.error("--ask-vault-pass and --vault-password-file are mutually exclusive")
if vault_rekey_opts:
if op.new_vault_id and op.new_vault_password_file:
self.parser.error("--new-vault-password-file and --new-vault-id are mutually exclusive")
if fork_opts:
if op.forks < 1:
self.parser.error("The number of processes (--forks) must be >= 1")
@ -307,13 +272,13 @@ class CLI(with_metaclass(ABCMeta, object)):
def init_parser(self):
super(MyCLI, self).init_parser(usage="My Ansible CLI", inventory_opts=True)
ansible.arguments.optparse_helpers.add_runas_options(self.parser)
ansible.arguments.option_helpers.add_runas_options(self.parser)
self.parser.add_option('--my-option', dest='my_option', action='store')
"""
self.parser = opt_help.create_base_parser(usage=usage, desc=desc, epilog=epilog)
@abstractmethod
def post_process_args(self, options, args):
def post_process_args(self, options):
"""Process the command line args
Subclasses need to implement this method. This method validates and transforms the command
@ -322,13 +287,13 @@ class CLI(with_metaclass(ABCMeta, object)):
An implementation will look something like this::
def post_process_args(self, options, args):
options, args = super(MyCLI, self).post_process_args(options, args)
def post_process_args(self, options):
options = super(MyCLI, self).post_process_args(options)
if options.addition and options.subtraction:
raise AnsibleOptionsError('Only one of --addition and --subtraction can be specified')
if isinstance(options.listofhosts, string_types):
options.listofhosts = string_types.split(',')
return options, args
return options
"""
# process tags
@ -364,7 +329,7 @@ class CLI(with_metaclass(ABCMeta, object)):
else:
options.inventory = C.DEFAULT_HOST_LIST
return options, args
return options
def parse(self):
"""Parse the command line args
@ -377,9 +342,12 @@ class CLI(with_metaclass(ABCMeta, object)):
are called from this function before and after parsing the arguments.
"""
self.init_parser()
options, args = self.parser.parse_args(self.args[1:])
options, args = self.post_process_args(options, args)
options.args = args
if HAS_ARGCOMPLETE:
argcomplete.autocomplete(self.parser)
options = self.parser.parse_args(self.args[1:])
options = self.post_process_args(options)
context._init_global_context(options)
@staticmethod

@ -8,7 +8,7 @@ __metaclass__ = type
from ansible import constants as C
from ansible import context
from ansible.cli import CLI
from ansible.cli.arguments import optparse_helpers as opt_help
from ansible.cli.arguments import option_helpers as opt_help
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.module_utils._text import to_text
@ -46,26 +46,22 @@ class AdHocCLI(CLI):
opt_help.add_basedir_options(self.parser)
# options unique to ansible ad-hoc
self.parser.add_option('-a', '--args', dest='module_args',
help="module arguments", default=C.DEFAULT_MODULE_ARGS)
self.parser.add_option('-m', '--module-name', dest='module_name',
help="module name to execute (default=%s)" % C.DEFAULT_MODULE_NAME,
default=C.DEFAULT_MODULE_NAME)
def post_process_args(self, options, args):
self.parser.add_argument('-a', '--args', dest='module_args',
help="module arguments", default=C.DEFAULT_MODULE_ARGS)
self.parser.add_argument('-m', '--module-name', dest='module_name',
help="module name to execute (default=%s)" % C.DEFAULT_MODULE_NAME,
default=C.DEFAULT_MODULE_NAME)
self.parser.add_argument('args', metavar='pattern', help='host pattern')
def post_process_args(self, options):
'''Post process and validate options for bin/ansible '''
options, args = super(AdHocCLI, self).post_process_args(options, args)
if len(args) < 1:
raise AnsibleOptionsError("Missing target hosts")
elif len(args) > 1:
raise AnsibleOptionsError("Extraneous options or arguments")
options = super(AdHocCLI, self).post_process_args(options)
display.verbosity = options.verbosity
self.validate_conflicts(options, runas_opts=True, vault_opts=True, fork_opts=True)
self.validate_conflicts(options, runas_opts=True, fork_opts=True)
return options, args
return options
def _play_ds(self, pattern, async_val, poll):
check_raw = context.CLIARGS['module_name'] in ('command', 'win_command', 'shell', 'win_shell', 'script', 'raw')
@ -89,7 +85,7 @@ class AdHocCLI(CLI):
super(AdHocCLI, self).run()
# only thing left should be host pattern
pattern = to_text(context.CLIARGS['args'][0], errors='surrogate_or_strict')
pattern = to_text(context.CLIARGS['args'], errors='surrogate_or_strict')
sshpass = None
becomepass = None

@ -0,0 +1,350 @@
# Copyright: (c) 2018, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import copy
import operator
import argparse
import os
import os.path
import sys
import time
import yaml
import ansible
from ansible import constants as C
from ansible.module_utils._text import to_native
from ansible.release import __version__
from ansible.utils.path import unfrackpath
#
# Special purpose OptionParsers
#
class SortingHelpFormatter(argparse.HelpFormatter):
def add_arguments(self, actions):
actions = sorted(actions, key=operator.attrgetter('option_strings'))
super(SortingHelpFormatter, self).add_arguments(actions)
class PrependListAction(argparse.Action):
"""A near clone of ``argparse._AppendAction``, but designed to prepend list values
instead of appending.
"""
def __init__(self, option_strings, dest, nargs=None, const=None, default=None, type=None,
choices=None, required=False, help=None, metavar=None):
if nargs == 0:
raise ValueError('nargs for append actions must be > 0; if arg '
'strings are not supplying the value to append, '
'the append const action may be more appropriate')
if const is not None and nargs != argparse.OPTIONAL:
raise ValueError('nargs must be %r to supply const' % argparse.OPTIONAL)
super(PrependListAction, self).__init__(
option_strings=option_strings,
dest=dest,
nargs=nargs,
const=const,
default=default,
type=type,
choices=choices,
required=required,
help=help,
metavar=metavar
)
def __call__(self, parser, namespace, values, option_string=None):
items = copy.copy(ensure_value(namespace, self.dest, []))
items[0:0] = values
setattr(namespace, self.dest, items)
def ensure_value(namespace, name, value):
if getattr(namespace, name, None) is None:
setattr(namespace, name, value)
return getattr(namespace, name)
#
# Callbacks to validate and normalize Options
#
def unfrack_path(pathsep=False):
"""Turn an Option's data into a single path in Ansible locations"""
def inner(value):
if pathsep:
return [unfrackpath(x) for x in value.split(os.pathsep) if x]
if value == '-':
return value
return unfrackpath(value)
return inner
def _git_repo_info(repo_path):
""" returns a string containing git branch, commit id and commit date """
result = None
if os.path.exists(repo_path):
# Check if the .git is a file. If it is a file, it means that we are in a submodule structure.
if os.path.isfile(repo_path):
try:
gitdir = yaml.safe_load(open(repo_path)).get('gitdir')
# There is a possibility the .git file to have an absolute path.
if os.path.isabs(gitdir):
repo_path = gitdir
else:
repo_path = os.path.join(repo_path[:-4], gitdir)
except (IOError, AttributeError):
return ''
with open(os.path.join(repo_path, "HEAD")) as f:
line = f.readline().rstrip("\n")
if line.startswith("ref:"):
branch_path = os.path.join(repo_path, line[5:])
else:
branch_path = None
if branch_path and os.path.exists(branch_path):
branch = '/'.join(line.split('/')[2:])
with open(branch_path) as f:
commit = f.readline()[:10]
else:
# detached HEAD
commit = line[:10]
branch = 'detached HEAD'
branch_path = os.path.join(repo_path, "HEAD")
date = time.localtime(os.stat(branch_path).st_mtime)
if time.daylight == 0:
offset = time.timezone
else:
offset = time.altzone
result = "({0} {1}) last updated {2} (GMT {3:+04d})".format(branch, commit, time.strftime("%Y/%m/%d %H:%M:%S", date), int(offset / -36))
else:
result = ''
return result
def _gitinfo():
basedir = os.path.join(os.path.dirname(__file__), '..', '..', '..')
repo_path = os.path.join(basedir, '.git')
result = _git_repo_info(repo_path)
submodules = os.path.join(basedir, '.gitmodules')
if not os.path.exists(submodules):
return result
with open(submodules) as f:
for line in f:
tokens = line.strip().split(' ')
if tokens[0] == 'path':
submodule_path = tokens[2]
submodule_info = _git_repo_info(os.path.join(basedir, submodule_path, '.git'))
if not submodule_info:
submodule_info = ' not found - use git submodule update --init ' + submodule_path
result += "\n {0}: {1}".format(submodule_path, submodule_info)
return result
def version(prog=None):
""" return ansible version """
if prog:
result = " ".join((prog, __version__))
else:
result = __version__
gitinfo = _gitinfo()
if gitinfo:
result = result + " {0}".format(gitinfo)
result += "\n config file = %s" % C.CONFIG_FILE
if C.DEFAULT_MODULE_PATH is None:
cpath = "Default w/o overrides"
else:
cpath = C.DEFAULT_MODULE_PATH
result = result + "\n configured module search path = %s" % cpath
result = result + "\n ansible python module location = %s" % ':'.join(ansible.__path__)
result = result + "\n executable location = %s" % sys.argv[0]
result = result + "\n python version = %s" % ''.join(sys.version.splitlines())
return result
#
# Functions to add pre-canned options to an OptionParser
#
def create_base_parser(usage="", desc=None, epilog=None):
"""
Create an options parser for all ansible scripts
"""
# base opts
parser = argparse.ArgumentParser(
formatter_class=SortingHelpFormatter,
epilog=epilog,
description=desc,
conflict_handler='resolve',
)
version_help = "show program's version number, config file location, configured module search path," \
" module location, executable location and exit"
parser.add_argument('--version', action='version', version=to_native(version("%(prog)s")), help=version_help)
add_verbosity_options(parser)
return parser
def add_verbosity_options(parser):
"""Add options for verbosity"""
parser.add_argument('-v', '--verbose', dest='verbosity', default=C.DEFAULT_VERBOSITY, action="count",
help="verbose mode (-vvv for more, -vvvv to enable connection debugging)")
def add_async_options(parser):
"""Add options for commands which can launch async tasks"""
parser.add_argument('-P', '--poll', default=C.DEFAULT_POLL_INTERVAL, type=int, dest='poll_interval',
help="set the poll interval if using -B (default=%s)" % C.DEFAULT_POLL_INTERVAL)
parser.add_argument('-B', '--background', dest='seconds', type=int, default=0,
help='run asynchronously, failing after X seconds (default=N/A)')
def add_basedir_options(parser):
"""Add options for commands which can set a playbook basedir"""
parser.add_argument('--playbook-dir', default=None, dest='basedir', action='store',
help="Since this tool does not use playbooks, use this as a substitute playbook directory."
"This sets the relative path for many features including roles/ group_vars/ etc.")
def add_check_options(parser):
"""Add options for commands which can run with diagnostic information of tasks"""
parser.add_argument("-C", "--check", default=False, dest='check', action='store_true',
help="don't make any changes; instead, try to predict some of the changes that may occur")
parser.add_argument('--syntax-check', dest='syntax', action='store_true',
help="perform a syntax check on the playbook, but do not execute it")
parser.add_argument("-D", "--diff", default=C.DIFF_ALWAYS, dest='diff', action='store_true',
help="when changing (small) files and templates, show the differences in those"
" files; works great with --check")
def add_connect_options(parser):
"""Add options for commands which need to connection to other hosts"""
connect_group = parser.add_argument_group("Connection Options", "control as whom and how to connect to hosts")
connect_group.add_argument('-k', '--ask-pass', default=C.DEFAULT_ASK_PASS, dest='ask_pass', action='store_true',
help='ask for connection password')
connect_group.add_argument('--private-key', '--key-file', default=C.DEFAULT_PRIVATE_KEY_FILE, dest='private_key_file',
help='use this file to authenticate the connection', type=unfrack_path())
connect_group.add_argument('-u', '--user', default=C.DEFAULT_REMOTE_USER, dest='remote_user',
help='connect as this user (default=%s)' % C.DEFAULT_REMOTE_USER)
connect_group.add_argument('-c', '--connection', dest='connection', default=C.DEFAULT_TRANSPORT,
help="connection type to use (default=%s)" % C.DEFAULT_TRANSPORT)
connect_group.add_argument('-T', '--timeout', default=C.DEFAULT_TIMEOUT, type=int, dest='timeout',
help="override the connection timeout in seconds (default=%s)" % C.DEFAULT_TIMEOUT)
connect_group.add_argument('--ssh-common-args', default='', dest='ssh_common_args',
help="specify common arguments to pass to sftp/scp/ssh (e.g. ProxyCommand)")
connect_group.add_argument('--sftp-extra-args', default='', dest='sftp_extra_args',
help="specify extra arguments to pass to sftp only (e.g. -f, -l)")
connect_group.add_argument('--scp-extra-args', default='', dest='scp_extra_args',
help="specify extra arguments to pass to scp only (e.g. -l)")
connect_group.add_argument('--ssh-extra-args', default='', dest='ssh_extra_args',
help="specify extra arguments to pass to ssh only (e.g. -R)")
parser.add_argument_group(connect_group)
def add_fork_options(parser):
"""Add options for commands that can fork worker processes"""
parser.add_argument('-f', '--forks', dest='forks', default=C.DEFAULT_FORKS, type=int,
help="specify number of parallel processes to use (default=%s)" % C.DEFAULT_FORKS)
def add_inventory_options(parser):
"""Add options for commands that utilize inventory"""
parser.add_argument('-i', '--inventory', '--inventory-file', dest='inventory', action="append",
help="specify inventory host path or comma separated host list. --inventory-file is deprecated")
parser.add_argument('--list-hosts', dest='listhosts', action='store_true',
help='outputs a list of matching hosts; does not execute anything else')
parser.add_argument('-l', '--limit', default=C.DEFAULT_SUBSET, dest='subset',
help='further limit selected hosts to an additional pattern')
def add_meta_options(parser):
"""Add options for commands which can launch meta tasks from the command line"""
parser.add_argument('--force-handlers', default=C.DEFAULT_FORCE_HANDLERS, dest='force_handlers', action='store_true',
help="run handlers even if a task fails")
parser.add_argument('--flush-cache', dest='flush_cache', action='store_true',
help="clear the fact cache for every host in inventory")
def add_module_options(parser):
"""Add options for commands that load modules"""
parser.add_argument('-M', '--module-path', dest='module_path', default=None,
help="prepend colon-separated path(s) to module library (default=%s)" % C.DEFAULT_MODULE_PATH,
type=unfrack_path(pathsep=True), action=PrependListAction)
def add_output_options(parser):
"""Add options for commands which can change their output"""
parser.add_argument('-o', '--one-line', dest='one_line', action='store_true',
help='condense output')
parser.add_argument('-t', '--tree', dest='tree', default=None,
help='log output to this directory')
def add_runas_options(parser):
"""
Add options for commands which can run tasks as another user
Note that this includes the options from add_runas_prompt_options(). Only one of these
functions should be used.
"""
runas_group = parser.add_argument_group("Privilege Escalation Options", "control how and which user you become as on target hosts")
# consolidated privilege escalation (become)
runas_group.add_argument("-b", "--become", default=C.DEFAULT_BECOME, action="store_true", dest='become',
help="run operations with become (does not imply password prompting)")
runas_group.add_argument('--become-method', dest='become_method', default=C.DEFAULT_BECOME_METHOD, choices=C.BECOME_METHODS,
help="privilege escalation method to use (default=%(default)s), use "
"`ansible-doc -t become -l` to list valid choices.")
runas_group.add_argument('--become-user', default=None, dest='become_user', type=str,
help='run operations as this user (default=%s)' % C.DEFAULT_BECOME_USER)
add_runas_prompt_options(parser, runas_group=runas_group)
def add_runas_prompt_options(parser, runas_group=None):
"""
Add options for commands which need to prompt for privilege escalation credentials
Note that add_runas_options() includes these options already. Only one of the two functions
should be used.
"""
if runas_group is None:
runas_group = parser.add_argument_group("Privilege Escalation Options",
"control how and which user you become as on target hosts")
runas_group.add_argument('-K', '--ask-become-pass', dest='become_ask_pass', action='store_true',
default=C.DEFAULT_BECOME_ASK_PASS,
help='ask for privilege escalation password')
parser.add_argument_group(runas_group)
def add_runtask_options(parser):
"""Add options for commands that run a task"""
parser.add_argument('-e', '--extra-vars', dest="extra_vars", action="append",
help="set additional variables as key=value or YAML/JSON, if filename prepend with @", default=[])
def add_subset_options(parser):
"""Add options for commands which can run a subset of tasks"""
parser.add_argument('-t', '--tags', dest='tags', default=C.TAGS_RUN, action='append',
help="only run plays and tasks tagged with these values")
parser.add_argument('--skip-tags', dest='skip_tags', default=C.TAGS_SKIP, action='append',
help="only run plays and tasks whose tags do not match these values")
def add_vault_options(parser):
"""Add options for loading vault files"""
parser.add_argument('--vault-id', default=[], dest='vault_ids', action='append', type=str,
help='the vault identity to use')
base_group = parser.add_mutually_exclusive_group()
base_group.add_argument('--ask-vault-pass', default=C.DEFAULT_ASK_VAULT_PASS, dest='ask_vault_pass', action='store_true',
help='ask for vault password')
base_group.add_argument('--vault-password-file', default=[], dest='vault_password_files',
help="vault password file", type=unfrack_path(), action='append')

@ -1,366 +0,0 @@
# Copyright: (c) 2018, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import operator
import optparse
import os
import os.path
import sys
import time
import yaml
import ansible
from ansible import constants as C
from ansible.module_utils.six import string_types
from ansible.module_utils._text import to_native
from ansible.release import __version__
from ansible.utils.path import unfrackpath
#
# Special purpose OptionParsers
#
class SortedOptParser(optparse.OptionParser):
"""Optparser which sorts the options by opt before outputting --help"""
def format_help(self, formatter=None, epilog=None):
self.option_list.sort(key=operator.methodcaller('get_opt_string'))
return optparse.OptionParser.format_help(self, formatter=None)
# Note: Inherit from SortedOptParser so that we get our format_help method
class InvalidOptsParser(SortedOptParser):
"""Ignore invalid options.
Meant for the special case where we need to take care of help and version but may not know the
full range of options yet.
.. seealso::
See it in use in ansible.cli.CLI.set_action
"""
def __init__(self, parser):
# Since this is special purposed to just handle help and version, we
# take a pre-existing option parser here and set our options from
# that. This allows us to give accurate help based on the given
# option parser.
SortedOptParser.__init__(self, usage=parser.usage,
option_list=parser.option_list,
option_class=parser.option_class,
conflict_handler=parser.conflict_handler,
description=parser.description,
formatter=parser.formatter,
add_help_option=False,
prog=parser.prog,
epilog=parser.epilog)
self.version = parser.version
def _process_long_opt(self, rargs, values):
try:
optparse.OptionParser._process_long_opt(self, rargs, values)
except optparse.BadOptionError:
pass
def _process_short_opts(self, rargs, values):
try:
optparse.OptionParser._process_short_opts(self, rargs, values)
except optparse.BadOptionError:
pass
#
# Callbacks to validate and normalize Options
#
def unfrack_paths(option, opt, value, parser):
"""Turn an Option's value into a list of paths in Ansible locations"""
paths = getattr(parser.values, option.dest)
if paths is None:
paths = []
if isinstance(value, string_types):
paths[:0] = [unfrackpath(x) for x in value.split(os.pathsep) if x]
elif isinstance(value, list):
paths[:0] = [unfrackpath(x) for x in value if x]
else:
pass # FIXME: should we raise options error?
setattr(parser.values, option.dest, paths)
def unfrack_path(option, opt, value, parser):
"""Turn an Option's data into a single path in Ansible locations"""
if value != '-':
setattr(parser.values, option.dest, unfrackpath(value))
else:
setattr(parser.values, option.dest, value)
def _git_repo_info(repo_path):
""" returns a string containing git branch, commit id and commit date """
result = None
if os.path.exists(repo_path):
# Check if the .git is a file. If it is a file, it means that we are in a submodule structure.
if os.path.isfile(repo_path):
try:
gitdir = yaml.safe_load(open(repo_path)).get('gitdir')
# There is a possibility the .git file to have an absolute path.
if os.path.isabs(gitdir):
repo_path = gitdir
else:
repo_path = os.path.join(repo_path[:-4], gitdir)
except (IOError, AttributeError):
return ''
with open(os.path.join(repo_path, "HEAD")) as f:
line = f.readline().rstrip("\n")
if line.startswith("ref:"):
branch_path = os.path.join(repo_path, line[5:])
else:
branch_path = None
if branch_path and os.path.exists(branch_path):
branch = '/'.join(line.split('/')[2:])
with open(branch_path) as f:
commit = f.readline()[:10]
else:
# detached HEAD
commit = line[:10]
branch = 'detached HEAD'
branch_path = os.path.join(repo_path, "HEAD")
date = time.localtime(os.stat(branch_path).st_mtime)
if time.daylight == 0:
offset = time.timezone
else:
offset = time.altzone
result = "({0} {1}) last updated {2} (GMT {3:+04d})".format(branch, commit, time.strftime("%Y/%m/%d %H:%M:%S", date), int(offset / -36))
else:
result = ''
return result
def _gitinfo():
basedir = os.path.join(os.path.dirname(__file__), '..', '..', '..')
repo_path = os.path.join(basedir, '.git')
result = _git_repo_info(repo_path)
submodules = os.path.join(basedir, '.gitmodules')
if not os.path.exists(submodules):
return result
with open(submodules) as f:
for line in f:
tokens = line.strip().split(' ')
if tokens[0] == 'path':
submodule_path = tokens[2]
submodule_info = _git_repo_info(os.path.join(basedir, submodule_path, '.git'))
if not submodule_info:
submodule_info = ' not found - use git submodule update --init ' + submodule_path
result += "\n {0}: {1}".format(submodule_path, submodule_info)
return result
def version(prog=None):
""" return ansible version """
if prog:
result = " ".join((prog, __version__))
else:
result = __version__
gitinfo = _gitinfo()
if gitinfo:
result = result + " {0}".format(gitinfo)
result += "\n config file = %s" % C.CONFIG_FILE
if C.DEFAULT_MODULE_PATH is None:
cpath = "Default w/o overrides"
else:
cpath = C.DEFAULT_MODULE_PATH
result = result + "\n configured module search path = %s" % cpath
result = result + "\n ansible python module location = %s" % ':'.join(ansible.__path__)
result = result + "\n executable location = %s" % sys.argv[0]
result = result + "\n python version = %s" % ''.join(sys.version.splitlines())
return result
#
# Functions to add pre-canned options to an OptionParser
#
def create_base_parser(usage="", desc=None, epilog=None):
"""
Create an options parser for all ansible scripts
"""
# base opts
parser = SortedOptParser(usage, version=to_native(version("%prog")), description=desc, epilog=epilog)
parser.remove_option('--version')
version_help = "show program's version number, config file location, configured module search path," \
" module location, executable location and exit"
parser.add_option('--version', action="version", help=version_help)
parser.add_option('-v', '--verbose', dest='verbosity', default=C.DEFAULT_VERBOSITY, action="count",
help="verbose mode (-vvv for more, -vvvv to enable connection debugging)")
return parser
def add_async_options(parser):
"""Add options for commands which can launch async tasks"""
parser.add_option('-P', '--poll', default=C.DEFAULT_POLL_INTERVAL, type='int', dest='poll_interval',
help="set the poll interval if using -B (default=%s)" % C.DEFAULT_POLL_INTERVAL)
parser.add_option('-B', '--background', dest='seconds', type='int', default=0,
help='run asynchronously, failing after X seconds (default=N/A)')
def add_basedir_options(parser):
"""Add options for commands which can set a playbook basedir"""
parser.add_option('--playbook-dir', default=None, dest='basedir', action='store',
help="Since this tool does not use playbooks, use this as a substitute playbook directory."
"This sets the relative path for many features including roles/ group_vars/ etc.")
def add_check_options(parser):
"""Add options for commands which can run with diagnostic information of tasks"""
parser.add_option("-C", "--check", default=False, dest='check', action='store_true',
help="don't make any changes; instead, try to predict some of the changes that may occur")
parser.add_option('--syntax-check', dest='syntax', action='store_true',
help="perform a syntax check on the playbook, but do not execute it")
parser.add_option("-D", "--diff", default=C.DIFF_ALWAYS, dest='diff', action='store_true',
help="when changing (small) files and templates, show the differences in those"
" files; works great with --check")
def add_connect_options(parser):
"""Add options for commands which need to connection to other hosts"""
connect_group = optparse.OptionGroup(parser, "Connection Options", "control as whom and how to connect to hosts")
connect_group.add_option('-k', '--ask-pass', default=C.DEFAULT_ASK_PASS, dest='ask_pass', action='store_true',
help='ask for connection password')
connect_group.add_option('--private-key', '--key-file', default=C.DEFAULT_PRIVATE_KEY_FILE, dest='private_key_file',
help='use this file to authenticate the connection', action="callback", callback=unfrack_path, type='string')
connect_group.add_option('-u', '--user', default=C.DEFAULT_REMOTE_USER, dest='remote_user',
help='connect as this user (default=%s)' % C.DEFAULT_REMOTE_USER)
connect_group.add_option('-c', '--connection', dest='connection', default=C.DEFAULT_TRANSPORT,
help="connection type to use (default=%s)" % C.DEFAULT_TRANSPORT)
connect_group.add_option('-T', '--timeout', default=C.DEFAULT_TIMEOUT, type='int', dest='timeout',
help="override the connection timeout in seconds (default=%s)" % C.DEFAULT_TIMEOUT)
connect_group.add_option('--ssh-common-args', default='', dest='ssh_common_args',
help="specify common arguments to pass to sftp/scp/ssh (e.g. ProxyCommand)")
connect_group.add_option('--sftp-extra-args', default='', dest='sftp_extra_args',
help="specify extra arguments to pass to sftp only (e.g. -f, -l)")
connect_group.add_option('--scp-extra-args', default='', dest='scp_extra_args',
help="specify extra arguments to pass to scp only (e.g. -l)")
connect_group.add_option('--ssh-extra-args', default='', dest='ssh_extra_args',
help="specify extra arguments to pass to ssh only (e.g. -R)")
parser.add_option_group(connect_group)
def add_fork_options(parser):
"""Add options for commands that can fork worker processes"""
parser.add_option('-f', '--forks', dest='forks', default=C.DEFAULT_FORKS, type='int',
help="specify number of parallel processes to use (default=%s)" % C.DEFAULT_FORKS)
def add_inventory_options(parser):
"""Add options for commands that utilize inventory"""
parser.add_option('-i', '--inventory', '--inventory-file', dest='inventory', action="append",
help="specify inventory host path or comma separated host list. --inventory-file is deprecated")
parser.add_option('--list-hosts', dest='listhosts', action='store_true',
help='outputs a list of matching hosts; does not execute anything else')
parser.add_option('-l', '--limit', default=C.DEFAULT_SUBSET, dest='subset',
help='further limit selected hosts to an additional pattern')
def add_meta_options(parser):
"""Add options for commands which can launch meta tasks from the command line"""
parser.add_option('--force-handlers', default=C.DEFAULT_FORCE_HANDLERS, dest='force_handlers', action='store_true',
help="run handlers even if a task fails")
parser.add_option('--flush-cache', dest='flush_cache', action='store_true',
help="clear the fact cache for every host in inventory")
def add_module_options(parser):
"""Add options for commands that load modules"""
module_path = C.config.get_configuration_definition('DEFAULT_MODULE_PATH').get('default', '')
parser.add_option('-M', '--module-path', dest='module_path', default=None,
help="prepend colon-separated path(s) to module library (default=%s)" % module_path,
action="callback", callback=unfrack_paths, type='str')
def add_output_options(parser):
"""Add options for commands which can change their output"""
parser.add_option('-o', '--one-line', dest='one_line', action='store_true',
help='condense output')
parser.add_option('-t', '--tree', dest='tree', default=None,
help='log output to this directory')
def add_runas_options(parser):
"""
Add options for commands which can run tasks as another user
Note that this includes the options from add_runas_prompt_options(). Only one of these
functions should be used.
"""
runas_group = optparse.OptionGroup(parser, "Privilege Escalation Options", "control how and which user you become as on target hosts")
# consolidated privilege escalation (become)
runas_group.add_option("-b", "--become", default=C.DEFAULT_BECOME, action="store_true", dest='become',
help="run operations with become (does not imply password prompting)")
runas_group.add_option('--become-method', dest='become_method', default=C.DEFAULT_BECOME_METHOD,
help="privilege escalation method to use (default=%default), use "
"`ansible-doc -t become -l` to list valid choices.")
runas_group.add_option('--become-user', default=None, dest='become_user', type='string',
help='run operations as this user (default=%s)' % C.DEFAULT_BECOME_USER)
add_runas_prompt_options(parser, runas_group=runas_group)
def add_runas_prompt_options(parser, runas_group=None):
"""
Add options for commands which need to prompt for privilege escalation credentials
Note that add_runas_options() includes these options already. Only one of the two functions
should be used.
"""
if runas_group is None:
runas_group = optparse.OptionGroup(parser, "Privilege Escalation Options",
"control how and which user you become as on target hosts")
runas_group.add_option('-K', '--ask-become-pass', dest='become_ask_pass', action='store_true',
help='ask for privilege escalation password', default=C.DEFAULT_BECOME_ASK_PASS)
parser.add_option_group(runas_group)
def add_runtask_options(parser):
"""Add options for commands that run a task"""
parser.add_option('-e', '--extra-vars', dest="extra_vars", action="append",
help="set additional variables as key=value or YAML/JSON, if filename prepend with @", default=[])
def add_subset_options(parser):
"""Add options for commands which can run a subset of tasks"""
parser.add_option('-t', '--tags', dest='tags', default=C.TAGS_RUN, action='append',
help="only run plays and tasks tagged with these values")
parser.add_option('--skip-tags', dest='skip_tags', default=C.TAGS_SKIP, action='append',
help="only run plays and tasks whose tags do not match these values")
def add_vault_options(parser):
"""Add options for loading vault files"""
parser.add_option('--ask-vault-pass', default=C.DEFAULT_ASK_VAULT_PASS, dest='ask_vault_pass', action='store_true',
help='ask for vault password')
parser.add_option('--vault-password-file', default=[], dest='vault_password_files',
help="vault password file", action="callback", callback=unfrack_paths, type='string')
parser.add_option('--vault-id', default=[], dest='vault_ids', action='append', type='string',
help='the vault identity to use')
def add_vault_rekey_options(parser):
"""Add options for commands which can edit/rekey a vault file"""
parser.add_option('--new-vault-password-file', default=None, dest='new_vault_password_file',
help="new vault password file for rekey", action="callback", callback=unfrack_path, type='string')
parser.add_option('--new-vault-id', default=None, dest='new_vault_id', type='string',
help='the new vault identity to use for rekey')

@ -7,11 +7,11 @@ __metaclass__ = type
import os
import shlex
import subprocess
import sys
import yaml
from ansible import context
from ansible.cli import CLI
from ansible.cli.arguments import option_helpers as opt_help
from ansible.config.manager import ConfigManager, Setting, find_ini_config_file
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.module_utils._text import to_native, to_text
@ -26,8 +26,6 @@ display = Display()
class ConfigCLI(CLI):
""" Config command line class """
VALID_ACTIONS = frozenset(("view", "dump", "list")) # TODO: edit, update, search
def __init__(self, args, callback=None):
self.config_file = None
@ -37,35 +35,43 @@ class ConfigCLI(CLI):
def init_parser(self):
super(ConfigCLI, self).init_parser(
usage="usage: %%prog [%s] [--help] [options] [ansible.cfg]" % "|".join(sorted(self.VALID_ACTIONS)),
epilog="\nSee '%s <command> --help' for more information on a specific command.\n\n" % os.path.basename(sys.argv[0]),
desc="View, edit, and manage ansible configuration.",
)
self.parser.add_option('-c', '--config', dest='config_file',
help="path to configuration file, defaults to first file found in precedence.")
self.set_action()
common = opt_help.argparse.ArgumentParser(add_help=False)
opt_help.add_verbosity_options(common)
common.add_argument('-c', '--config', dest='config_file',
help="path to configuration file, defaults to first file found in precedence.")
subparsers = self.parser.add_subparsers(dest='action')
subparsers.required = True
list_parser = subparsers.add_parser('list', help='Print all config options', parents=[common])
list_parser.set_defaults(func=self.execute_list)
# options specific to self.actions
if self.action == "list":
self.parser.set_usage("usage: %prog list [options] ")
dump_parser = subparsers.add_parser('dump', help='Dump configuration', parents=[common])
dump_parser.set_defaults(func=self.execute_dump)
dump_parser.add_argument('--only-changed', dest='only_changed', action='store_true',
help="Only show configurations that have changed from the default")
elif self.action == "dump":
self.parser.add_option('--only-changed', dest='only_changed', action='store_true',
help="Only show configurations that have changed from the default")
view_parser = subparsers.add_parser('view', help='View configuration file', parents=[common])
view_parser.set_defaults(func=self.execute_view)
elif self.action == "update":
self.parser.add_option('-s', '--setting', dest='setting', help="config setting, the section defaults to 'defaults'")
self.parser.set_usage("usage: %prog update [options] [-c ansible.cfg] -s '[section.]setting=value'")
# update_parser = subparsers.add_parser('update', help='Update configuration option')
# update_parser.set_defaults(func=self.execute_update)
# update_parser.add_argument('-s', '--setting', dest='setting',
# help="config setting, the section defaults to 'defaults'",
# metavar='[section.]setting=value')
elif self.action == "search":
self.parser.set_usage("usage: %prog update [options] [-c ansible.cfg] <search term>")
# search_parser = subparsers.add_parser('search', help='Search configuration')
# search_parser.set_defaults(func=self.execute_search)
# search_parser.add_argument('args', help='Search term', metavar='<search term>')
def post_process_args(self, options, args):
options, args = super(ConfigCLI, self).post_process_args(options, args)
def post_process_args(self, options):
options = super(ConfigCLI, self).post_process_args(options)
display.verbosity = options.verbosity
return options, args
return options
def run(self):
@ -87,15 +93,15 @@ class ConfigCLI(CLI):
os.environ['ANSIBLE_CONFIG'] = to_native(self.config_file)
except Exception:
if self.action in ['view']:
if context.CLIARGS['action'] in ['view']:
raise
elif self.action in ['edit', 'update']:
elif context.CLIARGS['action'] in ['edit', 'update']:
display.warning("File does not exist, used empty file: %s" % self.config_file)
elif self.action == 'view':
elif context.CLIARGS['action'] == 'view':
raise AnsibleError('Invalid or no config file was supplied')
self.execute()
context.CLIARGS['func']()
def execute_update(self):
'''

@ -27,7 +27,7 @@ import sys
from ansible import constants as C
from ansible import context
from ansible.cli import CLI
from ansible.cli.arguments import optparse_helpers as opt_help
from ansible.cli.arguments import option_helpers as opt_help
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.module_utils._text import to_native, to_text
from ansible.module_utils.parsing.convert_bool import boolean
@ -80,7 +80,6 @@ class ConsoleCLI(CLI, cmd.Cmd):
def init_parser(self):
super(ConsoleCLI, self).init_parser(
usage='%prog [<host-pattern>] [options]',
desc="REPL console for executing Ansible tasks.",
epilog="This is not a live session/connection, each task executes in the background and returns it's results."
)
@ -94,14 +93,15 @@ class ConsoleCLI(CLI, cmd.Cmd):
opt_help.add_basedir_options(self.parser)
# options unique to shell
self.parser.add_option('--step', dest='step', action='store_true',
help="one-step-at-a-time: confirm each task before running")
self.parser.add_argument('pattern', help='host pattern', metavar='pattern', default='all', nargs='?')
self.parser.add_argument('--step', dest='step', action='store_true',
help="one-step-at-a-time: confirm each task before running")
def post_process_args(self, options, args):
options, args = super(ConsoleCLI, self).post_process_args(options, args)
def post_process_args(self, options):
options = super(ConsoleCLI, self).post_process_args(options)
display.verbosity = options.verbosity
self.validate_conflicts(options, runas_opts=True, vault_opts=True, fork_opts=True)
return options, args
self.validate_conflicts(options, runas_opts=True, fork_opts=True)
return options
def get_names(self):
return dir(self)
@ -408,10 +408,7 @@ class ConsoleCLI(CLI, cmd.Cmd):
becomepass = None
# hosts
if len(context.CLIARGS['args']) != 1:
self.pattern = 'all'
else:
self.pattern = context.CLIARGS['args'][0]
self.pattern = context.CLIARGS['pattern']
self.cwd = self.pattern
# Defaults from the command line

@ -17,7 +17,7 @@ import ansible.plugins.loader as plugin_loader
from ansible import constants as C
from ansible import context
from ansible.cli import CLI
from ansible.cli.arguments import optparse_helpers as opt_help
from ansible.cli.arguments import option_helpers as opt_help
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.module_utils._text import to_native
from ansible.module_utils.common._collections_compat import Sequence
@ -49,34 +49,33 @@ class DocCLI(CLI):
def init_parser(self):
super(DocCLI, self).init_parser(
usage='usage: %prog [-l|-F|-s] [options] [-t <plugin type> ] [plugin]',
desc="plugin documentation tool",
epilog="See man pages for Ansible CLI options or website for tutorials https://docs.ansible.com"
)
opt_help.add_module_options(self.parser)
self.parser.add_option("-F", "--list_files", action="store_true", default=False, dest="list_files",
self.parser.add_argument('args', nargs='*', help='Plugin', metavar='plugin')
self.parser.add_argument("-t", "--type", action="store", default='module', dest='type',
help='Choose which plugin type (defaults to "module"). '
'Available plugin types are : {0}'.format(C.DOCUMENTABLE_PLUGINS),
choices=C.DOCUMENTABLE_PLUGINS)
exclusive = self.parser.add_mutually_exclusive_group()
exclusive.add_argument("-F", "--list_files", action="store_true", default=False, dest="list_files",
help='Show plugin names and their source files without summaries (implies --list)')
self.parser.add_option("-l", "--list", action="store_true", default=False, dest='list_dir',
exclusive.add_argument("-l", "--list", action="store_true", default=False, dest='list_dir',
help='List available plugins')
self.parser.add_option("-s", "--snippet", action="store_true", default=False, dest='show_snippet',
exclusive.add_argument("-s", "--snippet", action="store_true", default=False, dest='show_snippet',
help='Show playbook snippet for specified plugin(s)')
self.parser.add_option("-j", "--json", action="store_true", default=False, dest='json_dump',
exclusive.add_argument("-j", "--json", action="store_true", default=False, dest='json_dump',
help='**For internal testing only** Dump json metadata for all plugins.')
self.parser.add_option("-t", "--type", action="store", default='module', dest='type', type='choice',
help='Choose which plugin type (defaults to "module"). '
'Available plugin types are : {0}'.format(C.DOCUMENTABLE_PLUGINS),
choices=C.DOCUMENTABLE_PLUGINS)
def post_process_args(self, options, args):
options, args = super(DocCLI, self).post_process_args(options, args)
if [options.json_dump, options.list_dir, options.list_files, options.show_snippet].count(True) > 1:
raise AnsibleOptionsError("Only one of -l, -F, -s, or -j can be used at the same time.")
def post_process_args(self, options):
options = super(DocCLI, self).post_process_args(options)
display.verbosity = options.verbosity
return options, args
return options
def run(self):

@ -8,7 +8,6 @@ __metaclass__ = type
import os.path
import re
import shutil
import sys
import time
import yaml
@ -17,7 +16,7 @@ from jinja2 import Environment, FileSystemLoader
import ansible.constants as C
from ansible import context
from ansible.cli import CLI
from ansible.cli.arguments import optparse_helpers as opt_help
from ansible.cli.arguments import option_helpers as opt_help
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.galaxy import Galaxy
from ansible.galaxy.api import GalaxyAPI
@ -35,109 +34,129 @@ class GalaxyCLI(CLI):
'''command to manage Ansible roles in shared repositories, the default of which is Ansible Galaxy *https://galaxy.ansible.com*.'''
SKIP_INFO_KEYS = ("name", "description", "readme_html", "related", "summary_fields", "average_aw_composite", "average_aw_score", "url")
VALID_ACTIONS = frozenset(("delete", "import", "info", "init", "install", "list", "login", "remove", "search", "setup"))
def __init__(self, args):
self.api = None
self.galaxy = None
super(GalaxyCLI, self).__init__(args)
def set_action(self):
super(GalaxyCLI, self).set_action()
# specific to actions
if self.action == "delete":
self.parser.set_usage("usage: %prog delete [options] github_user github_repo")
self.parser.set_description("Removes the role from Galaxy. It does not remove or alter the actual GitHub repository.")
elif self.action == "import":
self.parser.set_usage("usage: %prog import [options] github_user github_repo")
self.parser.set_description("Import a role.")
self.parser.add_option('--no-wait', dest='wait', action='store_false', default=True, help='Don\'t wait for import results.')
self.parser.add_option('--branch', dest='reference',
help='The name of a branch to import. Defaults to the repository\'s default branch (usually master)')
self.parser.add_option('--role-name', dest='role_name', help='The name the role should have, if different than the repo name')
self.parser.add_option('--status', dest='check_status', action='store_true', default=False,
help='Check the status of the most recent import request for given github_user/github_repo.')
elif self.action == "info":
self.parser.set_usage("usage: %prog info [options] role_name[,version]")
self.parser.set_description("View more details about a specific role.")
elif self.action == "init":
self.parser.set_usage("usage: %prog init [options] role_name")
self.parser.set_description("Initialize new role with the base structure of a role.")
self.parser.add_option('--init-path', dest='init_path', default="./",
help='The path in which the skeleton role will be created. The default is the current working directory.')
self.parser.add_option('--type', dest='role_type', action='store', default='default',
help="Initialize using an alternate role type. Valid types include: 'container', 'apb' and 'network'.")
self.parser.add_option('--role-skeleton', dest='role_skeleton', default=C.GALAXY_ROLE_SKELETON,
help='The path to a role skeleton that the new role should be based upon.')
elif self.action == "install":
self.parser.set_usage("usage: %prog install [options] [-r FILE | role_name(s)[,version] | scm+role_repo_url[,version] | tar_file(s)]")
self.parser.set_description("Install Roles from file(s), URL(s) or tar file(s)")
self.parser.add_option('-i', '--ignore-errors', dest='ignore_errors', action='store_true', default=False,
help='Ignore errors and continue with the next specified role.')
self.parser.add_option('-n', '--no-deps', dest='no_deps', action='store_true', default=False, help='Don\'t download roles listed as dependencies')
self.parser.add_option('-r', '--role-file', dest='role_file', help='A file containing a list of roles to be imported')
self.parser.add_option('-g', '--keep-scm-meta', dest='keep_scm_meta', action='store_true',
default=False, help='Use tar instead of the scm archive option when packaging the role')
elif self.action == "remove":
self.parser.set_usage("usage: %prog remove role1 role2 ...")
self.parser.set_description("Delete a role from roles_path.")
elif self.action == "list":
self.parser.set_usage("usage: %prog list [role_name]")
self.parser.set_description("Show the name and version of each role installed in the roles_path.")
elif self.action == "login":
self.parser.set_usage("usage: %prog login [options]")
self.parser.set_description("Login to api.github.com server in order to use ansible-galaxy sub command such as 'import', 'delete' and 'setup'.")
self.parser.add_option('--github-token', dest='token', default=None, help='Identify with github token rather than username and password.')
elif self.action == "search":
self.parser.set_usage("usage: %prog search [searchterm1 searchterm2] [--galaxy-tags galaxy_tag1,galaxy_tag2] [--platforms platform1,platform2] "
"[--author username]")
self.parser.add_option('--platforms', dest='platforms', help='list of OS platforms to filter by')
self.parser.add_option('--galaxy-tags', dest='galaxy_tags', help='list of galaxy tags to filter by')
self.parser.add_option('--author', dest='author', help='GitHub username')
self.parser.set_description("Search the Galaxy database by tags, platforms, author and multiple keywords.")
elif self.action == "setup":
self.parser.set_usage("usage: %prog setup [options] source github_user github_repo secret")
self.parser.add_option('--remove', dest='remove_id', default=None,
help='Remove the integration matching the provided ID value. Use --list to see ID values.')
self.parser.add_option('--list', dest="setup_list", action='store_true', default=False, help='List all of your integrations.')
self.parser.set_description("Manage the integration between Galaxy and the given source.")
# options that apply to more than one action
if self.action in ['init', 'info']:
self.parser.add_option('--offline', dest='offline', default=False, action='store_true', help="Don't query the galaxy API when creating roles")
if self.action not in ("delete", "import", "init", "login", "setup"):
# NOTE: while the option type=str, the default is a list, and the
# callback will set the value to a list.
self.parser.add_option('-p', '--roles-path', dest='roles_path', action="callback", callback=opt_help.unfrack_paths, default=C.DEFAULT_ROLES_PATH,
help='The path to the directory containing your roles. The default is the roles_path configured in your ansible.cfg'
' file (/etc/ansible/roles if not configured)', type='str')
if self.action in ("init", "install"):
self.parser.add_option('-f', '--force', dest='force', action='store_true', default=False, help='Force overwriting an existing role')
if self.action == "install":
self.parser.add_option('--force-with-deps', dest='force_with_deps', action='store_true', default=False,
help="Force overwriting an existing role and it's dependencies")
def init_parser(self):
''' create an options parser for bin/ansible '''
super(GalaxyCLI, self).init_parser(
usage="usage: %%prog [%s] [--help] [options] ..." % "|".join(sorted(self.VALID_ACTIONS)),
epilog="\nSee '%s <command> --help' for more information on a specific command.\n\n" % os.path.basename(sys.argv[0]),
desc="Perform various Role related operations.",
)
# common
self.parser.add_option('-s', '--server', dest='api_server', default=C.GALAXY_SERVER, help='The API server destination')
self.parser.add_option('-c', '--ignore-certs', action='store_true', dest='ignore_certs', default=C.GALAXY_IGNORE_CERTS,
help='Ignore SSL certificate validation errors.')
self.set_action()
common = opt_help.argparse.ArgumentParser(add_help=False)
common.add_argument('-s', '--server', dest='api_server', default=C.GALAXY_SERVER, help='The API server destination')
common.add_argument('-c', '--ignore-certs', action='store_true', dest='ignore_certs', default=C.GALAXY_IGNORE_CERTS,
help='Ignore SSL certificate validation errors.')
opt_help.add_verbosity_options(common)
# options that apply to more than one action
user_repo = opt_help.argparse.ArgumentParser(add_help=False)
user_repo.add_argument('github_user', help='GitHub username')
user_repo.add_argument('github_repo', help='GitHub repository')
offline = opt_help.argparse.ArgumentParser(add_help=False)
offline.add_argument('--offline', dest='offline', default=False, action='store_true',
help="Don't query the galaxy API when creating roles")
roles_path = opt_help.argparse.ArgumentParser(add_help=False)
roles_path.add_argument('-p', '--roles-path', dest='roles_path', type=opt_help.unfrack_path(pathsep=True),
default=C.DEFAULT_ROLES_PATH, action=opt_help.PrependListAction,
help='The path to the directory containing your roles. The default is the roles_path '
'configured in your ansible.cfg file (/etc/ansible/roles if not configured)')
force = opt_help.argparse.ArgumentParser(add_help=False)
force.add_argument('-f', '--force', dest='force', action='store_true', default=False, help='Force overwriting an existing role')
subparsers = self.parser.add_subparsers(dest='action')
subparsers.required = True
delete_parser = subparsers.add_parser('delete', parents=[user_repo, common],
help='Removes the role from Galaxy. It does not remove or alter the actual GitHub repository.')
delete_parser.set_defaults(func=self.execute_delete)
import_parser = subparsers.add_parser('import', help='Import a role', parents=[user_repo, common])
import_parser.set_defaults(func=self.execute_import)
import_parser.add_argument('--no-wait', dest='wait', action='store_false', default=True, help="Don't wait for import results.")
import_parser.add_argument('--branch', dest='reference',
help='The name of a branch to import. Defaults to the repository\'s default branch (usually master)')
import_parser.add_argument('--role-name', dest='role_name', help='The name the role should have, if different than the repo name')
import_parser.add_argument('--status', dest='check_status', action='store_true', default=False,
help='Check the status of the most recent import request for given github_user/github_repo.')
def post_process_args(self, options, args):
options, args = super(GalaxyCLI, self).post_process_args(options, args)
info_parser = subparsers.add_parser('info', help='View more details about a specific role.',
parents=[offline, common, roles_path])
info_parser.set_defaults(func=self.execute_info)
info_parser.add_argument('args', nargs='+', help='role', metavar='role_name[,version]')
init_parser = subparsers.add_parser('init', help='Initialize new role with the base structure of a role.',
parents=[offline, force, common])
init_parser.set_defaults(func=self.execute_init)
init_parser.add_argument('--init-path', dest='init_path', default="./",
help='The path in which the skeleton role will be created. The default is the current working directory.')
init_parser.add_argument('--type', dest='role_type', action='store', default='default',
help="Initialize using an alternate role type. Valid types include: 'container', 'apb' and 'network'.")
init_parser.add_argument('--role-skeleton', dest='role_skeleton', default=C.GALAXY_ROLE_SKELETON,
help='The path to a role skeleton that the new role should be based upon.')
init_parser.add_argument('role_name', help='Role name')
install_parser = subparsers.add_parser('install', help='Install Roles from file(s), URL(s) or tar file(s)',
parents=[force, common, roles_path])
install_parser.set_defaults(func=self.execute_install)
install_parser.add_argument('-i', '--ignore-errors', dest='ignore_errors', action='store_true', default=False,
help='Ignore errors and continue with the next specified role.')
install_parser.add_argument('-r', '--role-file', dest='role_file', help='A file containing a list of roles to be imported')
install_parser.add_argument('-g', '--keep-scm-meta', dest='keep_scm_meta', action='store_true',
default=False, help='Use tar instead of the scm archive option when packaging the role')
install_parser.add_argument('args', help='Role name, URL or tar file', metavar='role', nargs='*')
install_exclusive = install_parser.add_mutually_exclusive_group()
install_exclusive.add_argument('-n', '--no-deps', dest='no_deps', action='store_true', default=False,
help="Don't download roles listed as dependencies")
install_exclusive.add_argument('--force-with-deps', dest='force_with_deps', action='store_true', default=False,
help="Force overwriting an existing role and it's dependencies")
remove_parser = subparsers.add_parser('remove', help='Delete roles from roles_path.', parents=[common, roles_path])
remove_parser.set_defaults(func=self.execute_remove)
remove_parser.add_argument('args', help='Role(s)', metavar='role', nargs='+')
list_parser = subparsers.add_parser('list', help='Show the name and version of each role installed in the roles_path.',
parents=[common, roles_path])
list_parser.set_defaults(func=self.execute_list)
list_parser.add_argument('role', help='Role', nargs='?', metavar='role')
login_parser = subparsers.add_parser('login', parents=[common],
help="Login to api.github.com server in order to use ansible-galaxy sub "
"command such as 'import', 'delete' and 'setup'")
login_parser.set_defaults(func=self.execute_login)
login_parser.add_argument('--github-token', dest='token', default=None, help='Identify with github token rather than username and password.')
search_parser = subparsers.add_parser('search', help='Search the Galaxy database by tags, platforms, author and multiple keywords.',
parents=[common])
search_parser.set_defaults(func=self.execute_search)
search_parser.add_argument('--platforms', dest='platforms', help='list of OS platforms to filter by')
search_parser.add_argument('--galaxy-tags', dest='galaxy_tags', help='list of galaxy tags to filter by')
search_parser.add_argument('--author', dest='author', help='GitHub username')
search_parser.add_argument('args', help='Search terms', metavar='searchterm', nargs='*')
setup_parser = subparsers.add_parser('setup', help='Manage the integration between Galaxy and the given source.',
parents=[roles_path, common])
setup_parser.set_defaults(func=self.execute_setup)
setup_parser.add_argument('--remove', dest='remove_id', default=None,
help='Remove the integration matching the provided ID value. Use --list to see ID values.')
setup_parser.add_argument('--list', dest="setup_list", action='store_true', default=False, help='List all of your integrations.')
setup_parser.add_argument('source', help='Source')
setup_parser.add_argument('github_user', help='GitHub username')
setup_parser.add_argument('github_repo', help='GitHub repository')
setup_parser.add_argument('secret', help='Secret')
def post_process_args(self, options):
options = super(GalaxyCLI, self).post_process_args(options)
display.verbosity = options.verbosity
return options, args
return options
def run(self):
@ -146,7 +165,7 @@ class GalaxyCLI(CLI):
self.galaxy = Galaxy()
self.api = GalaxyAPI(self.galaxy)
self.execute()
context.CLIARGS['func']()
@staticmethod
def exit_without_ignore(rc=1):
@ -192,9 +211,7 @@ class GalaxyCLI(CLI):
force = context.CLIARGS['force']
role_skeleton = context.CLIARGS['role_skeleton']
role_name = context.CLIARGS['args'][0].strip() if context.CLIARGS['args'] else None
if not role_name:
raise AnsibleOptionsError("- no role name specified for init")
role_name = context.CLIARGS['role_name']
role_path = os.path.join(init_path, role_name)
if os.path.exists(role_path):
if os.path.isfile(role_path):
@ -260,10 +277,6 @@ class GalaxyCLI(CLI):
prints out detailed information about an installed role as well as info available from the galaxy API.
"""
if not context.CLIARGS['args']:
# the user needs to specify a role
raise AnsibleOptionsError("- you must specify a user/role name")
roles_path = context.CLIARGS['roles_path']
data = ''
@ -316,9 +329,6 @@ class GalaxyCLI(CLI):
no_deps = context.CLIARGS['no_deps']
force_deps = context.CLIARGS['force_with_deps']
if no_deps and force_deps:
raise AnsibleOptionsError("You cannot both force dependencies and no dependencies")
force = context.CLIARGS['force'] or force_deps
roles_left = []
@ -329,7 +339,9 @@ class GalaxyCLI(CLI):
try:
required_roles = yaml.safe_load(f.read())
except Exception as e:
raise AnsibleError("Unable to load data from the requirements file (%s): %s" % (role_file, to_native(e)))
raise AnsibleError(
"Unable to load data from the requirements file (%s): %s" % (role_file, to_native(e))
)
if required_roles is None:
raise AnsibleError("No roles found in file: %s" % role_file)
@ -463,9 +475,6 @@ class GalaxyCLI(CLI):
lists the roles installed on the local system or matches a single role passed as an argument.
"""
if len(context.CLIARGS['args']) > 1:
raise AnsibleOptionsError("- please specify only one role to list, or specify no roles to see a full list")
def _display_role(gr):
install_info = gr.install_info
version = None
@ -475,9 +484,9 @@ class GalaxyCLI(CLI):
version = "(unknown version)"
display.display("- %s, %s" % (gr.name, version))
if context.CLIARGS['args']:
if context.CLIARGS['role']:
# show the requested role, if it exists
name = context.CLIARGS['args'][0]
name = context.CLIARGS['role']
gr = GalaxyRole(self.galaxy, name)
if gr.metadata:
display.display('# %s' % os.path.dirname(gr.path))
@ -553,7 +562,7 @@ class GalaxyCLI(CLI):
def execute_login(self):
"""
verify user's identify via GitHub and retrieve an auth token from Ansible Galaxy.
verify user's identify via Github and retrieve an auth token from Ansible Galaxy.
"""
# Authenticate with github and retrieve a token
if context.CLIARGS['token'] is None:
@ -605,7 +614,7 @@ class GalaxyCLI(CLI):
if len(task) > 1:
# found multiple roles associated with github_user/github_repo
display.display("WARNING: More than one Galaxy role associated with GitHub repo %s/%s." % (github_user, github_repo),
display.display("WARNING: More than one Galaxy role associated with Github repo %s/%s." % (github_user, github_repo),
color='yellow')
display.display("The following Galaxy roles are being updated:" + u'\n', color=C.COLOR_CHANGED)
for t in task:
@ -637,7 +646,7 @@ class GalaxyCLI(CLI):
return 0
def execute_setup(self):
""" Setup an integration from GitHub or Travis for Ansible Galaxy roles"""
""" Setup an integration from Github or Travis for Ansible Galaxy roles"""
if context.CLIARGS['setup_list']:
# List existing integration secrets
@ -659,13 +668,10 @@ class GalaxyCLI(CLI):
display.display("Secret removed. Integrations using this secret will not longer work.", color=C.COLOR_OK)
return 0
if len(context.CLIARGS['args']) < 4:
raise AnsibleError("Missing one or more arguments. Expecting: source github_user github_repo secret")
source = context.CLIARGS['args'][0]
github_user = context.CLIARGS['args'][1]
github_repo = context.CLIARGS['args'][2]
secret = context.CLIARGS['args'][3]
source = context.CLIARGS['source']
github_user = context.CLIARGS['github_user']
github_repo = context.CLIARGS['github_repo']
secret = context.CLIARGS['secret']
resp = self.api.add_secret(source, github_user, github_repo, secret)
display.display("Added integration for %s %s/%s" % (resp['source'], resp['github_user'], resp['github_repo']))
@ -675,11 +681,8 @@ class GalaxyCLI(CLI):
def execute_delete(self):
""" Delete a role from Ansible Galaxy. """
if len(context.CLIARGS['args']) < 2:
raise AnsibleError("Missing one or more arguments. Expected: github_user github_repo")
github_user = context.CLIARGS['args'][0]
github_repo = context.CLIARGS['args'][1]
github_user = context.CLIARGS['github_user']
github_repo = context.CLIARGS['github_repo']
resp = self.api.delete_role(github_user, github_repo)
if len(resp['deleted_roles']) > 1:

@ -5,13 +5,13 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import optparse
import argparse
from operator import attrgetter
from ansible import constants as C
from ansible import context
from ansible.cli import CLI
from ansible.cli.arguments import optparse_helpers as opt_help
from ansible.cli.arguments import option_helpers as opt_help
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.inventory.host import Host
from ansible.module_utils._text import to_bytes, to_native
@ -64,39 +64,41 @@ class InventoryCLI(CLI):
opt_help.add_basedir_options(self.parser)
# remove unused default options
self.parser.remove_option('--limit')
self.parser.remove_option('--list-hosts')
self.parser.add_argument('--limit', default=argparse.SUPPRESS, type=lambda v: self.parser.error('unrecognized arguments: --limit'))
self.parser.add_argument('--list-hosts', default=argparse.SUPPRESS, type=lambda v: self.parser.error('unrecognized arguments: --list-hosts'))
self.parser.add_argument('args', metavar='host|group', nargs='?')
# Actions
action_group = optparse.OptionGroup(self.parser, "Actions", "One of following must be used on invocation, ONLY ONE!")
action_group.add_option("--list", action="store_true", default=False, dest='list', help='Output all hosts info, works as inventory script')
action_group.add_option("--host", action="store", default=None, dest='host', help='Output specific host info, works as inventory script')
action_group.add_option("--graph", action="store_true", default=False, dest='graph',
help='create inventory graph, if supplying pattern it must be a valid group name')
self.parser.add_option_group(action_group)
action_group = self.parser.add_argument_group("Actions", "One of following must be used on invocation, ONLY ONE!")
action_group.add_argument("--list", action="store_true", default=False, dest='list', help='Output all hosts info, works as inventory script')
action_group.add_argument("--host", action="store", default=None, dest='host', help='Output specific host info, works as inventory script')
action_group.add_argument("--graph", action="store_true", default=False, dest='graph',
help='create inventory graph, if supplying pattern it must be a valid group name')
self.parser.add_argument_group(action_group)
# graph
self.parser.add_option("-y", "--yaml", action="store_true", default=False, dest='yaml',
help='Use YAML format instead of default JSON, ignored for --graph')
self.parser.add_option('--toml', action='store_true', default=False, dest='toml',
help='Use TOML format instead of default JSON, ignored for --graph')
self.parser.add_option("--vars", action="store_true", default=False, dest='show_vars',
help='Add vars to graph display, ignored unless used with --graph')
self.parser.add_argument("-y", "--yaml", action="store_true", default=False, dest='yaml',
help='Use YAML format instead of default JSON, ignored for --graph')
self.parser.add_argument('--toml', action='store_true', default=False, dest='toml',
help='Use TOML format instead of default JSON, ignored for --graph')
self.parser.add_argument("--vars", action="store_true", default=False, dest='show_vars',
help='Add vars to graph display, ignored unless used with --graph')
# list
self.parser.add_option("--export", action="store_true", default=C.INVENTORY_EXPORT, dest='export',
help="When doing an --list, represent in a way that is optimized for export,"
"not as an accurate representation of how Ansible has processed it")
self.parser.add_option('--output', default=None, dest='output_file',
help="When doing an --list, send the inventory to a file instead of of to screen")
# self.parser.add_option("--ignore-vars-plugins", action="store_true", default=False, dest='ignore_vars_plugins',
# help="When doing an --list, skip vars data from vars plugins, by default, this would include group_vars/ and host_vars/")
self.parser.add_argument("--export", action="store_true", default=C.INVENTORY_EXPORT, dest='export',
help="When doing an --list, represent in a way that is optimized for export,"
"not as an accurate representation of how Ansible has processed it")
self.parser.add_argument('--output', default=None, dest='output_file',
help="When doing --list, send the inventory to a file instead of to the screen")
# self.parser.add_argument("--ignore-vars-plugins", action="store_true", default=False, dest='ignore_vars_plugins',
# help="When doing an --list, skip vars data from vars plugins, by default, this would include group_vars/ and host_vars/")
def post_process_args(self, options, args):
options, args = super(InventoryCLI, self).post_process_args(options, args)
def post_process_args(self, options):
options = super(InventoryCLI, self).post_process_args(options)
display.verbosity = options.verbosity
self.validate_conflicts(options, vault_opts=True)
self.validate_conflicts(options)
# there can be only one! and, at least, one!
used = 0
@ -109,12 +111,12 @@ class InventoryCLI(CLI):
raise AnsibleOptionsError("Conflicting options used, only one of --host, --graph or --list can be used at the same time.")
# set host pattern to default if not supplied
if len(args) > 0:
options.pattern = args[0]
if options.args:
options.pattern = options.args[0]
else:
options.pattern = 'all'
return options, args
return options
def run(self):

@ -10,8 +10,8 @@ import stat
from ansible import context
from ansible.cli import CLI
from ansible.cli.arguments import optparse_helpers as opt_help
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.cli.arguments import option_helpers as opt_help
from ansible.errors import AnsibleError
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.module_utils._text import to_bytes
from ansible.playbook.block import Block
@ -46,25 +46,23 @@ class PlaybookCLI(CLI):
opt_help.add_module_options(self.parser)
# ansible playbook specific opts
self.parser.add_option('--list-tasks', dest='listtasks', action='store_true',
help="list all tasks that would be executed")
self.parser.add_option('--list-tags', dest='listtags', action='store_true',
help="list all available tags")
self.parser.add_option('--step', dest='step', action='store_true',
help="one-step-at-a-time: confirm each task before running")
self.parser.add_option('--start-at-task', dest='start_at_task',
help="start the playbook at the task matching this name")
def post_process_args(self, options, args):
options, args = super(PlaybookCLI, self).post_process_args(options, args)
if len(args) == 0:
raise AnsibleOptionsError("You must specify a playbook file to run")
self.parser.add_argument('--list-tasks', dest='listtasks', action='store_true',
help="list all tasks that would be executed")
self.parser.add_argument('--list-tags', dest='listtags', action='store_true',
help="list all available tags")
self.parser.add_argument('--step', dest='step', action='store_true',
help="one-step-at-a-time: confirm each task before running")
self.parser.add_argument('--start-at-task', dest='start_at_task',
help="start the playbook at the task matching this name")
self.parser.add_argument('args', help='Playbook(s)', metavar='playbook', nargs='+')
def post_process_args(self, options):
options = super(PlaybookCLI, self).post_process_args(options)
display.verbosity = options.verbosity
self.validate_conflicts(options, runas_opts=True, vault_opts=True, fork_opts=True)
self.validate_conflicts(options, runas_opts=True, fork_opts=True)
return options, args
return options
def run(self):

@ -17,7 +17,7 @@ import time
from ansible import constants as C
from ansible import context
from ansible.cli import CLI
from ansible.cli.arguments import optparse_helpers as opt_help
from ansible.cli.arguments import option_helpers as opt_help
from ansible.errors import AnsibleOptionsError
from ansible.module_utils._text import to_native, to_text
from ansible.module_utils.six.moves import shlex_quote
@ -83,41 +83,43 @@ class PullCLI(CLI):
opt_help.add_module_options(self.parser)
opt_help.add_runas_prompt_options(self.parser)
self.parser.add_argument('args', help='Playbook(s)', metavar='playbook.yml', nargs='*')
# options unique to pull
self.parser.add_option('--purge', default=False, action='store_true', help='purge checkout after playbook run')
self.parser.add_option('-o', '--only-if-changed', dest='ifchanged', default=False, action='store_true',
help='only run the playbook if the repository has been updated')
self.parser.add_option('-s', '--sleep', dest='sleep', default=None,
help='sleep for random interval (between 0 and n number of seconds) before starting. '
'This is a useful way to disperse git requests')
self.parser.add_option('-f', '--force', dest='force', default=False, action='store_true',
help='run the playbook even if the repository could not be updated')
self.parser.add_option('-d', '--directory', dest='dest', default=None, help='directory to checkout repository to')
self.parser.add_option('-U', '--url', dest='url', default=None, help='URL of the playbook repository')
self.parser.add_option('--full', dest='fullclone', action='store_true', help='Do a full clone, instead of a shallow one.')
self.parser.add_option('-C', '--checkout', dest='checkout',
help='branch/tag/commit to checkout. Defaults to behavior of repository module.')
self.parser.add_option('--accept-host-key', default=False, dest='accept_host_key', action='store_true',
help='adds the hostkey for the repo url if not already added')
self.parser.add_option('-m', '--module-name', dest='module_name', default=self.DEFAULT_REPO_TYPE,
help='Repository module name, which ansible will use to check out the repo. Choices are %s. Default is %s.'
% (self.REPO_CHOICES, self.DEFAULT_REPO_TYPE))
self.parser.add_option('--verify-commit', dest='verify', default=False, action='store_true',
help='verify GPG signature of checked out commit, if it fails abort running the playbook. '
'This needs the corresponding VCS module to support such an operation')
self.parser.add_option('--clean', dest='clean', default=False, action='store_true',
help='modified files in the working repository will be discarded')
self.parser.add_option('--track-subs', dest='tracksubs', default=False, action='store_true',
help='submodules will track the latest changes. This is equivalent to specifying the --remote flag to git submodule update')
self.parser.add_argument('--purge', default=False, action='store_true', help='purge checkout after playbook run')
self.parser.add_argument('-o', '--only-if-changed', dest='ifchanged', default=False, action='store_true',
help='only run the playbook if the repository has been updated')
self.parser.add_argument('-s', '--sleep', dest='sleep', default=None,
help='sleep for random interval (between 0 and n number of seconds) before starting. '
'This is a useful way to disperse git requests')
self.parser.add_argument('-f', '--force', dest='force', default=False, action='store_true',
help='run the playbook even if the repository could not be updated')
self.parser.add_argument('-d', '--directory', dest='dest', default=None, help='directory to checkout repository to')
self.parser.add_argument('-U', '--url', dest='url', default=None, help='URL of the playbook repository')
self.parser.add_argument('--full', dest='fullclone', action='store_true', help='Do a full clone, instead of a shallow one.')
self.parser.add_argument('-C', '--checkout', dest='checkout',
help='branch/tag/commit to checkout. Defaults to behavior of repository module.')
self.parser.add_argument('--accept-host-key', default=False, dest='accept_host_key', action='store_true',
help='adds the hostkey for the repo url if not already added')
self.parser.add_argument('-m', '--module-name', dest='module_name', default=self.DEFAULT_REPO_TYPE,
help='Repository module name, which ansible will use to check out the repo. Choices are %s. Default is %s.'
% (self.REPO_CHOICES, self.DEFAULT_REPO_TYPE))
self.parser.add_argument('--verify-commit', dest='verify', default=False, action='store_true',
help='verify GPG signature of checked out commit, if it fails abort running the playbook. '
'This needs the corresponding VCS module to support such an operation')
self.parser.add_argument('--clean', dest='clean', default=False, action='store_true',
help='modified files in the working repository will be discarded')
self.parser.add_argument('--track-subs', dest='tracksubs', default=False, action='store_true',
help='submodules will track the latest changes. This is equivalent to specifying the --remote flag to git submodule update')
# add a subset of the check_opts flag group manually, as the full set's
# shortcodes conflict with above --checkout/-C
self.parser.add_option("--check", default=False, dest='check', action='store_true',
help="don't make any changes; instead, try to predict some of the changes that may occur")
self.parser.add_option("--diff", default=C.DIFF_ALWAYS, dest='diff', action='store_true',
help="when changing (small) files and templates, show the differences in those files; works great with --check")
self.parser.add_argument("--check", default=False, dest='check', action='store_true',
help="don't make any changes; instead, try to predict some of the changes that may occur")
self.parser.add_argument("--diff", default=C.DIFF_ALWAYS, dest='diff', action='store_true',
help="when changing (small) files and templates, show the differences in those files; works great with --check")
def post_process_args(self, options, args):
options, args = super(PullCLI, self).post_process_args(options, args)
def post_process_args(self, options):
options = super(PullCLI, self).post_process_args(options)
if not options.dest:
hostname = socket.getfqdn()
@ -142,9 +144,9 @@ class PullCLI(CLI):
raise AnsibleOptionsError("Unsupported repo module %s, choices are %s" % (options.module_name, ','.join(self.SUPPORTED_REPO_MODULES)))
display.verbosity = options.verbosity
self.validate_conflicts(options, vault_opts=True)
self.validate_conflicts(options)
return options, args
return options
def run(self):
''' use Runner lib to do SSH things '''

@ -11,7 +11,7 @@ import sys
from ansible import constants as C
from ansible import context
from ansible.cli import CLI
from ansible.cli.arguments import optparse_helpers as opt_help
from ansible.cli.arguments import option_helpers as opt_help
from ansible.errors import AnsibleOptionsError
from ansible.module_utils._text import to_text, to_bytes
from ansible.parsing.dataloader import DataLoader
@ -32,8 +32,6 @@ class VaultCLI(CLI):
If you'd like to not expose what variables you are using, you can keep an individual task file entirely encrypted.
'''
VALID_ACTIONS = frozenset(("create", "decrypt", "edit", "encrypt", "encrypt_string", "rekey", "view"))
FROM_STDIN = "stdin"
FROM_ARGS = "the command line args"
FROM_PROMPT = "the interactive prompt"
@ -49,66 +47,76 @@ class VaultCLI(CLI):
self.new_encrypt_secret = None
self.new_encrypt_vault_id = None
self.can_output = ['encrypt', 'decrypt', 'encrypt_string']
super(VaultCLI, self).__init__(args)
def set_action(self):
super(VaultCLI, self).set_action()
# add output if needed
if self.action in self.can_output:
self.parser.add_option('--output', default=None, dest='output_file',
help='output file name for encrypt or decrypt; use - for stdout',
action="callback", callback=opt_help.unfrack_path, type='string')
# options specific to self.actions
if self.action == "create":
self.parser.set_usage("usage: %prog create [options] file_name")
elif self.action == "decrypt":
self.parser.set_usage("usage: %prog decrypt [options] file_name")
elif self.action == "edit":
self.parser.set_usage("usage: %prog edit [options] file_name")
elif self.action == "view":
self.parser.set_usage("usage: %prog view [options] file_name")
elif self.action == "encrypt":
self.parser.set_usage("usage: %prog encrypt [options] file_name")
# I have no prefence for either dash or underscore
elif self.action == "encrypt_string":
self.parser.add_option('-p', '--prompt', dest='encrypt_string_prompt',
action='store_true',
help="Prompt for the string to encrypt")
self.parser.add_option('-n', '--name', dest='encrypt_string_names',
action='append',
help="Specify the variable name")
self.parser.add_option('--stdin-name', dest='encrypt_string_stdin_name',
default=None,
help="Specify the variable name for stdin")
self.parser.set_usage("usage: %prog encrypt_string [--prompt] [options] string_to_encrypt")
elif self.action == "rekey":
self.parser.set_usage("usage: %prog rekey [options] file_name")
# For encrypting actions, we can also specify which of multiple vault ids should be used for encrypting
if self.action in ['create', 'encrypt', 'encrypt_string', 'rekey', 'edit']:
self.parser.add_option('--encrypt-vault-id', default=[], dest='encrypt_vault_id',
action='store', type='string',
help='the vault id used to encrypt (required if more than vault-id is provided)')
def init_parser(self):
super(VaultCLI, self).init_parser(
usage="usage: %%prog [%s] [options] [vaultfile.yml]" % "|".join(sorted(self.VALID_ACTIONS)),
desc="encryption/decryption utility for Ansible data files",
epilog="\nSee '%s <command> --help' for more information on a specific command.\n\n" % os.path.basename(sys.argv[0])
)
opt_help.add_vault_options(self.parser)
opt_help.add_vault_rekey_options(self.parser)
self.set_action()
common = opt_help.argparse.ArgumentParser(add_help=False)
opt_help.add_vault_options(common)
opt_help.add_verbosity_options(common)
subparsers = self.parser.add_subparsers(dest='action')
subparsers.required = True
def post_process_args(self, options, args):
options, args = super(VaultCLI, self).post_process_args(options, args)
self.validate_conflicts(options, vault_opts=True, vault_rekey_opts=True)
output = opt_help.argparse.ArgumentParser(add_help=False)
output.add_argument('--output', default=None, dest='output_file',
help='output file name for encrypt or decrypt; use - for stdout',
type=opt_help.unfrack_path())
# For encrypting actions, we can also specify which of multiple vault ids should be used for encrypting
vault_id = opt_help.argparse.ArgumentParser(add_help=False)
vault_id.add_argument('--encrypt-vault-id', default=[], dest='encrypt_vault_id',
action='store', type=str,
help='the vault id used to encrypt (required if more than vault-id is provided)')
create_parser = subparsers.add_parser('create', help='Create new vault encrypted file', parents=[vault_id, common])
create_parser.set_defaults(func=self.execute_create)
create_parser.add_argument('args', help='Filename', metavar='file_name', nargs='*')
decrypt_parser = subparsers.add_parser('decrypt', help='Decrypt vault encrypted file', parents=[output, common])
decrypt_parser.set_defaults(func=self.execute_decrypt)
decrypt_parser.add_argument('args', help='Filename', metavar='file_name', nargs='*')
edit_parser = subparsers.add_parser('edit', help='Edit vault encrypted file', parents=[vault_id, common])
edit_parser.set_defaults(func=self.execute_edit)
edit_parser.add_argument('args', help='Filename', metavar='file_name', nargs='*')
view_parser = subparsers.add_parser('view', help='View vault encrypted file', parents=[common])
view_parser.set_defaults(func=self.execute_view)
view_parser.add_argument('args', help='Filename', metavar='file_name', nargs='*')
encrypt_parser = subparsers.add_parser('encrypt', help='Encrypt YAML file', parents=[common, output, vault_id])
encrypt_parser.set_defaults(func=self.execute_encrypt)
encrypt_parser.add_argument('args', help='Filename', metavar='file_name', nargs='*')
enc_str_parser = subparsers.add_parser('encrypt_string', help='Encrypt a string', parents=[common, output, vault_id])
enc_str_parser.set_defaults(func=self.execute_encrypt_string)
enc_str_parser.add_argument('args', help='String to encrypt', metavar='string_to_encrypt', nargs='*')
enc_str_parser.add_argument('-p', '--prompt', dest='encrypt_string_prompt',
action='store_true',
help="Prompt for the string to encrypt")
enc_str_parser.add_argument('-n', '--name', dest='encrypt_string_names',
action='append',
help="Specify the variable name")
enc_str_parser.add_argument('--stdin-name', dest='encrypt_string_stdin_name',
default=None,
help="Specify the variable name for stdin")
rekey_parser = subparsers.add_parser('rekey', help='Re-key a vault encrypted file', parents=[common, vault_id])
rekey_parser.set_defaults(func=self.execute_rekey)
rekey_new_group = rekey_parser.add_mutually_exclusive_group()
rekey_new_group.add_argument('--new-vault-password-file', default=None, dest='new_vault_password_file',
help="new vault password file for rekey", type=opt_help.unfrack_path())
rekey_new_group.add_argument('--new-vault-id', default=None, dest='new_vault_id', type=str,
help='the new vault identity to use for rekey')
rekey_parser.add_argument('args', help='Filename', metavar='file_name', nargs='*')
def post_process_args(self, options):
options = super(VaultCLI, self).post_process_args(options)
display.verbosity = options.verbosity
@ -117,27 +125,18 @@ class VaultCLI(CLI):
if u';' in vault_id:
raise AnsibleOptionsError("'%s' is not a valid vault id. The character ';' is not allowed in vault ids" % vault_id)
if self.action not in self.can_output:
if not args:
raise AnsibleOptionsError("Vault requires at least one filename as a parameter")
else:
# This restriction should remain in place until it's possible to
# load multiple YAML records from a single file, or it's too easy
# to create an encrypted file that can't be read back in. But in
# the meanwhile, "cat a b c|ansible-vault encrypt --output x" is
# a workaround.
if options.output_file and len(args) > 1:
raise AnsibleOptionsError("At most one input file may be used with the --output option")
if self.action == 'encrypt_string':
if '-' in args or not args or options.encrypt_string_stdin_name:
if getattr(options, 'output_file', None) and len(options.args) > 1:
raise AnsibleOptionsError("At most one input file may be used with the --output option")
if options.action == 'encrypt_string':
if '-' in options.args or not options.args or options.encrypt_string_stdin_name:
self.encrypt_string_read_stdin = True
# TODO: prompting from stdin and reading from stdin seem mutually exclusive, but verify that.
if options.encrypt_string_prompt and self.encrypt_string_read_stdin:
raise AnsibleOptionsError('The --prompt option is not supported if also reading input from stdin')
return options, args
return options
def run(self):
super(VaultCLI, self).run()
@ -156,20 +155,22 @@ class VaultCLI(CLI):
default_vault_ids = C.DEFAULT_VAULT_IDENTITY_LIST
vault_ids = default_vault_ids + vault_ids
action = context.CLIARGS['action']
# TODO: instead of prompting for these before, we could let VaultEditor
# call a callback when it needs it.
if self.action in ['decrypt', 'view', 'rekey', 'edit']:
if action in ['decrypt', 'view', 'rekey', 'edit']:
vault_secrets = self.setup_vault_secrets(loader, vault_ids=vault_ids,
vault_password_files=list(context.CLIARGS['vault_password_files']),
ask_vault_pass=context.CLIARGS['ask_vault_pass'])
if not vault_secrets:
raise AnsibleOptionsError("A vault password is required to use Ansible's Vault")
if self.action in ['encrypt', 'encrypt_string', 'create']:
if action in ['encrypt', 'encrypt_string', 'create']:
encrypt_vault_id = None
# no --encrypt-vault-id context.CLIARGS['encrypt_vault_id'] for 'edit'
if self.action not in ['edit']:
if action not in ['edit']:
encrypt_vault_id = context.CLIARGS['encrypt_vault_id'] or C.DEFAULT_VAULT_ENCRYPT_IDENTITY
vault_secrets = None
@ -195,7 +196,7 @@ class VaultCLI(CLI):
self.encrypt_vault_id = encrypt_secret[0]
self.encrypt_secret = encrypt_secret[1]
if self.action in ['rekey']:
if action in ['rekey']:
encrypt_vault_id = context.CLIARGS['encrypt_vault_id'] or C.DEFAULT_VAULT_ENCRYPT_IDENTITY
# print('encrypt_vault_id: %s' % encrypt_vault_id)
# print('default_encrypt_vault_id: %s' % default_encrypt_vault_id)
@ -236,7 +237,7 @@ class VaultCLI(CLI):
vault = VaultLib(vault_secrets)
self.editor = VaultEditor(vault)
self.execute()
context.CLIARGS['func']()
# and restore umask
os.umask(old_umask)

@ -377,7 +377,7 @@ def get_file_vault_secret(filename=None, vault_id=None, encoding=None, loader=No
if loader.is_executable(this_path):
if script_is_client(filename):
display.vvvv('The vault password file %s is a client script.' % filename)
display.vvvv(u'The vault password file %s is a client script.' % to_text(filename))
# TODO: pass vault_id_name to script via cli
return ClientScriptVaultSecret(filename=this_path, vault_id=vault_id,
encoding=encoding, loader=loader)
@ -490,7 +490,7 @@ class ClientScriptVaultSecret(ScriptVaultSecret):
encoding=encoding,
loader=loader)
self._vault_id = vault_id
display.vvvv('Executing vault password client script: %s --vault-id %s' % (filename, vault_id))
display.vvvv(u'Executing vault password client script: %s --vault-id %s' % (to_text(filename), to_text(vault_id)))
def _run(self, command):
try:
@ -553,7 +553,7 @@ def match_best_secret(secrets, target_vault_ids):
def match_encrypt_vault_id_secret(secrets, encrypt_vault_id=None):
# See if the --encrypt-vault-id matches a vault-id
display.vvvv('encrypt_vault_id=%s' % encrypt_vault_id)
display.vvvv(u'encrypt_vault_id=%s' % to_text(encrypt_vault_id))
if encrypt_vault_id is None:
raise AnsibleError('match_encrypt_vault_id_secret requires a non None encrypt_vault_id')
@ -574,7 +574,7 @@ def match_encrypt_vault_id_secret(secrets, encrypt_vault_id=None):
def match_encrypt_secret(secrets, encrypt_vault_id=None):
'''Find the best/first/only secret in secrets to use for encrypting'''
display.vvvv('encrypt_vault_id=%s' % encrypt_vault_id)
display.vvvv(u'encrypt_vault_id=%s' % to_text(encrypt_vault_id))
# See if the --encrypt-vault-id matches a vault-id
if encrypt_vault_id:
return match_encrypt_vault_id_secret(secrets,
@ -629,9 +629,9 @@ class VaultLib:
# encrypt data
if vault_id:
display.vvvvv('Encrypting with vault_id "%s" and vault secret %s' % (vault_id, secret))
display.vvvvv(u'Encrypting with vault_id "%s" and vault secret %s' % (to_text(vault_id), to_text(secret)))
else:
display.vvvvv('Encrypting without a vault_id using vault secret %s' % secret)
display.vvvvv(u'Encrypting without a vault_id using vault secret %s' % to_text(secret))
b_ciphertext = this_cipher.encrypt(b_plaintext, secret)
@ -707,13 +707,13 @@ class VaultLib:
vault_secret_used = None
if vault_id:
display.vvvvv('Found a vault_id (%s) in the vaulttext' % (vault_id))
display.vvvvv(u'Found a vault_id (%s) in the vaulttext' % to_text(vault_id))
vault_id_matchers.append(vault_id)
_matches = match_secrets(self.secrets, vault_id_matchers)
if _matches:
display.vvvvv('We have a secret associated with vault id (%s), will try to use to decrypt %s' % (vault_id, to_text(filename)))
display.vvvvv(u'We have a secret associated with vault id (%s), will try to use to decrypt %s' % (to_text(vault_id), to_text(filename)))
else:
display.vvvvv('Found a vault_id (%s) in the vault text, but we do not have a associated secret (--vault-id)' % (vault_id))
display.vvvvv(u'Found a vault_id (%s) in the vault text, but we do not have a associated secret (--vault-id)' % to_text(vault_id))
# Not adding the other secrets to vault_secret_ids enforces a match between the vault_id from the vault_text and
# the known vault secrets.
@ -725,11 +725,11 @@ class VaultLib:
# for vault_secret_id in vault_secret_ids:
for vault_secret_id, vault_secret in matched_secrets:
display.vvvvv('Trying to use vault secret=(%s) id=%s to decrypt %s' % (vault_secret, vault_secret_id, to_text(filename)))
display.vvvvv(u'Trying to use vault secret=(%s) id=%s to decrypt %s' % (to_text(vault_secret), to_text(vault_secret_id), to_text(filename)))
try:
# secret = self.secrets[vault_secret_id]
display.vvvv('Trying secret %s for vault_id=%s' % (vault_secret, vault_secret_id))
display.vvvv(u'Trying secret %s for vault_id=%s' % (to_text(vault_secret), to_text(vault_secret_id)))
b_plaintext = this_cipher.decrypt(b_vaulttext, vault_secret)
if b_plaintext is not None:
vault_id_used = vault_secret_id
@ -737,18 +737,20 @@ class VaultLib:
file_slug = ''
if filename:
file_slug = ' of "%s"' % filename
display.vvvvv(u'Decrypt%s successful with secret=%s and vault_id=%s' % (to_text(file_slug), vault_secret, vault_secret_id))
display.vvvvv(
u'Decrypt%s successful with secret=%s and vault_id=%s' % (to_text(file_slug), to_text(vault_secret), to_text(vault_secret_id))
)
break
except AnsibleVaultFormatError as exc:
msg = "There was a vault format error"
msg = u"There was a vault format error"
if filename:
msg += ' in %s' % (to_text(filename))
msg += ': %s' % exc
msg += u' in %s' % (to_text(filename))
msg += u': %s' % exc
display.warning(msg)
raise
except AnsibleError as e:
display.vvvv('Tried to use the vault secret (%s) to decrypt (%s) but it failed. Error: %s' %
(vault_secret_id, to_text(filename), e))
display.vvvv(u'Tried to use the vault secret (%s) to decrypt (%s) but it failed. Error: %s' %
(to_text(vault_secret_id), to_text(filename), e))
continue
else:
msg = "Decryption failed (no vault secrets were found that could decrypt)"
@ -877,7 +879,7 @@ class VaultEditor:
# shuffle tmp file into place
self.shuffle_files(tmp_path, filename)
display.vvvvv('Saved edited file "%s" encrypted using %s and vault id "%s"' % (filename, secret, vault_id))
display.vvvvv(u'Saved edited file "%s" encrypted using %s and vault id "%s"' % (to_text(filename), to_text(secret), to_text(vault_id)))
def _real_path(self, filename):
# '-' is special to VaultEditor, dont expand it.
@ -923,7 +925,7 @@ class VaultEditor:
dirname = os.path.dirname(filename)
if dirname and not os.path.exists(dirname):
display.warning("%s does not exist, creating..." % dirname)
display.warning(u"%s does not exist, creating..." % to_text(dirname))
makedirs_safe(dirname)
# FIXME: If we can raise an error here, we can probably just make it
@ -990,8 +992,8 @@ class VaultEditor:
b_vaulttext = self.read_data(filename)
vaulttext = to_text(b_vaulttext)
display.vvvvv('Rekeying file "%s" to with new vault-id "%s" and vault secret %s' %
(filename, new_vault_id, new_vault_secret))
display.vvvvv(u'Rekeying file "%s" to with new vault-id "%s" and vault secret %s' %
(to_text(filename), to_text(new_vault_id), to_text(new_vault_secret)))
try:
plaintext, vault_id_used, _dummy = self.vault.decrypt_and_get_vault_id(vaulttext)
except AnsibleError as e:
@ -1018,8 +1020,8 @@ class VaultEditor:
os.chmod(filename, prev.st_mode)
os.chown(filename, prev.st_uid, prev.st_gid)
display.vvvvv('Rekeyed file "%s" (decrypted with vault id "%s") was encrypted with new vault-id "%s" and vault secret %s' %
(filename, vault_id_used, new_vault_id, new_vault_secret))
display.vvvvv(u'Rekeyed file "%s" (decrypted with vault id "%s") was encrypted with new vault-id "%s" and vault secret %s' %
(to_text(filename), to_text(vault_id_used), to_text(new_vault_id), to_text(new_vault_secret)))
def read_data(self, filename):

@ -8,10 +8,10 @@ ansible --help
ansible testhost -i ../../inventory -m ping "$@"
ansible testhost -i ../../inventory -m setup "$@"
ansible-config -c ./ansible-testé.cfg view | grep 'remote_user = admin'
ansible-config -c ./ansible-testé.cfg dump | grep 'DEFAULT_REMOTE_USER([^)]*) = admin\>'
ansible-config view -c ./ansible-testé.cfg | grep 'remote_user = admin'
ansible-config dump -c ./ansible-testé.cfg | grep 'DEFAULT_REMOTE_USER([^)]*) = admin\>'
ANSIBLE_REMOTE_USER=administrator ansible-config dump| grep 'DEFAULT_REMOTE_USER([^)]*) = administrator\>'
ansible-config list | grep 'DEFAULT_REMOTE_USER'
# 'view' command must fail when config file is missing
ansible-config -c ./ansible-non-existent.cfg view && exit 1 || echo 'Failure is expected'
ansible-config view -c ./ansible-non-existent.cfg && exit 1 || echo 'Failure is expected'

@ -2,4 +2,4 @@
set -eux
ansible-playbook main.yml -i inventory -e "$@"
ansible-playbook main.yml -i inventory "$@"

@ -79,7 +79,7 @@ if [ -x "$(command -v setsid)" ]; then
echo "rc was $WRONG_RC (0 is expected)"
[ $WRONG_RC -eq 0 ]
setsid sh -c 'tty; ansible-vault --ask-vault-pass -vvvvv view test_vault.yml' < /dev/null > log 2>&1 && :
setsid sh -c 'tty; ansible-vault view --ask-vault-pass -vvvvv test_vault.yml' < /dev/null > log 2>&1 && :
WRONG_RC=$?
echo "rc was $WRONG_RC (1 is expected)"
[ $WRONG_RC -eq 1 ]
@ -103,7 +103,7 @@ if [ -x "$(command -v setsid)" ]; then
echo $?
cat log
setsid sh -c 'tty; echo test-vault-password|ansible-vault --ask-vault-pass -vvvvv view vaulted.inventory' < /dev/null > log 2>&1
setsid sh -c 'tty; echo test-vault-password|ansible-vault view --ask-vault-pass -vvvvv vaulted.inventory' < /dev/null > log 2>&1
echo $?
cat log
fi

@ -8,7 +8,7 @@ __metaclass__ = type
import pytest
from ansible.cli.arguments import optparse_helpers as opt_help
from ansible.cli.arguments import option_helpers as opt_help
class TestOptparseHelpersVersion:

@ -14,28 +14,19 @@ from ansible.errors import AnsibleOptionsError
def test_parse():
""" Test adhoc parse"""
adhoc_cli = AdHocCLI([])
with pytest.raises(AnsibleOptionsError) as exec_info:
with pytest.raises(SystemExit) as exec_info:
adhoc_cli.parse()
assert "Missing target hosts" == str(exec_info.value)
def test_with_command():
""" Test simple adhoc command"""
module_name = 'command'
adhoc_cli = AdHocCLI(args=['-m', module_name, '-vv'])
adhoc_cli = AdHocCLI(args=['ansible', '-m', module_name, '-vv', 'localhost'])
adhoc_cli.parse()
assert context.CLIARGS['module_name'] == module_name
assert display.verbosity == 2
def test_with_extra_parameters():
""" Test extra parameters"""
adhoc_cli = AdHocCLI(args=['-m', 'command', 'extra_parameters'])
with pytest.raises(AnsibleOptionsError) as exec_info:
adhoc_cli.parse()
assert "Extraneous options or arguments" == str(exec_info.value)
def test_simple_command():
""" Test valid command and its run"""
adhoc_cli = AdHocCLI(['/bin/ansible', '-m', 'command', 'localhost', '-a', 'echo "hi"'])
@ -89,3 +80,10 @@ def test_run_import_playbook():
adhoc_cli.run()
assert context.CLIARGS['module_name'] == import_playbook
assert "'%s' is not a valid action for ad-hoc commands" % import_playbook == str(exec_info.value)
def test_run_no_extra_vars():
adhoc_cli = AdHocCLI(args=['/bin/ansible', 'localhost', '-e'])
with pytest.raises(SystemExit) as exec_info:
adhoc_cli.parse()
assert exec_info.value.code == 2

@ -27,7 +27,7 @@ import tempfile
import yaml
from ansible import context
from ansible.cli.arguments import optparse_helpers as opt_help
from ansible.cli.arguments import option_helpers as opt_help
from ansible.cli.galaxy import GalaxyCLI
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.module_utils.six import PY3
@ -128,14 +128,13 @@ class TestGalaxy(unittest.TestCase):
def test_run(self):
''' verifies that the GalaxyCLI object's api is created and that execute() is called. '''
gc = GalaxyCLI(args=["ansible-galaxy", "install", "--ignore-errors", "imaginary_role"])
with patch.object(ansible.cli.CLI, "execute", return_value=None) as mock_ex:
with patch.object(ansible.cli.CLI, "run", return_value=None) as mock_run:
gc.run()
# testing
self.assertIsInstance(gc.galaxy, ansible.galaxy.Galaxy)
self.assertEqual(mock_run.call_count, 1)
self.assertTrue(isinstance(gc.api, ansible.galaxy.api.GalaxyAPI))
self.assertEqual(mock_ex.call_count, 1)
gc.parse()
with patch.object(ansible.cli.CLI, "run", return_value=None) as mock_run:
gc.run()
# testing
self.assertIsInstance(gc.galaxy, ansible.galaxy.Galaxy)
self.assertEqual(mock_run.call_count, 1)
self.assertTrue(isinstance(gc.api, ansible.galaxy.api.GalaxyAPI))
def test_execute_remove(self):
# installing role
@ -172,51 +171,26 @@ class TestGalaxy(unittest.TestCase):
gc.run()
self.assertTrue(mocked_display.called_once_with("- downloading role 'fake_role_name', owned by "))
def run_parse_common(self, galaxycli_obj, action):
with patch.object(opt_help.SortedOptParser, "set_usage") as mocked_usage:
galaxycli_obj.parse()
# checking that the common results of parse() for all possible actions have been created/called
self.assertIsInstance(galaxycli_obj.parser, opt_help.SortedOptParser)
formatted_call = {
'import': 'usage: %prog import [options] github_user github_repo',
'delete': 'usage: %prog delete [options] github_user github_repo',
'info': 'usage: %prog info [options] role_name[,version]',
'init': 'usage: %prog init [options] role_name',
'install': 'usage: %prog install [options] [-r FILE | role_name(s)[,version] | scm+role_repo_url[,version] | tar_file(s)]',
'list': 'usage: %prog list [role_name]',
'login': 'usage: %prog login [options]',
'remove': 'usage: %prog remove role1 role2 ...',
'search': ('usage: %prog search [searchterm1 searchterm2] [--galaxy-tags galaxy_tag1,galaxy_tag2] [--platforms platform1,platform2] '
'[--author username]'),
'setup': 'usage: %prog setup [options] source github_user github_repo secret',
}
first_call = 'usage: %prog [delete|import|info|init|install|list|login|remove|search|setup] [--help] [options] ...'
second_call = formatted_call[action]
calls = [call(first_call), call(second_call)]
mocked_usage.assert_has_calls(calls)
def test_parse_no_action(self):
''' testing the options parser when no action is given '''
gc = GalaxyCLI(args=["ansible-galaxy", ""])
self.assertRaises(AnsibleOptionsError, gc.parse)
self.assertRaises(SystemExit, gc.parse)
def test_parse_invalid_action(self):
''' testing the options parser when an invalid action is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "NOT_ACTION"])
self.assertRaises(AnsibleOptionsError, gc.parse)
self.assertRaises(SystemExit, gc.parse)
def test_parse_delete(self):
''' testing the options parser when the action 'delete' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "delete"])
self.run_parse_common(gc, "delete")
gc = GalaxyCLI(args=["ansible-galaxy", "delete", "foo", "bar"])
gc.parse()
self.assertEqual(context.CLIARGS['verbosity'], 0)
def test_parse_import(self):
''' testing the options parser when the action 'import' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "import"])
self.run_parse_common(gc, "import")
gc = GalaxyCLI(args=["ansible-galaxy", "import", "foo", "bar"])
gc.parse()
self.assertEqual(context.CLIARGS['wait'], True)
self.assertEqual(context.CLIARGS['reference'], None)
self.assertEqual(context.CLIARGS['check_status'], False)
@ -224,21 +198,21 @@ class TestGalaxy(unittest.TestCase):
def test_parse_info(self):
''' testing the options parser when the action 'info' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "info"])
self.run_parse_common(gc, "info")
gc = GalaxyCLI(args=["ansible-galaxy", "info", "foo", "bar"])
gc.parse()
self.assertEqual(context.CLIARGS['offline'], False)
def test_parse_init(self):
''' testing the options parser when the action 'init' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "init"])
self.run_parse_common(gc, "init")
gc = GalaxyCLI(args=["ansible-galaxy", "init", "foo"])
gc.parse()
self.assertEqual(context.CLIARGS['offline'], False)
self.assertEqual(context.CLIARGS['force'], False)
def test_parse_install(self):
''' testing the options parser when the action 'install' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "install"])
self.run_parse_common(gc, "install")
gc.parse()
self.assertEqual(context.CLIARGS['ignore_errors'], False)
self.assertEqual(context.CLIARGS['no_deps'], False)
self.assertEqual(context.CLIARGS['role_file'], None)
@ -247,35 +221,34 @@ class TestGalaxy(unittest.TestCase):
def test_parse_list(self):
''' testing the options parser when the action 'list' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "list"])
self.run_parse_common(gc, "list")
gc.parse()
self.assertEqual(context.CLIARGS['verbosity'], 0)
def test_parse_login(self):
''' testing the options parser when the action 'login' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "login"])
self.run_parse_common(gc, "login")
gc.parse()
self.assertEqual(context.CLIARGS['verbosity'], 0)
self.assertEqual(context.CLIARGS['token'], None)
def test_parse_remove(self):
''' testing the options parser when the action 'remove' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "remove"])
self.run_parse_common(gc, "remove")
gc = GalaxyCLI(args=["ansible-galaxy", "remove", "foo"])
gc.parse()
self.assertEqual(context.CLIARGS['verbosity'], 0)
def test_parse_search(self):
''' testing the options parswer when the action 'search' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "search"])
self.run_parse_common(gc, "search")
gc.parse()
self.assertEqual(context.CLIARGS['platforms'], None)
self.assertEqual(context.CLIARGS['galaxy_tags'], None)
self.assertEqual(context.CLIARGS['author'], None)
def test_parse_setup(self):
''' testing the options parser when the action 'setup' is given '''
gc = GalaxyCLI(args=["ansible-galaxy", "setup"])
self.run_parse_common(gc, "setup")
gc = GalaxyCLI(args=["ansible-galaxy", "setup", "source", "github_user", "github_repo", "secret"])
gc.parse()
self.assertEqual(context.CLIARGS['verbosity'], 0)
self.assertEqual(context.CLIARGS['remove_id'], None)
self.assertEqual(context.CLIARGS['setup_list'], False)

@ -41,9 +41,8 @@ class TestVaultCli(unittest.TestCase):
def test_parse_empty(self):
cli = VaultCLI([])
self.assertRaisesRegexp(errors.AnsibleOptionsError,
'.*Missing required action.*',
cli.parse)
self.assertRaises(SystemExit,
cli.parse)
# FIXME: something weird seems to be afoot when parsing actions
# cli = VaultCLI(args=['view', '/dev/null/foo', 'mysecret3'])

@ -12,7 +12,7 @@ import pytest
from ansible import constants as C
from ansible import context
from ansible.cli.arguments import optparse_helpers as opt_help
from ansible.cli.arguments import option_helpers as opt_help
from ansible.errors import AnsibleError
from ansible.playbook.play_context import PlayContext
from ansible.playbook.play import Play
@ -45,8 +45,7 @@ def reset_cli_args():
def test_play_context(mocker, parser, reset_cli_args):
(options, args) = parser.parse_args(['-vv', '--check'])
options.args = args
options = parser.parse_args(['-vv', '--check'])
context._init_global_context(options)
play = Play.load({})
play_context = PlayContext(play=play)
@ -97,8 +96,7 @@ def test_play_context(mocker, parser, reset_cli_args):
def test_play_context_make_become_cmd(mocker, parser, reset_cli_args):
(options, args) = parser.parse_args([])
options.args = args
options = parser.parse_args([])
context._init_global_context(options)
play_context = PlayContext()

Loading…
Cancel
Save