ansible/executor: PEP8 compliancy (#24695)

- Make PEP8 compliant
pull/25219/head
Dag Wieers 8 years ago committed by John R Barker
parent 51b595992b
commit 630185cb20

@ -18,4 +18,3 @@
# Make coding more python3-ish # Make coding more python3-ish
from __future__ import (absolute_import, division, print_function) from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type

@ -40,4 +40,3 @@ if 'action_write_locks' not in globals():
mods.update(('copy', 'file', 'setup', 'slurp', 'stat')) mods.update(('copy', 'file', 'setup', 'slurp', 'stat'))
for mod_name in mods: for mod_name in mods:
action_write_locks[mod_name] = Lock() action_write_locks[mod_name] = Lock()

@ -50,12 +50,12 @@ except ImportError:
display = Display() display = Display()
REPLACER = b"#<<INCLUDE_ANSIBLE_MODULE_COMMON>>" REPLACER = b"#<<INCLUDE_ANSIBLE_MODULE_COMMON>>"
REPLACER_VERSION = b"\"<<ANSIBLE_VERSION>>\"" REPLACER_VERSION = b"\"<<ANSIBLE_VERSION>>\""
REPLACER_COMPLEX = b"\"<<INCLUDE_ANSIBLE_MODULE_COMPLEX_ARGS>>\"" REPLACER_COMPLEX = b"\"<<INCLUDE_ANSIBLE_MODULE_COMPLEX_ARGS>>\""
REPLACER_WINDOWS = b"# POWERSHELL_COMMON" REPLACER_WINDOWS = b"# POWERSHELL_COMMON"
REPLACER_JSONARGS = b"<<INCLUDE_ANSIBLE_MODULE_JSON_ARGS>>" REPLACER_JSONARGS = b"<<INCLUDE_ANSIBLE_MODULE_JSON_ARGS>>"
REPLACER_SELINUX = b"<<SELINUX_SPECIAL_FILESYSTEMS>>" REPLACER_SELINUX = b"<<SELINUX_SPECIAL_FILESYSTEMS>>"
# We could end up writing out parameters with unicode characters so we need to # We could end up writing out parameters with unicode characters so we need to
# specify an encoding for the python source file # specify an encoding for the python source file
@ -502,7 +502,7 @@ def recursive_finder(name, data, py_module_names, py_module_cache, zf):
break break
try: try:
module_info = imp.find_module(py_module_name[-idx], module_info = imp.find_module(py_module_name[-idx],
[os.path.join(p, *py_module_name[:-idx]) for p in module_utils_paths]) [os.path.join(p, *py_module_name[:-idx]) for p in module_utils_paths])
break break
except ImportError: except ImportError:
continue continue
@ -561,7 +561,7 @@ def recursive_finder(name, data, py_module_names, py_module_cache, zf):
py_pkg_name = py_module_name[:-i] + ('__init__',) py_pkg_name = py_module_name[:-i] + ('__init__',)
if py_pkg_name not in py_module_names: if py_pkg_name not in py_module_names:
pkg_dir_info = imp.find_module(py_pkg_name[-1], pkg_dir_info = imp.find_module(py_pkg_name[-1],
[os.path.join(p, *py_pkg_name[:-1]) for p in module_utils_paths]) [os.path.join(p, *py_pkg_name[:-1]) for p in module_utils_paths])
normalized_modules.add(py_pkg_name) normalized_modules.add(py_pkg_name)
py_module_cache[py_pkg_name] = (_slurp(pkg_dir_info[1]), pkg_dir_info[1]) py_module_cache[py_pkg_name] = (_slurp(pkg_dir_info[1]), pkg_dir_info[1])
@ -578,7 +578,7 @@ def recursive_finder(name, data, py_module_names, py_module_cache, zf):
py_module_file_name = '%s.py' % py_module_path py_module_file_name = '%s.py' % py_module_path
zf.writestr(os.path.join("ansible/module_utils", zf.writestr(os.path.join("ansible/module_utils",
py_module_file_name), py_module_cache[py_module_name][0]) py_module_file_name), py_module_cache[py_module_name][0])
display.vvv("Using module_utils file %s" % py_module_cache[py_module_name][1]) display.vvv("Using module_utils file %s" % py_module_cache[py_module_name][1])
# Add the names of the files we're scheduling to examine in the loop to # Add the names of the files we're scheduling to examine in the loop to
@ -683,14 +683,14 @@ def _find_module_utils(module_name, b_module_data, module_path, module_args, tas
# Note: If we need to import from release.py first, # Note: If we need to import from release.py first,
# remember to catch all exceptions: https://github.com/ansible/ansible/issues/16523 # remember to catch all exceptions: https://github.com/ansible/ansible/issues/16523
zf.writestr('ansible/__init__.py', zf.writestr('ansible/__init__.py',
b'from pkgutil import extend_path\n__path__=extend_path(__path__,__name__)\n__version__="' + b'from pkgutil import extend_path\n__path__=extend_path(__path__,__name__)\n__version__="' +
to_bytes(__version__) + b'"\n__author__="' + to_bytes(__version__) + b'"\n__author__="' +
to_bytes(__author__) + b'"\n') to_bytes(__author__) + b'"\n')
zf.writestr('ansible/module_utils/__init__.py', b'from pkgutil import extend_path\n__path__=extend_path(__path__,__name__)\n') zf.writestr('ansible/module_utils/__init__.py', b'from pkgutil import extend_path\n__path__=extend_path(__path__,__name__)\n')
zf.writestr('ansible_module_%s.py' % module_name, b_module_data) zf.writestr('ansible_module_%s.py' % module_name, b_module_data)
py_module_cache = { ('__init__',): (b'', '[builtin]') } py_module_cache = {('__init__',): (b'', '[builtin]')}
recursive_finder(module_name, b_module_data, py_module_names, py_module_cache, zf) recursive_finder(module_name, b_module_data, py_module_names, py_module_cache, zf)
zf.close() zf.close()
zipdata = base64.b64encode(zipoutput.getvalue()) zipdata = base64.b64encode(zipoutput.getvalue())
@ -721,8 +721,8 @@ def _find_module_utils(module_name, b_module_data, module_path, module_args, tas
try: try:
zipdata = open(cached_module_filename, 'rb').read() zipdata = open(cached_module_filename, 'rb').read()
except IOError: except IOError:
raise AnsibleError('A different worker process failed to create module file.' raise AnsibleError('A different worker process failed to create module file. '
' Look at traceback for that process for debugging information.') 'Look at traceback for that process for debugging information.')
zipdata = to_text(zipdata, errors='surrogate_or_strict') zipdata = to_text(zipdata, errors='surrogate_or_strict')
shebang, interpreter = _get_shebang(u'/usr/bin/python', task_vars) shebang, interpreter = _get_shebang(u'/usr/bin/python', task_vars)
@ -734,7 +734,7 @@ def _find_module_utils(module_name, b_module_data, module_path, module_args, tas
interpreter_parts = interpreter.split(u' ') interpreter_parts = interpreter.split(u' ')
interpreter = u"'{0}'".format(u"', '".join(interpreter_parts)) interpreter = u"'{0}'".format(u"', '".join(interpreter_parts))
now=datetime.datetime.utcnow() now = datetime.datetime.utcnow()
output.write(to_bytes(ACTIVE_ANSIBALLZ_TEMPLATE % dict( output.write(to_bytes(ACTIVE_ANSIBALLZ_TEMPLATE % dict(
zipdata=zipdata, zipdata=zipdata,
ansible_module=module_name, ansible_module=module_name,
@ -837,6 +837,7 @@ def modify_module(module_name, module_path, module_args, task_vars=dict(), modul
return (b_module_data, module_style, to_text(shebang, nonstring='passthru')) return (b_module_data, module_style, to_text(shebang, nonstring='passthru'))
def build_windows_module_payload(module_name, module_path, b_module_data, module_args, task_vars, task, play_context, environment): def build_windows_module_payload(module_name, module_path, b_module_data, module_args, task_vars, task, play_context, environment):
exec_manifest = dict( exec_manifest = dict(
module_entry=to_text(base64.b64encode(b_module_data)), module_entry=to_text(base64.b64encode(b_module_data)),
@ -856,7 +857,7 @@ def build_windows_module_payload(module_name, module_path, b_module_data, module
exec_manifest["async_jid"] = str(random.randint(0, 999999999999)) exec_manifest["async_jid"] = str(random.randint(0, 999999999999))
exec_manifest["async_timeout_sec"] = task.async exec_manifest["async_timeout_sec"] = task.async
if play_context.become and play_context.become_method=='runas': if play_context.become and play_context.become_method == 'runas':
exec_manifest["actions"].insert(0, 'become') exec_manifest["actions"].insert(0, 'become')
exec_manifest["become_user"] = play_context.become_user exec_manifest["become_user"] = play_context.become_user
exec_manifest["become_password"] = play_context.become_pass exec_manifest["become_password"] = play_context.become_pass

@ -40,21 +40,21 @@ except ImportError:
class HostState: class HostState:
def __init__(self, blocks): def __init__(self, blocks):
self._blocks = blocks[:] self._blocks = blocks[:]
self.cur_block = 0 self.cur_block = 0
self.cur_regular_task = 0 self.cur_regular_task = 0
self.cur_rescue_task = 0 self.cur_rescue_task = 0
self.cur_always_task = 0 self.cur_always_task = 0
self.cur_dep_chain = None self.cur_dep_chain = None
self.run_state = PlayIterator.ITERATING_SETUP self.run_state = PlayIterator.ITERATING_SETUP
self.fail_state = PlayIterator.FAILED_NONE self.fail_state = PlayIterator.FAILED_NONE
self.pending_setup = False self.pending_setup = False
self.tasks_child_state = None self.tasks_child_state = None
self.rescue_child_state = None self.rescue_child_state = None
self.always_child_state = None self.always_child_state = None
self.did_rescue = False self.did_rescue = False
self.did_start_at_task = False self.did_start_at_task = False
def __repr__(self): def __repr__(self):
return "HostState(%r)" % self._blocks return "HostState(%r)" % self._blocks
@ -68,7 +68,7 @@ class HostState:
return "UNKNOWN STATE" return "UNKNOWN STATE"
def _failed_state_to_string(n): def _failed_state_to_string(n):
states = {1:"FAILED_SETUP", 2:"FAILED_TASKS", 4:"FAILED_RESCUE", 8:"FAILED_ALWAYS"} states = {1: "FAILED_SETUP", 2: "FAILED_TASKS", 4: "FAILED_RESCUE", 8: "FAILED_ALWAYS"}
if n == 0: if n == 0:
return "FAILED_NONE" return "FAILED_NONE"
else: else:
@ -130,22 +130,23 @@ class HostState:
new_state.always_child_state = self.always_child_state.copy() new_state.always_child_state = self.always_child_state.copy()
return new_state return new_state
class PlayIterator: class PlayIterator:
# the primary running states for the play iteration # the primary running states for the play iteration
ITERATING_SETUP = 0 ITERATING_SETUP = 0
ITERATING_TASKS = 1 ITERATING_TASKS = 1
ITERATING_RESCUE = 2 ITERATING_RESCUE = 2
ITERATING_ALWAYS = 3 ITERATING_ALWAYS = 3
ITERATING_COMPLETE = 4 ITERATING_COMPLETE = 4
# the failure states for the play iteration, which are powers # the failure states for the play iteration, which are powers
# of 2 as they may be or'ed together in certain circumstances # of 2 as they may be or'ed together in certain circumstances
FAILED_NONE = 0 FAILED_NONE = 0
FAILED_SETUP = 1 FAILED_SETUP = 1
FAILED_TASKS = 2 FAILED_TASKS = 2
FAILED_RESCUE = 4 FAILED_RESCUE = 4
FAILED_ALWAYS = 8 FAILED_ALWAYS = 8
def __init__(self, inventory, play, play_context, variable_manager, all_vars, start_at_done=False): def __init__(self, inventory, play, play_context, variable_manager, all_vars, start_at_done=False):
self._play = play self._play = play
@ -173,8 +174,8 @@ class PlayIterator:
setup_task = Task(block=setup_block) setup_task = Task(block=setup_block)
setup_task.action = 'setup' setup_task.action = 'setup'
setup_task.name = 'Gathering Facts' setup_task.name = 'Gathering Facts'
setup_task.tags = ['always'] setup_task.tags = ['always']
setup_task.args = { setup_task.args = {
'gather_subset': gather_subset, 'gather_subset': gather_subset,
} }
if gather_timeout: if gather_timeout:
@ -270,7 +271,6 @@ class PlayIterator:
display.debug(" ^ state is: %s" % s) display.debug(" ^ state is: %s" % s)
return (s, task) return (s, task)
def _get_next_task_from_state(self, state, host, peek, in_child=False): def _get_next_task_from_state(self, state, host, peek, in_child=False):
task = None task = None
@ -304,7 +304,7 @@ class PlayIterator:
if (gathering == 'implicit' and implied) or \ if (gathering == 'implicit' and implied) or \
(gathering == 'explicit' and boolean(self._play.gather_facts)) or \ (gathering == 'explicit' and boolean(self._play.gather_facts)) or \
(gathering == 'smart' and implied and not (self._variable_manager._fact_cache.get(host.name,{}).get('module_setup', False))): (gathering == 'smart' and implied and not (self._variable_manager._fact_cache.get(host.name, {}).get('module_setup', False))):
# The setup block is always self._blocks[0], as we inject it # The setup block is always self._blocks[0], as we inject it
# during the play compilation in __init__ above. # during the play compilation in __init__ above.
setup_block = self._blocks[0] setup_block = self._blocks[0]
@ -320,8 +320,8 @@ class PlayIterator:
if not state.did_start_at_task: if not state.did_start_at_task:
state.cur_block += 1 state.cur_block += 1
state.cur_regular_task = 0 state.cur_regular_task = 0
state.cur_rescue_task = 0 state.cur_rescue_task = 0
state.cur_always_task = 0 state.cur_always_task = 0
state.child_state = None state.child_state = None
elif state.run_state == self.ITERATING_TASKS: elif state.run_state == self.ITERATING_TASKS:
@ -416,8 +416,8 @@ class PlayIterator:
else: else:
state.cur_block += 1 state.cur_block += 1
state.cur_regular_task = 0 state.cur_regular_task = 0
state.cur_rescue_task = 0 state.cur_rescue_task = 0
state.cur_always_task = 0 state.cur_always_task = 0
state.run_state = self.ITERATING_TASKS state.run_state = self.ITERATING_TASKS
state.tasks_child_state = None state.tasks_child_state = None
state.rescue_child_state = None state.rescue_child_state = None
@ -496,9 +496,9 @@ class PlayIterator:
elif state.run_state == self.ITERATING_ALWAYS and self._check_failed_state(state.always_child_state): elif state.run_state == self.ITERATING_ALWAYS and self._check_failed_state(state.always_child_state):
return True return True
elif state.fail_state != self.FAILED_NONE: elif state.fail_state != self.FAILED_NONE:
if state.run_state == self.ITERATING_RESCUE and state.fail_state&self.FAILED_RESCUE == 0: if state.run_state == self.ITERATING_RESCUE and state.fail_state & self.FAILED_RESCUE == 0:
return False return False
elif state.run_state == self.ITERATING_ALWAYS and state.fail_state&self.FAILED_ALWAYS == 0: elif state.run_state == self.ITERATING_ALWAYS and state.fail_state & self.FAILED_ALWAYS == 0:
return False return False
else: else:
return not state.did_rescue return not state.did_rescue
@ -540,7 +540,7 @@ class PlayIterator:
else: else:
target_block = state._blocks[state.cur_block].copy(exclude_parent=True) target_block = state._blocks[state.cur_block].copy(exclude_parent=True)
before = target_block.block[:state.cur_regular_task] before = target_block.block[:state.cur_regular_task]
after = target_block.block[state.cur_regular_task:] after = target_block.block[state.cur_regular_task:]
target_block.block = before + task_list + after target_block.block = before + task_list + after
state._blocks[state.cur_block] = target_block state._blocks[state.cur_block] = target_block
elif state.run_state == self.ITERATING_RESCUE: elif state.run_state == self.ITERATING_RESCUE:
@ -549,7 +549,7 @@ class PlayIterator:
else: else:
target_block = state._blocks[state.cur_block].copy(exclude_parent=True) target_block = state._blocks[state.cur_block].copy(exclude_parent=True)
before = target_block.rescue[:state.cur_rescue_task] before = target_block.rescue[:state.cur_rescue_task]
after = target_block.rescue[state.cur_rescue_task:] after = target_block.rescue[state.cur_rescue_task:]
target_block.rescue = before + task_list + after target_block.rescue = before + task_list + after
state._blocks[state.cur_block] = target_block state._blocks[state.cur_block] = target_block
elif state.run_state == self.ITERATING_ALWAYS: elif state.run_state == self.ITERATING_ALWAYS:
@ -558,7 +558,7 @@ class PlayIterator:
else: else:
target_block = state._blocks[state.cur_block].copy(exclude_parent=True) target_block = state._blocks[state.cur_block].copy(exclude_parent=True)
before = target_block.always[:state.cur_always_task] before = target_block.always[:state.cur_always_task]
after = target_block.always[state.cur_always_task:] after = target_block.always[state.cur_always_task:]
target_block.always = before + task_list + after target_block.always = before + task_list + after
state._blocks[state.cur_block] = target_block state._blocks[state.cur_block] = target_block
return state return state
@ -567,4 +567,3 @@ class PlayIterator:
for b in task_list: for b in task_list:
self.cache_block_tasks(b) self.cache_block_tasks(b)
self._host_states[host.name] = self._insert_tasks_into_state(self.get_host_state(host), task_list) self._host_states[host.name] = self._insert_tasks_into_state(self.get_host_state(host), task_list)

@ -45,12 +45,12 @@ class PlaybookExecutor:
''' '''
def __init__(self, playbooks, inventory, variable_manager, loader, options, passwords): def __init__(self, playbooks, inventory, variable_manager, loader, options, passwords):
self._playbooks = playbooks self._playbooks = playbooks
self._inventory = inventory self._inventory = inventory
self._variable_manager = variable_manager self._variable_manager = variable_manager
self._loader = loader self._loader = loader
self._options = options self._options = options
self.passwords = passwords self.passwords = passwords
self._unreachable_hosts = dict() self._unreachable_hosts = dict()
if options.listhosts or options.listtasks or options.listtags or options.syntax: if options.listhosts or options.listtasks or options.listtags or options.syntax:
@ -79,7 +79,7 @@ class PlaybookExecutor:
try: try:
for playbook_path in self._playbooks: for playbook_path in self._playbooks:
pb = Playbook.load(playbook_path, variable_manager=self._variable_manager, loader=self._loader) pb = Playbook.load(playbook_path, variable_manager=self._variable_manager, loader=self._loader)
#FIXME: move out of inventory self._inventory.set_playbook_basedir(os.path.realpath(os.path.dirname(playbook_path))) # FIXME: move out of inventory self._inventory.set_playbook_basedir(os.path.realpath(os.path.dirname(playbook_path)))
if self._tqm is None: # we are doing a listing if self._tqm is None: # we are doing a listing
entry = {'playbook': playbook_path} entry = {'playbook': playbook_path}
@ -104,14 +104,14 @@ class PlaybookExecutor:
if play.vars_prompt: if play.vars_prompt:
for var in play.vars_prompt: for var in play.vars_prompt:
vname = var['name'] vname = var['name']
prompt = var.get("prompt", vname) prompt = var.get("prompt", vname)
default = var.get("default", None) default = var.get("default", None)
private = var.get("private", True) private = var.get("private", True)
confirm = var.get("confirm", False) confirm = var.get("confirm", False)
encrypt = var.get("encrypt", None) encrypt = var.get("encrypt", None)
salt_size = var.get("salt_size", None) salt_size = var.get("salt_size", None)
salt = var.get("salt", None) salt = var.get("salt", None)
if vname not in self._variable_manager.extra_vars: if vname not in self._variable_manager.extra_vars:
if self._tqm: if self._tqm:

@ -18,4 +18,3 @@
# Make coding more python3-ish # Make coding more python3-ish
from __future__ import (absolute_import, division, print_function) from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type

@ -28,11 +28,11 @@ from jinja2.exceptions import TemplateNotFound
# TODO: not needed if we use the cryptography library with its default RNG # TODO: not needed if we use the cryptography library with its default RNG
# engine # engine
HAS_ATFORK=True HAS_ATFORK = True
try: try:
from Crypto.Random import atfork from Crypto.Random import atfork
except ImportError: except ImportError:
HAS_ATFORK=False HAS_ATFORK = False
from ansible.errors import AnsibleConnectionFailure from ansible.errors import AnsibleConnectionFailure
from ansible.executor.task_executor import TaskExecutor from ansible.executor.task_executor import TaskExecutor
@ -59,13 +59,13 @@ class WorkerProcess(multiprocessing.Process):
super(WorkerProcess, self).__init__() super(WorkerProcess, self).__init__()
# takes a task queue manager as the sole param: # takes a task queue manager as the sole param:
self._rslt_q = rslt_q self._rslt_q = rslt_q
self._task_vars = task_vars self._task_vars = task_vars
self._host = host self._host = host
self._task = task self._task = task
self._play_context = play_context self._play_context = play_context
self._loader = loader self._loader = loader
self._variable_manager = variable_manager self._variable_manager = variable_manager
self._shared_loader_obj = shared_loader_obj self._shared_loader_obj = shared_loader_obj
if sys.stdin.isatty(): if sys.stdin.isatty():
@ -95,9 +95,9 @@ class WorkerProcess(multiprocessing.Process):
signify that they are ready for their next task. signify that they are ready for their next task.
''' '''
#import cProfile, pstats, StringIO # import cProfile, pstats, StringIO
#pr = cProfile.Profile() # pr = cProfile.Profile()
#pr.enable() # pr.enable()
if HAS_ATFORK: if HAS_ATFORK:
atfork() atfork()
@ -160,11 +160,10 @@ class WorkerProcess(multiprocessing.Process):
display.debug("WORKER PROCESS EXITING") display.debug("WORKER PROCESS EXITING")
#pr.disable() # pr.disable()
#s = StringIO.StringIO() # s = StringIO.StringIO()
#sortby = 'time' # sortby = 'time'
#ps = pstats.Stats(pr, stream=s).sort_stats(sortby) # ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
#ps.print_stats() # ps.print_stats()
#with open('worker_%06d.stats' % os.getpid(), 'w') as f: # with open('worker_%06d.stats' % os.getpid(), 'w') as f:
# f.write(s.getvalue()) # f.write(s.getvalue())

@ -23,17 +23,18 @@ from collections import MutableMapping
from ansible.utils.vars import merge_hash from ansible.utils.vars import merge_hash
class AggregateStats: class AggregateStats:
''' holds stats about per-host activity during playbook runs ''' ''' holds stats about per-host activity during playbook runs '''
def __init__(self): def __init__(self):
self.processed = {} self.processed = {}
self.failures = {} self.failures = {}
self.ok = {} self.ok = {}
self.dark = {} self.dark = {}
self.changed = {} self.changed = {}
self.skipped = {} self.skipped = {}
# user defined stats, which can be per host or global # user defined stats, which can be per host or global
self.custom = {} self.custom = {}
@ -43,17 +44,17 @@ class AggregateStats:
self.processed[host] = 1 self.processed[host] = 1
prev = (getattr(self, what)).get(host, 0) prev = (getattr(self, what)).get(host, 0)
getattr(self, what)[host] = prev+1 getattr(self, what)[host] = prev + 1
def summarize(self, host): def summarize(self, host):
''' return information about a particular host ''' ''' return information about a particular host '''
return dict( return dict(
ok = self.ok.get(host, 0), ok=self.ok.get(host, 0),
failures = self.failures.get(host, 0), failures=self.failures.get(host, 0),
unreachable = self.dark.get(host,0), unreachable=self.dark.get(host, 0),
changed = self.changed.get(host, 0), changed=self.changed.get(host, 0),
skipped = self.skipped.get(host, 0) skipped=self.skipped.get(host, 0),
) )
def set_custom_stats(self, which, what, host=None): def set_custom_stats(self, which, what, host=None):
@ -79,8 +80,7 @@ class AggregateStats:
return None return None
if isinstance(what, MutableMapping): if isinstance(what, MutableMapping):
self.custom[host][which] = merge_hash(self.custom[host][which], what) self.custom[host][which] = merge_hash(self.custom[host][which], what)
else: else:
# let overloaded + take care of other types # let overloaded + take care of other types
self.custom[host][which] += what self.custom[host][which] += what

@ -61,16 +61,16 @@ class TaskExecutor:
SQUASH_ACTIONS = frozenset(C.DEFAULT_SQUASH_ACTIONS) SQUASH_ACTIONS = frozenset(C.DEFAULT_SQUASH_ACTIONS)
def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, rslt_q): def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, rslt_q):
self._host = host self._host = host
self._task = task self._task = task
self._job_vars = job_vars self._job_vars = job_vars
self._play_context = play_context self._play_context = play_context
self._new_stdin = new_stdin self._new_stdin = new_stdin
self._loader = loader self._loader = loader
self._shared_loader_obj = shared_loader_obj self._shared_loader_obj = shared_loader_obj
self._connection = None self._connection = None
self._rslt_q = rslt_q self._rslt_q = rslt_q
self._loop_eval_error = None self._loop_eval_error = None
self._task.squash() self._task.squash()
@ -99,7 +99,7 @@ class TaskExecutor:
# loop through the item results, and remember the changed/failed # loop through the item results, and remember the changed/failed
# result flags based on any item there. # result flags based on any item there.
changed = False changed = False
failed = False failed = False
for item in item_results: for item in item_results:
if 'changed' in item and item['changed']: if 'changed' in item and item['changed']:
changed = True changed = True
@ -148,7 +148,7 @@ class TaskExecutor:
else: else:
raise raise
elif isinstance(res, list): elif isinstance(res, list):
for idx,item in enumerate(res): for (idx, item) in enumerate(res):
res[idx] = _clean_res(item, errors=errors) res[idx] = _clean_res(item, errors=errors)
return res return res
@ -189,7 +189,6 @@ class TaskExecutor:
# get search path for this task to pass to lookup plugins # get search path for this task to pass to lookup plugins
self._job_vars['ansible_search_path'] = self._task.get_search_path() self._job_vars['ansible_search_path'] = self._task.get_search_path()
templar = Templar(loader=self._loader, shared_loader_obj=self._shared_loader_obj, variables=self._job_vars) templar = Templar(loader=self._loader, shared_loader_obj=self._shared_loader_obj, variables=self._job_vars)
items = None items = None
if self._task.loop: if self._task.loop:
@ -211,7 +210,7 @@ class TaskExecutor:
for subdir in ['template', 'var', 'file']: # TODO: move this to constants? for subdir in ['template', 'var', 'file']: # TODO: move this to constants?
if subdir in self._task.action: if subdir in self._task.action:
break break
setattr(mylookup,'_subdir', subdir + 's') setattr(mylookup, '_subdir', subdir + 's')
# run lookup # run lookup
items = mylookup.run(terms=loop_terms, variables=self._job_vars, wantlist=True) items = mylookup.run(terms=loop_terms, variables=self._job_vars, wantlist=True)
@ -249,7 +248,7 @@ class TaskExecutor:
# make copies of the job vars and task so we can add the item to # make copies of the job vars and task so we can add the item to
# the variables and re-validate the task with the item variable # the variables and re-validate the task with the item variable
#task_vars = self._job_vars.copy() # task_vars = self._job_vars.copy()
task_vars = self._job_vars task_vars = self._job_vars
loop_var = 'item' loop_var = 'item'
@ -263,8 +262,8 @@ class TaskExecutor:
if loop_var in task_vars: if loop_var in task_vars:
display.warning(u"The loop variable '%s' is already in use. " display.warning(u"The loop variable '%s' is already in use. "
u"You should set the `loop_var` value in the `loop_control` option for the task" u"You should set the `loop_var` value in the `loop_control` option for the task"
u" to something else to avoid variable collisions and unexpected behavior." % loop_var) u" to something else to avoid variable collisions and unexpected behavior." % loop_var)
ran_once = False ran_once = False
items = self._squash_items(items, loop_var, task_vars) items = self._squash_items(items, loop_var, task_vars)
@ -369,7 +368,7 @@ class TaskExecutor:
else: else:
# Restore the name parameter # Restore the name parameter
self._task.args['name'] = name self._task.args['name'] = name
#elif: # elif:
# Right now we only optimize single entries. In the future we # Right now we only optimize single entries. In the future we
# could optimize more types: # could optimize more types:
# * lists can be squashed together # * lists can be squashed together
@ -544,7 +543,7 @@ class TaskExecutor:
if self._task.async > 0: if self._task.async > 0:
if self._task.poll > 0 and not result.get('skipped') and not result.get('failed'): if self._task.poll > 0 and not result.get('skipped') and not result.get('failed'):
result = self._poll_async_result(result=result, templar=templar, task_vars=vars_copy) result = self._poll_async_result(result=result, templar=templar, task_vars=vars_copy)
#FIXME callback 'v2_runner_on_async_poll' here # FIXME callback 'v2_runner_on_async_poll' here
# ensure no log is preserved # ensure no log is preserved
result["_ansible_no_log"] = self._play_context.no_log result["_ansible_no_log"] = self._play_context.no_log
@ -651,7 +650,7 @@ class TaskExecutor:
async_task = Task().load(dict(action='async_status jid=%s' % async_jid)) async_task = Task().load(dict(action='async_status jid=%s' % async_jid))
#FIXME: this is no longer the case, normal takes care of all, see if this can just be generalized # FIXME: this is no longer the case, normal takes care of all, see if this can just be generalized
# Because this is an async task, the action handler is async. However, # Because this is an async task, the action handler is async. However,
# we need the 'normal' action handler for the status check, so get it # we need the 'normal' action handler for the status check, so get it
# now via the action_loader # now via the action_loader

@ -60,28 +60,28 @@ class TaskQueueManager:
which dispatches the Play's tasks to hosts. which dispatches the Play's tasks to hosts.
''' '''
RUN_OK = 0 RUN_OK = 0
RUN_ERROR = 1 RUN_ERROR = 1
RUN_FAILED_HOSTS = 2 RUN_FAILED_HOSTS = 2
RUN_UNREACHABLE_HOSTS = 4 RUN_UNREACHABLE_HOSTS = 4
RUN_FAILED_BREAK_PLAY = 8 RUN_FAILED_BREAK_PLAY = 8
RUN_UNKNOWN_ERROR = 255 RUN_UNKNOWN_ERROR = 255
def __init__(self, inventory, variable_manager, loader, options, passwords, stdout_callback=None, run_additional_callbacks=True, run_tree=False): def __init__(self, inventory, variable_manager, loader, options, passwords, stdout_callback=None, run_additional_callbacks=True, run_tree=False):
self._inventory = inventory self._inventory = inventory
self._variable_manager = variable_manager self._variable_manager = variable_manager
self._loader = loader self._loader = loader
self._options = options self._options = options
self._stats = AggregateStats() self._stats = AggregateStats()
self.passwords = passwords self.passwords = passwords
self._stdout_callback = stdout_callback self._stdout_callback = stdout_callback
self._run_additional_callbacks = run_additional_callbacks self._run_additional_callbacks = run_additional_callbacks
self._run_tree = run_tree self._run_tree = run_tree
self._callbacks_loaded = False self._callbacks_loaded = False
self._callback_plugins = [] self._callback_plugins = []
self._start_at_done = False self._start_at_done = False
# make sure the module path (if specified) is parsed and # make sure the module path (if specified) is parsed and
# added to the module_loader object # added to the module_loader object
@ -97,7 +97,7 @@ class TaskQueueManager:
self._listening_handlers = dict() self._listening_handlers = dict()
# dictionaries to keep track of failed/unreachable hosts # dictionaries to keep track of failed/unreachable hosts
self._failed_hosts = dict() self._failed_hosts = dict()
self._unreachable_hosts = dict() self._unreachable_hosts = dict()
self._final_q = multiprocessing.Queue() self._final_q = multiprocessing.Queue()
@ -145,7 +145,7 @@ class TaskQueueManager:
if handler.listen: if handler.listen:
listeners = handler.listen listeners = handler.listen
if not isinstance(listeners, list): if not isinstance(listeners, list):
listeners = [ listeners ] listeners = [listeners]
for listener in listeners: for listener in listeners:
if listener not in self._listening_handlers: if listener not in self._listening_handlers:
self._listening_handlers[listener] = [] self._listening_handlers[listener] = []
@ -182,7 +182,7 @@ class TaskQueueManager:
# the name of the current plugin and type to see if we need to skip # the name of the current plugin and type to see if we need to skip
# loading this callback plugin # loading this callback plugin
callback_type = getattr(callback_plugin, 'CALLBACK_TYPE', None) callback_type = getattr(callback_plugin, 'CALLBACK_TYPE', None)
callback_needs_whitelist = getattr(callback_plugin, 'CALLBACK_NEEDS_WHITELIST', False) callback_needs_whitelist = getattr(callback_plugin, 'CALLBACK_NEEDS_WHITELIST', False)
(callback_name, _) = os.path.splitext(os.path.basename(callback_plugin._original_path)) (callback_name, _) = os.path.splitext(os.path.basename(callback_plugin._original_path))
if callback_type == 'stdout': if callback_type == 'stdout':
if callback_name != self._stdout_callback or stdout_callback_loaded: if callback_name != self._stdout_callback or stdout_callback_loaded:
@ -262,7 +262,7 @@ class TaskQueueManager:
play_context=play_context, play_context=play_context,
variable_manager=self._variable_manager, variable_manager=self._variable_manager,
all_vars=all_vars, all_vars=all_vars,
start_at_done = self._start_at_done, start_at_done=self._start_at_done,
) )
# Because the TQM may survive multiple play runs, we start by marking # Because the TQM may survive multiple play runs, we start by marking
@ -332,7 +332,7 @@ class TaskQueueManager:
# <WorkerProcess(WorkerProcess-2, stopped[SIGTERM])> # <WorkerProcess(WorkerProcess-2, stopped[SIGTERM])>
defunct = False defunct = False
for idx,x in enumerate(self._workers): for (idx, x) in enumerate(self._workers):
if hasattr(x[0], 'exitcode'): if hasattr(x[0], 'exitcode'):
if x[0].exitcode in [-9, -11, -15]: if x[0].exitcode in [-9, -11, -15]:
defunct = True defunct = True
@ -350,7 +350,7 @@ class TaskQueueManager:
for possible in [method_name, 'v2_on_any']: for possible in [method_name, 'v2_on_any']:
gotit = getattr(callback_plugin, possible, None) gotit = getattr(callback_plugin, possible, None)
if gotit is None: if gotit is None:
gotit = getattr(callback_plugin, possible.replace('v2_',''), None) gotit = getattr(callback_plugin, possible.replace('v2_', ''), None)
if gotit is not None: if gotit is not None:
methods.append(gotit) methods.append(gotit)

@ -10,20 +10,10 @@ lib/ansible/cli/playbook.py
lib/ansible/cli/pull.py lib/ansible/cli/pull.py
lib/ansible/cli/vault.py lib/ansible/cli/vault.py
lib/ansible/constants.py lib/ansible/constants.py
lib/ansible/executor/__init__.py
lib/ansible/executor/action_write_locks.py
lib/ansible/executor/module_common.py
lib/ansible/executor/play_iterator.py
lib/ansible/executor/playbook_executor.py
lib/ansible/executor/process/__init__.py
lib/ansible/executor/process/worker.py
lib/ansible/executor/stats.py
lib/ansible/executor/task_executor.py
lib/ansible/executor/task_queue_manager.py
lib/ansible/inventory/manager.py
lib/ansible/inventory/data.py lib/ansible/inventory/data.py
lib/ansible/inventory/group.py lib/ansible/inventory/group.py
lib/ansible/inventory/host.py lib/ansible/inventory/host.py
lib/ansible/inventory/manager.py
lib/ansible/module_utils/_text.py lib/ansible/module_utils/_text.py
lib/ansible/module_utils/a10.py lib/ansible/module_utils/a10.py
lib/ansible/module_utils/ansible_tower.py lib/ansible/module_utils/ansible_tower.py

Loading…
Cancel
Save