@ -9,6 +9,7 @@ import datetime
import json
import pkgutil
import os
import os . path
import re
import textwrap
import traceback
@ -26,12 +27,13 @@ from ansible.module_utils._text import to_native, to_text
from ansible . module_utils . common . _collections_compat import Container , Sequence
from ansible . module_utils . common . json import AnsibleJSONEncoder
from ansible . module_utils . compat import importlib
from ansible . module_utils . six import string_types
from ansible . module_utils . six import iteritems , string_types
from ansible . parsing . dataloader import DataLoader
from ansible . parsing . plugin_docs import read_docstub
from ansible . parsing . utils . yaml import from_yaml
from ansible . parsing . yaml . dumper import AnsibleDumper
from ansible . plugins . loader import action_loader , fragment_loader
from ansible . utils . collection_loader import AnsibleCollectionConfig
from ansible . utils . collection_loader import AnsibleCollectionConfig , AnsibleCollectionRef
from ansible . utils . collection_loader . _collection_finder import _get_collection_name_from_path
from ansible . utils . display import Display
from ansible . utils . plugin_docs import (
@ -44,7 +46,7 @@ from ansible.utils.plugin_docs import (
display = Display ( )
TARGET_OPTIONS = C . DOCUMENTABLE_PLUGINS + ( ' keyword' , )
TARGET_OPTIONS = C . DOCUMENTABLE_PLUGINS + ( ' role' , ' keyword' , )
PB_OBJECTS = [ ' Play ' , ' Role ' , ' Block ' , ' Task ' ]
PB_LOADED = { }
@ -71,7 +73,206 @@ class PluginNotFound(Exception):
pass
class DocCLI ( CLI ) :
class RoleMixin ( object ) :
""" A mixin containing all methods relevant to role argument specification functionality.
Note : The methods for actual display of role data are not present here .
"""
ROLE_ARGSPEC_FILE = ' argument_specs.yml '
def _load_argspec ( self , role_name , collection_path = None , role_path = None ) :
if collection_path :
path = os . path . join ( collection_path , ' roles ' , role_name , ' meta ' , self . ROLE_ARGSPEC_FILE )
elif role_path :
path = os . path . join ( role_path , ' meta ' , self . ROLE_ARGSPEC_FILE )
else :
raise AnsibleError ( " A path is required to load argument specs for role ' %s ' " % role_name )
loader = DataLoader ( )
return loader . load_from_file ( path , cache = False , unsafe = True )
def _find_all_normal_roles ( self , role_paths , name_filters = None ) :
""" Find all non-collection roles that have an argument spec file.
: param role_paths : A tuple of one or more role paths . When a role with the same name
is found in multiple paths , only the first - found role is returned .
: param name_filters : A tuple of one or more role names used to filter the results .
: returns : A set of tuples consisting of : role name , full role path
"""
found = set ( )
found_names = set ( )
for path in role_paths :
if not os . path . isdir ( path ) :
continue
# Check each subdir for a meta/argument_specs.yml file
for entry in os . listdir ( path ) :
role_path = os . path . join ( path , entry )
full_path = os . path . join ( role_path , ' meta ' , self . ROLE_ARGSPEC_FILE )
if os . path . exists ( full_path ) :
if name_filters is None or entry in name_filters :
if entry not in found_names :
found . add ( ( entry , role_path ) )
found_names . add ( entry )
return found
def _find_all_collection_roles ( self , name_filters = None , collection_filter = None ) :
""" Find all collection roles with an argument spec.
: param name_filters : A tuple of one or more role names used to filter the results . These
might be fully qualified with the collection name ( e . g . , community . general . roleA )
or not ( e . g . , roleA ) .
: param collection_filter : A string containing the FQCN of a collection which will be
used to limit results . This filter will take precedence over the name_filters .
: returns : A set of tuples consisting of : role name , collection name , collection path
"""
found = set ( )
b_colldirs = list_collection_dirs ( coll_filter = collection_filter )
for b_path in b_colldirs :
path = to_text ( b_path , errors = ' surrogate_or_strict ' )
collname = _get_collection_name_from_path ( b_path )
roles_dir = os . path . join ( path , ' roles ' )
if os . path . exists ( roles_dir ) :
for entry in os . listdir ( roles_dir ) :
full_path = os . path . join ( roles_dir , entry , ' meta ' , self . ROLE_ARGSPEC_FILE )
if os . path . exists ( full_path ) :
if name_filters is None :
found . add ( ( entry , collname , path ) )
else :
# Name filters might contain a collection FQCN or not.
for fqcn in name_filters :
if len ( fqcn . split ( ' . ' ) ) == 3 :
( ns , col , role ) = fqcn . split ( ' . ' )
if ' . ' . join ( [ ns , col ] ) == collname and entry == role :
found . add ( ( entry , collname , path ) )
elif fqcn == entry :
found . add ( ( entry , collname , path ) )
return found
def _build_summary ( self , role , collection , argspec ) :
""" Build a summary dict for a role.
Returns a simplified role arg spec containing only the role entry points and their
short descriptions , and the role collection name ( if applicable ) .
: param role : The simple role name .
: param collection : The collection containing the role ( None or empty string if N / A ) .
: param argspec : The complete role argspec data dict .
: returns : A tuple with the FQCN role name and a summary dict .
"""
if collection :
fqcn = ' . ' . join ( [ collection , role ] )
else :
fqcn = role
summary = { }
summary [ ' collection ' ] = collection
summary [ ' entry_points ' ] = { }
for entry_point in argspec . keys ( ) :
entry_spec = argspec [ entry_point ] or { }
summary [ ' entry_points ' ] [ entry_point ] = entry_spec . get ( ' short_description ' , ' ' )
return ( fqcn , summary )
def _create_role_list ( self , roles_path , collection_filter = None ) :
""" Return a dict describing the listing of all roles with arg specs.
: param role_paths : A tuple of one or more role paths .
: returns : A dict indexed by role name , with ' collection ' and ' entry_points ' keys per role .
Example return :
results = {
' roleA ' : {
' collection ' : ' ' ,
' entry_points ' : {
' main ' : ' Short description for main '
}
} ,
' a.b.c.roleB ' : {
' collection ' : ' a.b.c ' ,
' entry_points ' : {
' main ' : ' Short description for main ' ,
' alternate ' : ' Short description for alternate entry point '
}
' x.y.z.roleB ' : {
' collection ' : ' x.y.z ' ,
' entry_points ' : {
' main ' : ' Short description for main ' ,
}
} ,
}
"""
if not collection_filter :
roles = self . _find_all_normal_roles ( roles_path )
else :
roles = [ ]
collroles = self . _find_all_collection_roles ( collection_filter = collection_filter )
result = { }
for role , role_path in roles :
argspec = self . _load_argspec ( role , role_path = role_path )
fqcn , summary = self . _build_summary ( role , ' ' , argspec )
result [ fqcn ] = summary
for role , collection , collection_path in collroles :
argspec = self . _load_argspec ( role , collection_path = collection_path )
fqcn , summary = self . _build_summary ( role , collection , argspec )
result [ fqcn ] = summary
return result
def _create_role_doc ( self , role_names , roles_path , entry_point = None ) :
"""
: param role_names : A tuple of one or more role names .
: param role_paths : A tuple of one or more role paths .
: param entry_point : A role entry point name for filtering .
: returns : A dict indexed by role name , with ' collection ' , ' entry_points ' , and ' path ' keys per role .
"""
roles = self . _find_all_normal_roles ( roles_path , name_filters = role_names )
collroles = self . _find_all_collection_roles ( name_filters = role_names )
result = { }
def build_doc ( role , path , collection , argspec ) :
if collection :
fqcn = ' . ' . join ( [ collection , role ] )
else :
fqcn = role
if fqcn not in result :
result [ fqcn ] = { }
doc = { }
doc [ ' path ' ] = path
doc [ ' collection ' ] = collection
doc [ ' entry_points ' ] = { }
for ep in argspec . keys ( ) :
if entry_point is None or ep == entry_point :
entry_spec = argspec [ ep ] or { }
doc [ ' entry_points ' ] [ ep ] = entry_spec
# If we didn't add any entry points (b/c of filtering), remove this entry.
if len ( doc [ ' entry_points ' ] . keys ( ) ) == 0 :
del result [ fqcn ]
else :
result [ fqcn ] = doc
for role , role_path in roles :
argspec = self . _load_argspec ( role , role_path = role_path )
build_doc ( role , role_path , ' ' , argspec )
for role , collection , collection_path in collroles :
argspec = self . _load_argspec ( role , collection_path = collection_path )
build_doc ( role , collection_path , collection , argspec )
return result
class DocCLI ( CLI , RoleMixin ) :
''' displays information on modules installed in Ansible libraries.
It displays a terse listing of plugins and their short descriptions ,
provides a printout of their DOCUMENTATION strings ,
@ -141,6 +342,12 @@ class DocCLI(CLI):
self . parser . add_argument ( " -j " , " --json " , action = " store_true " , default = False , dest = ' json_format ' ,
help = ' Change output into json format. ' )
# role-specific options
self . parser . add_argument ( " -r " , " --roles-path " , dest = ' roles_path ' , default = C . DEFAULT_ROLES_PATH ,
type = opt_help . unfrack_path ( pathsep = True ) ,
action = opt_help . PrependListAction ,
help = ' The path to the directory containing your roles. ' )
exclusive = self . parser . add_mutually_exclusive_group ( )
exclusive . add_argument ( " -F " , " --list_files " , action = " store_true " , default = False , dest = " list_files " ,
help = ' Show plugin names and their source files without summaries (implies --list). %s ' % coll_filter )
@ -150,6 +357,8 @@ class DocCLI(CLI):
help = ' Show playbook snippet for specified plugin(s) ' )
exclusive . add_argument ( " --metadata-dump " , action = " store_true " , default = False , dest = ' dump ' ,
help = ' **For internal testing only** Dump json metadata for all plugins. ' )
exclusive . add_argument ( " -e " , " --entry-point " , dest = " entry_point " ,
help = " Select the entry point for role(s). " )
def post_process_args ( self , options ) :
options = super ( DocCLI , self ) . post_process_args ( options )
@ -192,6 +401,48 @@ class DocCLI(CLI):
# display results
DocCLI . pager ( " \n " . join ( text ) )
def _display_available_roles ( self , list_json ) :
""" Display all roles we can find with a valid argument specification.
Output is : fqcn role name , entry point , short description
"""
roles = list ( list_json . keys ( ) )
entry_point_names = set ( )
for role in roles :
for entry_point in list_json [ role ] [ ' entry_points ' ] . keys ( ) :
entry_point_names . add ( entry_point )
max_role_len = 0
max_ep_len = 0
if roles :
max_role_len = max ( len ( x ) for x in roles )
if entry_point_names :
max_ep_len = max ( len ( x ) for x in entry_point_names )
linelimit = display . columns - max_role_len - max_ep_len - 5
text = [ ]
for role in sorted ( roles ) :
for entry_point , desc in iteritems ( list_json [ role ] [ ' entry_points ' ] ) :
if len ( desc ) > linelimit :
desc = desc [ : linelimit ] + ' ... '
text . append ( " %-*s %-*s %s " % ( max_role_len , role ,
max_ep_len , entry_point ,
desc ) )
# display results
DocCLI . pager ( " \n " . join ( text ) )
def _display_role_doc ( self , role_json ) :
roles = list ( role_json . keys ( ) )
text = [ ]
for role in roles :
text + = self . get_role_man_text ( role , role_json [ role ] )
# display results
DocCLI . pager ( " \n " . join ( text ) )
@staticmethod
def _list_keywords ( ) :
return from_yaml ( pkgutil . get_data ( ' ansible ' , ' keyword_desc.yml ' ) )
@ -315,11 +566,27 @@ class DocCLI(CLI):
super ( DocCLI , self ) . run ( )
basedir = context . CLIARGS [ ' basedir ' ]
plugin_type = context . CLIARGS [ ' type ' ]
do_json = context . CLIARGS [ ' json_format ' ]
roles_path = context . CLIARGS [ ' roles_path ' ]
listing = context . CLIARGS [ ' list_files ' ] or context . CLIARGS [ ' list_dir ' ] or context . CLIARGS [ ' dump ' ]
docs = { }
if basedir :
AnsibleCollectionConfig . playbook_paths = basedir
# Add any 'roles' subdir in playbook dir to the roles search path.
# And as a last resort, add the playbook dir itself. Order being:
# - 'roles' subdir of playbook dir
# - DEFAULT_ROLES_PATH
# - playbook dir
# NOTE: This matches logic in RoleDefinition._load_role_path() method.
subdir = os . path . join ( basedir , " roles " )
if os . path . isdir ( subdir ) :
roles_path = ( subdir , ) + roles_path
roles_path = roles_path + ( basedir , )
if plugin_type not in TARGET_OPTIONS :
raise AnsibleOptionsError ( " Unknown or undocumentable plugin type: %s " % plugin_type )
elif plugin_type == ' keyword ' :
@ -328,6 +595,20 @@ class DocCLI(CLI):
docs = DocCLI . _list_keywords ( )
else :
docs = DocCLI . _get_keywords_docs ( context . CLIARGS [ ' args ' ] )
elif plugin_type == ' role ' :
if context . CLIARGS [ ' list_dir ' ] :
# If an argument was given with --list, it is a collection filter
coll_filter = None
if len ( context . CLIARGS [ ' args ' ] ) == 1 :
coll_filter = context . CLIARGS [ ' args ' ] [ 0 ]
if not AnsibleCollectionRef . is_valid_collection_name ( coll_filter ) :
raise AnsibleError ( ' Invalid collection name (must be of the form namespace.collection): {0} ' . format ( coll_filter ) )
elif len ( context . CLIARGS [ ' args ' ] ) > 1 :
raise AnsibleOptionsError ( " Only a single collection filter is supported. " )
docs = self . _create_role_list ( roles_path , collection_filter = coll_filter )
else :
docs = self . _create_role_doc ( context . CLIARGS [ ' args ' ] , roles_path , context . CLIARGS [ ' entry_point ' ] )
else :
loader = getattr ( plugin_loader , ' %s _loader ' % plugin_type )
@ -367,6 +648,11 @@ class DocCLI(CLI):
text . append ( textret )
else :
display . warning ( " No valid documentation was retrieved from ' %s ' " % plugin )
elif plugin_type == ' role ' :
if context . CLIARGS [ ' list_dir ' ] and docs :
self . _display_available_roles ( docs )
elif docs :
self . _display_role_doc ( docs )
elif docs :
text = DocCLI . _dump_yaml ( docs , ' ' )
@ -713,6 +999,62 @@ class DocCLI(CLI):
if not suboptions :
text . append ( ' ' )
def get_role_man_text ( self , role , role_json ) :
''' Generate text for the supplied role suitable for display.
This is similar to get_man_text ( ) , but roles are different enough that we have
a separate method for formatting their display .
: param role : The role name .
: param role_json : The JSON for the given role as returned from _create_role_doc ( ) .
: returns : A array of text suitable for displaying to screen .
'''
text = [ ]
opt_indent = " "
pad = display . columns * 0.20
limit = max ( display . columns - int ( pad ) , 70 )
text . append ( " > %s ( %s ) \n " % ( role . upper ( ) , role_json . get ( ' path ' ) ) )
for entry_point in role_json [ ' entry_points ' ] :
doc = role_json [ ' entry_points ' ] [ entry_point ]
if doc . get ( ' short_description ' ) :
text . append ( " ENTRY POINT: %s - %s \n " % ( entry_point , doc . get ( ' short_description ' ) ) )
else :
text . append ( " ENTRY POINT: %s \n " % entry_point )
if doc . get ( ' description ' ) :
if isinstance ( doc [ ' description ' ] , list ) :
desc = " " . join ( doc [ ' description ' ] )
else :
desc = doc [ ' description ' ]
text . append ( " %s \n " % textwrap . fill ( DocCLI . tty_ify ( desc ) ,
limit , initial_indent = opt_indent ,
subsequent_indent = opt_indent ) )
if doc . get ( ' options ' ) :
text . append ( " OPTIONS (= is mandatory): \n " )
DocCLI . add_fields ( text , doc . pop ( ' options ' ) , limit , opt_indent )
text . append ( ' ' )
# generic elements we will handle identically
for k in ( ' author ' , ) :
if k not in doc :
continue
if isinstance ( doc [ k ] , string_types ) :
text . append ( ' %s : %s ' % ( k . upper ( ) , textwrap . fill ( DocCLI . tty_ify ( doc [ k ] ) ,
limit - ( len ( k ) + 2 ) , subsequent_indent = opt_indent ) ) )
elif isinstance ( doc [ k ] , ( list , tuple ) ) :
text . append ( ' %s : %s ' % ( k . upper ( ) , ' , ' . join ( doc [ k ] ) ) )
else :
# use empty indent since this affects the start of the yaml doc, not it's keys
text . append ( DocCLI . _dump_yaml ( { k . upper ( ) : doc [ k ] } , ' ' ) )
text . append ( ' ' )
return text
@staticmethod
def get_man_text ( doc , collection_name = ' ' , plugin_type = ' ' ) :
# Create a copy so we don't modify the original