From d2b3b2c03e2934b126f5701e5f6e25821e2dbe35 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Sun, 31 Jul 2016 03:23:28 -0500 Subject: [PATCH] Performance improvements --- lib/ansible/executor/play_iterator.py | 8 +- lib/ansible/executor/process/result.py | 196 -------- lib/ansible/executor/process/worker.py | 18 +- lib/ansible/executor/task_executor.py | 6 +- lib/ansible/executor/task_queue_manager.py | 9 +- lib/ansible/executor/task_result.py | 4 +- lib/ansible/inventory/host.py | 10 +- lib/ansible/playbook/base.py | 7 +- lib/ansible/playbook/block.py | 10 +- lib/ansible/playbook/included_file.py | 14 +- lib/ansible/playbook/task.py | 7 +- lib/ansible/plugins/strategy/__init__.py | 428 +++++++++--------- lib/ansible/plugins/strategy/free.py | 1 + lib/ansible/plugins/strategy/linear.py | 1 + test/units/executor/test_task_executor.py | 2 +- .../plugins/strategies/test_strategy_base.py | 93 ++-- 16 files changed, 332 insertions(+), 482 deletions(-) delete mode 100644 lib/ansible/executor/process/result.py diff --git a/lib/ansible/executor/play_iterator.py b/lib/ansible/executor/play_iterator.py index 51d066ead3c..03fdc0b0066 100644 --- a/lib/ansible/executor/play_iterator.py +++ b/lib/ansible/executor/play_iterator.py @@ -508,6 +508,12 @@ class PlayIterator: the different processes, and not all data structures are preserved. This method allows us to find the original task passed into the executor engine. ''' + + if isinstance(task, Task): + the_uuid = task._uuid + else: + the_uuid = task + def _search_block(block): ''' helper method to check a block's task lists (block/rescue/always) @@ -521,7 +527,7 @@ class PlayIterator: res = _search_block(t) if res: return res - elif t._uuid == task._uuid: + elif t._uuid == the_uuid: return t return None diff --git a/lib/ansible/executor/process/result.py b/lib/ansible/executor/process/result.py deleted file mode 100644 index da06710293c..00000000000 --- a/lib/ansible/executor/process/result.py +++ /dev/null @@ -1,196 +0,0 @@ -# (c) 2012-2014, Michael DeHaan -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -from ansible.compat.six.moves import queue -from ansible.compat.six import iteritems, text_type -from ansible.vars import strip_internal_keys - -import multiprocessing -import time -import traceback - -# TODO: not needed if we use the cryptography library with its default RNG -# engine -HAS_ATFORK=True -try: - from Crypto.Random import atfork -except ImportError: - HAS_ATFORK=False - -try: - from __main__ import display -except ImportError: - from ansible.utils.display import Display - display = Display() - - -__all__ = ['ResultProcess'] - - -class ResultProcess(multiprocessing.Process): - ''' - The result worker thread, which reads results from the results - queue and fires off callbacks/etc. as necessary. - ''' - - def __init__(self, final_q, workers): - - # takes a task queue manager as the sole param: - self._final_q = final_q - self._workers = workers - self._cur_worker = 0 - self._terminated = False - - super(ResultProcess, self).__init__() - - def _send_result(self, result): - display.debug(u"sending result: %s" % ([text_type(x) for x in result],)) - self._final_q.put(result) - display.debug("done sending result") - - def _read_worker_result(self): - result = None - starting_point = self._cur_worker - while True: - (worker_prc, rslt_q) = self._workers[self._cur_worker] - self._cur_worker += 1 - if self._cur_worker >= len(self._workers): - self._cur_worker = 0 - - try: - if not rslt_q.empty(): - display.debug("worker %d has data to read" % self._cur_worker) - result = rslt_q.get() - display.debug("got a result from worker %d: %s" % (self._cur_worker, result)) - break - except queue.Empty: - pass - - if self._cur_worker == starting_point: - break - - return result - - def terminate(self): - self._terminated = True - super(ResultProcess, self).terminate() - - def run(self): - ''' - The main thread execution, which reads from the results queue - indefinitely and sends callbacks/etc. when results are received. - ''' - - if HAS_ATFORK: - atfork() - - while True: - try: - result = self._read_worker_result() - if result is None: - time.sleep(0.005) - continue - - # send callbacks for 'non final' results - if '_ansible_retry' in result._result: - self._send_result(('v2_runner_retry', result)) - continue - elif '_ansible_item_result' in result._result: - if result.is_failed() or result.is_unreachable(): - self._send_result(('v2_runner_item_on_failed', result)) - elif result.is_skipped(): - self._send_result(('v2_runner_item_on_skipped', result)) - else: - self._send_result(('v2_runner_item_on_ok', result)) - if 'diff' in result._result: - self._send_result(('v2_on_file_diff', result)) - continue - - clean_copy = strip_internal_keys(result._result) - if 'invocation' in clean_copy: - del clean_copy['invocation'] - - # if this task is registering a result, do it now - if result._task.register: - self._send_result(('register_host_var', result._host, result._task, clean_copy)) - - # send callbacks, execute other options based on the result status - # TODO: this should all be cleaned up and probably moved to a sub-function. - # the fact that this sometimes sends a TaskResult and other times - # sends a raw dictionary back may be confusing, but the result vs. - # results implementation for tasks with loops should be cleaned up - # better than this - if result.is_unreachable(): - self._send_result(('host_unreachable', result)) - elif result.is_failed(): - self._send_result(('host_task_failed', result)) - elif result.is_skipped(): - self._send_result(('host_task_skipped', result)) - else: - if result._task.loop: - # this task had a loop, and has more than one result, so - # loop over all of them instead of a single result - result_items = result._result.get('results', []) - else: - result_items = [ result._result ] - - for result_item in result_items: - # if this task is notifying a handler, do it now - if '_ansible_notify' in result_item: - if result.is_changed(): - # The shared dictionary for notified handlers is a proxy, which - # does not detect when sub-objects within the proxy are modified. - # So, per the docs, we reassign the list so the proxy picks up and - # notifies all other threads - for notify in result_item['_ansible_notify']: - self._send_result(('notify_handler', result, notify)) - - if 'add_host' in result_item: - # this task added a new host (add_host module) - self._send_result(('add_host', result_item)) - elif 'add_group' in result_item: - # this task added a new group (group_by module) - self._send_result(('add_group', result._host, result_item)) - elif 'ansible_facts' in result_item: - # if this task is registering facts, do that now - loop_var = 'item' - if result._task.loop_control: - loop_var = result._task.loop_control.loop_var or 'item' - item = result_item.get(loop_var, None) - if result._task.action == 'include_vars': - for (key, value) in iteritems(result_item['ansible_facts']): - self._send_result(('set_host_var', result._host, result._task, item, key, value)) - else: - self._send_result(('set_host_facts', result._host, result._task, item, result_item['ansible_facts'])) - - # finally, send the ok for this task - self._send_result(('host_task_ok', result)) - - except queue.Empty: - pass - except (KeyboardInterrupt, SystemExit, IOError, EOFError): - break - except: - # TODO: we should probably send a proper callback here instead of - # simply dumping a stack trace on the screen - traceback.print_exc() - break - diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index d1bc56f637f..aeab3e75a7a 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -100,6 +100,10 @@ class WorkerProcess(multiprocessing.Process): signify that they are ready for their next task. ''' + #import cProfile, pstats, StringIO + #pr = cProfile.Profile() + #pr.enable() + if HAS_ATFORK: atfork() @@ -120,7 +124,7 @@ class WorkerProcess(multiprocessing.Process): display.debug("done running TaskExecutor() for %s/%s" % (self._host, self._task)) self._host.vars = dict() self._host.groups = [] - task_result = TaskResult(self._host, self._task, executor_result) + task_result = TaskResult(self._host.name, self._task._uuid, executor_result) # put the result on the result queue display.debug("sending task result") @@ -130,7 +134,7 @@ class WorkerProcess(multiprocessing.Process): except AnsibleConnectionFailure: self._host.vars = dict() self._host.groups = [] - task_result = TaskResult(self._host, self._task, dict(unreachable=True)) + task_result = TaskResult(self._host.name, self._task._uuid, dict(unreachable=True)) self._rslt_q.put(task_result, block=False) except Exception as e: @@ -138,7 +142,7 @@ class WorkerProcess(multiprocessing.Process): try: self._host.vars = dict() self._host.groups = [] - task_result = TaskResult(self._host, self._task, dict(failed=True, exception=to_unicode(traceback.format_exc()), stdout='')) + task_result = TaskResult(self._host.name, self._task._uuid, dict(failed=True, exception=to_unicode(traceback.format_exc()), stdout='')) self._rslt_q.put(task_result, block=False) except: display.debug(u"WORKER EXCEPTION: %s" % to_unicode(e)) @@ -146,3 +150,11 @@ class WorkerProcess(multiprocessing.Process): display.debug("WORKER PROCESS EXITING") + #pr.disable() + #s = StringIO.StringIO() + #sortby = 'time' + #ps = pstats.Stats(pr, stream=s).sort_stats(sortby) + #ps.print_stats() + #with open('worker_%06d.stats' % os.getpid(), 'w') as f: + # f.write(s.getvalue()) + diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py index e34c4784e10..88f69070ebb 100644 --- a/lib/ansible/executor/task_executor.py +++ b/lib/ansible/executor/task_executor.py @@ -246,7 +246,7 @@ class TaskExecutor: task_vars[loop_var] = item try: - tmp_task = self._task.copy() + tmp_task = self._task.copy(exclude_tasks=True) tmp_play_context = self._play_context.copy() except AnsibleParserError as e: results.append(dict(failed=True, msg=to_unicode(e))) @@ -265,7 +265,7 @@ class TaskExecutor: res[loop_var] = item res['_ansible_item_result'] = True - self._rslt_q.put(TaskResult(self._host, self._task, res), block=False) + self._rslt_q.put(TaskResult(self._host.name, self._task._uuid, res), block=False) results.append(res) del task_vars[loop_var] @@ -516,7 +516,7 @@ class TaskExecutor: result['_ansible_retry'] = True result['retries'] = retries display.debug('Retrying task, attempt %d of %d' % (attempt, retries)) - self._rslt_q.put(TaskResult(self._host, self._task, result), block=False) + self._rslt_q.put(TaskResult(self._host.name, self._task._uuid, result), block=False) time.sleep(delay) else: if retries > 1: diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index b59ca0ab882..7212b06aea2 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -26,7 +26,6 @@ import tempfile from ansible import constants as C from ansible.errors import AnsibleError from ansible.executor.play_iterator import PlayIterator -from ansible.executor.process.result import ResultProcess from ansible.executor.stats import AggregateStats from ansible.playbook.block import Block from ansible.playbook.play_context import PlayContext @@ -81,7 +80,6 @@ class TaskQueueManager: self._callbacks_loaded = False self._callback_plugins = [] self._start_at_done = False - self._result_prc = None # make sure the module path (if specified) is parsed and # added to the module_loader object @@ -113,9 +111,6 @@ class TaskQueueManager: rslt_q = multiprocessing.Queue() self._workers.append([None, rslt_q]) - self._result_prc = ResultProcess(self._final_q, self._workers) - self._result_prc.start() - def _initialize_notified_handlers(self, play): ''' Clears and initializes the shared notified handlers dict with entries @@ -299,9 +294,7 @@ class TaskQueueManager: self._cleanup_processes() def _cleanup_processes(self): - if self._result_prc: - self._result_prc.terminate() - + if hasattr(self, '_workers'): for (worker_prc, rslt_q) in self._workers: rslt_q.close() if worker_prc and worker_prc.is_alive(): diff --git a/lib/ansible/executor/task_result.py b/lib/ansible/executor/task_result.py index f7ba7d092c4..0f1967fe5c8 100644 --- a/lib/ansible/executor/task_result.py +++ b/lib/ansible/executor/task_result.py @@ -41,7 +41,7 @@ class TaskResult: def is_skipped(self): # loop results - if 'results' in self._result and self._task.loop: + if 'results' in self._result: results = self._result['results'] # Loop tasks are only considered skipped if all items were skipped. # some squashed results (eg, yum) are not dicts and can't be skipped individually @@ -62,7 +62,7 @@ class TaskResult: return self._check_key('unreachable') def _check_key(self, key): - if self._result.get('results', []) and self._task.loop: + if self._result.get('results', []): flag = False for res in self._result.get('results', []): if isinstance(res, dict): diff --git a/lib/ansible/inventory/host.py b/lib/ansible/inventory/host.py index 36cc90f5377..268eef2f46d 100644 --- a/lib/ansible/inventory/host.py +++ b/lib/ansible/inventory/host.py @@ -64,12 +64,12 @@ class Host: ) def deserialize(self, data): - self.__init__() + self.__init__(gen_uuid=False) self.name = data.get('name') self.vars = data.get('vars', dict()) self.address = data.get('address', '') - self._uuid = data.get('uuid', uuid.uuid4()) + self._uuid = data.get('uuid', None) self.implicit= data.get('implicit', False) groups = data.get('groups', []) @@ -78,7 +78,7 @@ class Host: g.deserialize(group_data) self.groups.append(g) - def __init__(self, name=None, port=None): + def __init__(self, name=None, port=None, gen_uuid=True): self.name = name self.vars = {} @@ -90,7 +90,9 @@ class Host: self.set_variable('ansible_port', int(port)) self._gathered_facts = False - self._uuid = uuid.uuid4() + self._uuid = None + if gen_uuid: + self._uuid = uuid.uuid4() self.implicit = False def __repr__(self): diff --git a/lib/ansible/playbook/base.py b/lib/ansible/playbook/base.py index 6bafe42844f..b34bf448b00 100644 --- a/lib/ansible/playbook/base.py +++ b/lib/ansible/playbook/base.py @@ -80,6 +80,7 @@ class Base: # every object gets a random uuid: self._uuid = uuid.uuid4() + #self._uuid = 1 # and initialize the base attributes self._initialize_base_attributes() @@ -137,8 +138,8 @@ class Base: ''' # check cache before retrieving attributes - if self.__class__ in BASE_ATTRIBUTES: - return BASE_ATTRIBUTES[self.__class__] + if self.__class__.__name__ in BASE_ATTRIBUTES: + return BASE_ATTRIBUTES[self.__class__.__name__] # Cache init base_attributes = dict() @@ -147,7 +148,7 @@ class Base: if name.startswith('_'): name = name[1:] base_attributes[name] = value - BASE_ATTRIBUTES[self.__class__] = base_attributes + BASE_ATTRIBUTES[self.__class__.__name__] = base_attributes return base_attributes def _initialize_base_attributes(self): diff --git a/lib/ansible/playbook/block.py b/lib/ansible/playbook/block.py index 2994eadf569..016e53298b7 100644 --- a/lib/ansible/playbook/block.py +++ b/lib/ansible/playbook/block.py @@ -185,7 +185,7 @@ class Block(Base, Become, Conditional, Taggable): new_me._parent_block = None if self._parent_block and not exclude_parent: - new_me._parent_block = self._parent_block.copy(exclude_tasks=exclude_tasks) + new_me._parent_block = self._parent_block#.copy(exclude_tasks=exclude_tasks) new_me._role = None if self._role: @@ -193,8 +193,8 @@ class Block(Base, Become, Conditional, Taggable): new_me._task_include = None if self._task_include: - new_me._task_include = self._task_include.copy(exclude_block=True) - new_me._task_include._block = self._task_include._block.copy(exclude_tasks=True) + new_me._task_include = self._task_include#.copy(exclude_block=True) + #new_me._task_include._block = self._task_include._block.copy(exclude_tasks=True) return new_me @@ -213,8 +213,8 @@ class Block(Base, Become, Conditional, Taggable): if self._role is not None: data['role'] = self._role.serialize() - if self._task_include is not None: - data['task_include'] = self._task_include.serialize() + #if self._task_include is not None: + # data['task_include'] = self._task_include.serialize() if self._parent_block is not None: data['parent_block'] = self._parent_block.copy(exclude_tasks=True).serialize() diff --git a/lib/ansible/playbook/included_file.py b/lib/ansible/playbook/included_file.py index 23a1f7860a8..b6992d27388 100644 --- a/lib/ansible/playbook/included_file.py +++ b/lib/ansible/playbook/included_file.py @@ -60,8 +60,11 @@ class IncludedFile: for res in results: - if res._task.action == 'include': - if res._task.loop: + original_host = res._host + original_task = res._task + + if original_task.action == 'include': + if original_task.loop: if 'results' not in res._result: continue include_results = res._result['results'] @@ -73,16 +76,13 @@ class IncludedFile: if 'skipped' in include_result and include_result['skipped'] or 'failed' in include_result: continue - original_host = get_original_host(res._host) - original_task = iterator.get_original_task(original_host, res._task) - task_vars = variable_manager.get_vars(loader=loader, play=iterator._play, host=original_host, task=original_task) templar = Templar(loader=loader, variables=task_vars) include_variables = include_result.get('include_variables', dict()) loop_var = 'item' - if res._task.loop_control: - loop_var = res._task.loop_control.loop_var or 'item' + if original_task.loop_control: + loop_var = original_task.loop_control.loop_var or 'item' if loop_var in include_result: task_vars[loop_var] = include_variables[loop_var] = include_result[loop_var] diff --git a/lib/ansible/playbook/task.py b/lib/ansible/playbook/task.py index 53e8d5ee056..e8b1b60f8c7 100644 --- a/lib/ansible/playbook/task.py +++ b/lib/ansible/playbook/task.py @@ -326,12 +326,12 @@ class Task(Base, Conditional, Taggable, Become): all_vars.update(self.vars) return all_vars - def copy(self, exclude_block=False): + def copy(self, exclude_block=False, exclude_tasks=False): new_me = super(Task, self).copy() new_me._block = None if self._block and not exclude_block: - new_me._block = self._block.copy() + new_me._block = self._block#.copy(exclude_tasks=exclude_tasks) new_me._role = None if self._role: @@ -339,7 +339,8 @@ class Task(Base, Conditional, Taggable, Become): new_me._task_include = None if self._task_include: - new_me._task_include = self._task_include.copy(exclude_block=exclude_block) + new_me._task_include = self._task_include#.copy(exclude_block=True) + #new_me._task_include._block = self._task_include._block.copy(exclude_tasks=True) return new_me diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index 8954d520e11..c4dab7d241a 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -43,7 +43,8 @@ from ansible.plugins import action_loader, connection_loader, filter_loader, loo from ansible.template import Templar from ansible.utils.unicode import to_unicode from ansible.vars.unsafe_proxy import wrap_var -from ansible.vars import combine_vars +from ansible.vars import combine_vars, strip_internal_keys + try: from __main__ import display @@ -174,6 +175,7 @@ class StrategyBase: # The next common higher level is __init__.py::run() and that has # tasks inside of play_iterator so we'd have to extract them to do it # there. + global action_write_locks if task.action not in action_write_locks: display.debug('Creating lock for %s' % task.action) @@ -189,21 +191,22 @@ class StrategyBase: shared_loader_obj = SharedPluginLoaderObj() queued = False + starting_worker = self._cur_worker while True: (worker_prc, rslt_q) = self._workers[self._cur_worker] if worker_prc is None or not worker_prc.is_alive(): - worker_prc = WorkerProcess(rslt_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj) + worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj) self._workers[self._cur_worker][0] = worker_prc worker_prc.start() queued = True self._cur_worker += 1 if self._cur_worker >= len(self._workers): self._cur_worker = 0 - time.sleep(0.005) if queued: break + elif self._cur_worker == starting_worker: + time.sleep(0.0001) - del task_vars self._pending_results += 1 except (EOFError, IOError, AssertionError) as e: # most likely an abort @@ -219,222 +222,233 @@ class StrategyBase: ret_results = [] + def get_original_host(host_name): + host_name = to_unicode(host_name) + if host_name in self._inventory._hosts_cache: + return self._inventory._hosts_cache[host_name] + else: + return self._inventory.get_host(host_name) + + def search_handler_blocks(handler_name, handler_blocks): + for handler_block in handler_blocks: + for handler_task in handler_block.block: + handler_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, task=handler_task) + templar = Templar(loader=self._loader, variables=handler_vars) + try: + # first we check with the full result of get_name(), which may + # include the role name (if the handler is from a role). If that + # is not found, we resort to the simple name field, which doesn't + # have anything extra added to it. + target_handler_name = templar.template(handler_task.name) + if target_handler_name == handler_name: + return handler_task + else: + target_handler_name = templar.template(handler_task.get_name()) + if target_handler_name == handler_name: + return handler_task + except (UndefinedError, AnsibleUndefinedVariable) as e: + # We skip this handler due to the fact that it may be using + # a variable in the name that was conditionally included via + # set_fact or some other method, and we don't want to error + # out unnecessarily + continue + return None + while not self._final_q.empty() and not self._tqm._terminated: try: - result = self._final_q.get() - display.debug("got result from result worker: %s" % ([text_type(x) for x in result],)) - - # helper method, used to find the original host from the one - # returned in the result/message, which has been serialized and - # thus had some information stripped from it to speed up the - # serialization process - def get_original_host(host): - if host.name in self._inventory._hosts_cache: - return self._inventory._hosts_cache[host.name] + task_result = self._final_q.get(block=False) + original_host = get_original_host(task_result._host) + original_task = iterator.get_original_task(original_host, task_result._task) + task_result._host = original_host + task_result._task = original_task + + # send callbacks for 'non final' results + if '_ansible_retry' in task_result._result: + self._tqm.send_callback('v2_runner_retry', task_result) + continue + elif '_ansible_item_result' in task_result._result: + if task_result.is_failed() or task_result.is_unreachable(): + self._tqm.send_callback('v2_runner_item_on_failed', task_result) + elif task_result.is_skipped(): + self._tqm.send_callback('v2_runner_item_on_skipped', task_result) else: - return self._inventory.get_host(host.name) + self._tqm.send_callback('v2_runner_item_on_ok', task_result) + continue - # all host status messages contain 2 entries: (msg, task_result) - if result[0] in ('host_task_ok', 'host_task_failed', 'host_task_skipped', 'host_unreachable'): - task_result = result[1] - host = get_original_host(task_result._host) - task = task_result._task - if result[0] == 'host_task_failed' or task_result.is_failed(): - if not task.ignore_errors: - display.debug("marking %s as failed" % host.name) - if task.run_once: - # if we're using run_once, we have to fail every host here - [iterator.mark_host_failed(h) for h in self._inventory.get_hosts(iterator._play.hosts) if h.name not in self._tqm._unreachable_hosts] - else: - iterator.mark_host_failed(host) + if original_task.register: + if original_task.run_once: + host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] + else: + host_list = [original_host] - # only add the host to the failed list officially if it has - # been failed by the iterator - if iterator.is_failed(host): - self._tqm._failed_hosts[host.name] = True - self._tqm._stats.increment('failures', host.name) - else: - # otherwise, we grab the current state and if we're iterating on - # the rescue portion of a block then we save the failed task in a - # special var for use within the rescue/always - state, _ = iterator.get_next_task_for_host(host, peek=True) - if state.run_state == iterator.ITERATING_RESCUE: - original_task = iterator.get_original_task(host, task) - self._variable_manager.set_nonpersistent_facts( - host, - dict( - ansible_failed_task=original_task.serialize(), - ansible_failed_result=task_result._result, - ), - ) - else: - self._tqm._stats.increment('ok', host.name) - self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=task.ignore_errors) - elif result[0] == 'host_unreachable': - self._tqm._unreachable_hosts[host.name] = True - self._tqm._stats.increment('dark', host.name) - self._tqm.send_callback('v2_runner_on_unreachable', task_result) - elif result[0] == 'host_task_skipped': - self._tqm._stats.increment('skipped', host.name) - self._tqm.send_callback('v2_runner_on_skipped', task_result) - elif result[0] == 'host_task_ok': - if task.action != 'include': - self._tqm._stats.increment('ok', host.name) - if 'changed' in task_result._result and task_result._result['changed']: - self._tqm._stats.increment('changed', host.name) - self._tqm.send_callback('v2_runner_on_ok', task_result) + clean_copy = strip_internal_keys(task_result._result) + if 'invocation' in clean_copy: + del clean_copy['invocation'] - if self._diff: - self._tqm.send_callback('v2_on_file_diff', task_result) + for target_host in host_list: + self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy}) - self._pending_results -= 1 - if host.name in self._blocked_hosts: - del self._blocked_hosts[host.name] - - # If this is a role task, mark the parent role as being run (if - # the task was ok or failed, but not skipped or unreachable) - if task_result._task._role is not None and result[0] in ('host_task_ok', 'host_task_failed'): - # lookup the role in the ROLE_CACHE to make sure we're dealing - # with the correct object and mark it as executed - for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[task_result._task._role._role_name]): - if role_obj._uuid == task_result._task._role._uuid: - role_obj._had_task_run[host.name] = True - - ret_results.append(task_result) - - elif result[0] == 'add_host': - result_item = result[1] - new_host_info = result_item.get('add_host', dict()) - - self._add_host(new_host_info, iterator) - - elif result[0] == 'add_group': - host = get_original_host(result[1]) - result_item = result[2] - self._add_group(host, result_item) - - elif result[0] == 'notify_handler': - task_result = result[1] - handler_name = result[2] - - original_host = get_original_host(task_result._host) - original_task = iterator.get_original_task(original_host, task_result._task) - - def search_handler_blocks(handler_name, handler_blocks): - for handler_block in handler_blocks: - for handler_task in handler_block.block: - handler_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, task=handler_task) - templar = Templar(loader=self._loader, variables=handler_vars) - try: - # first we check with the full result of get_name(), which may - # include the role name (if the handler is from a role). If that - # is not found, we resort to the simple name field, which doesn't - # have anything extra added to it. - target_handler_name = templar.template(handler_task.name) - if target_handler_name == handler_name: - return handler_task - else: - target_handler_name = templar.template(handler_task.get_name()) - if target_handler_name == handler_name: - return handler_task - except (UndefinedError, AnsibleUndefinedVariable): - # We skip this handler due to the fact that it may be using - # a variable in the name that was conditionally included via - # set_fact or some other method, and we don't want to error - # out unnecessarily - continue - return None - - # Find the handler using the above helper. First we look up the - # dependency chain of the current task (if it's from a role), otherwise - # we just look through the list of handlers in the current play/all - # roles and use the first one that matches the notify name - if handler_name in self._listening_handlers: - for listening_handler_name in self._listening_handlers[handler_name]: - listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers) - if listening_handler is None: - raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name) - - if original_host not in self._notified_handlers[listening_handler]: - self._notified_handlers[listening_handler].append(original_host) - display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,)) - else: - target_handler = search_handler_blocks(handler_name, iterator._play.handlers) - if target_handler is None: - raise AnsibleError("The requested handler '%s' was not found in any of the known handlers" % handler_name) - - if target_handler in self._notified_handlers: - if original_host not in self._notified_handlers[target_handler]: - self._notified_handlers[target_handler].append(original_host) - # FIXME: should this be a callback? - display.vv("NOTIFIED HANDLER %s" % (handler_name,)) + # all host status messages contain 2 entries: (msg, task_result) + role_ran = False + if task_result.is_failed(): + role_ran = True + if not original_task.ignore_errors: + display.debug("marking %s as failed" % original_host.name) + if original_task.run_once: + # if we're using run_once, we have to fail every host here + [iterator.mark_host_failed(h) for h in self._inventory.get_hosts(iterator._play.hosts) if h.name not in self._tqm._unreachable_hosts] else: - raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name) - - elif result[0] == 'register_host_var': - # essentially the same as 'set_host_var' below, however we - # never follow the delegate_to value for registered vars and - # the variable goes in the fact_cache - host = get_original_host(result[1]) - task = result[2] - var_value = wrap_var(result[3]) - var_name = task.register - - if task.run_once: - host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] - else: - host_list = [host] + iterator.mark_host_failed(original_host) - for target_host in host_list: - self._variable_manager.set_nonpersistent_facts(target_host, {var_name: var_value}) - - elif result[0] in ('set_host_var', 'set_host_facts'): - host = get_original_host(result[1]) - task = result[2] - item = result[3] - - # find the host we're actually refering too here, which may - # be a host that is not really in inventory at all - if task.delegate_to is not None and task.delegate_facts: - task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task) - self.add_tqm_variables(task_vars, play=iterator._play) - loop_var = 'item' - if task.loop_control: - loop_var = task.loop_control.loop_var or 'item' - if item is not None: - task_vars[loop_var] = item - templar = Templar(loader=self._loader, variables=task_vars) - host_name = templar.template(task.delegate_to) - actual_host = self._inventory.get_host(host_name) - if actual_host is None: - actual_host = Host(name=host_name) + # only add the host to the failed list officially if it has + # been failed by the iterator + if iterator.is_failed(original_host): + self._tqm._failed_hosts[original_host.name] = True + self._tqm._stats.increment('failures', original_host.name) + else: + # otherwise, we grab the current state and if we're iterating on + # the rescue portion of a block then we save the failed task in a + # special var for use within the rescue/always + state, _ = iterator.get_next_task_for_host(original_host, peek=True) + if state.run_state == iterator.ITERATING_RESCUE: + self._variable_manager.set_nonpersistent_facts( + original_host, + dict( + ansible_failed_task=original_task.serialize(), + ansible_failed_result=task_result._result, + ), + ) else: - actual_host = host + self._tqm._stats.increment('ok', original_host.name) + self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=original_task.ignore_errors) + elif task_result.is_unreachable(): + self._tqm._unreachable_hosts[original_host.name] = True + self._tqm._stats.increment('dark', original_host.name) + self._tqm.send_callback('v2_runner_on_unreachable', task_result) + elif task_result.is_skipped(): + self._tqm._stats.increment('skipped', original_host.name) + self._tqm.send_callback('v2_runner_on_skipped', task_result) + else: + role_ran = True - if task.run_once: - host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] + if original_task.loop: + # this task had a loop, and has more than one result, so + # loop over all of them instead of a single result + result_items = task_result._result.get('results', []) else: - host_list = [actual_host] - - if result[0] == 'set_host_var': - var_name = result[4] - var_value = result[5] - for target_host in host_list: - self._variable_manager.set_host_variable(target_host, var_name, var_value) - elif result[0] == 'set_host_facts': - facts = result[4] - for target_host in host_list: - if task.action == 'set_fact': - self._variable_manager.set_nonpersistent_facts(target_host, facts.copy()) + result_items = [ task_result._result ] + + for result_item in result_items: + if '_ansible_notify' in result_item: + if task_result.is_changed(): + # The shared dictionary for notified handlers is a proxy, which + # does not detect when sub-objects within the proxy are modified. + # So, per the docs, we reassign the list so the proxy picks up and + # notifies all other threads + for handler_name in result_item['_ansible_notify']: + # Find the handler using the above helper. First we look up the + # dependency chain of the current task (if it's from a role), otherwise + # we just look through the list of handlers in the current play/all + # roles and use the first one that matches the notify name + if handler_name in self._listening_handlers: + for listening_handler_name in self._listening_handlers[handler_name]: + listening_handler = search_handler_blocks(listening_handler_name, iterator._play.handlers) + if listening_handler is None: + raise AnsibleError("The requested handler listener '%s' was not found in any of the known handlers" % listening_handler_name) + if original_host not in self._notified_handlers[listening_handler]: + self._notified_handlers[listening_handler].append(original_host) + display.vv("NOTIFIED HANDLER %s" % (listening_handler_name,)) + + else: + target_handler = search_handler_blocks(handler_name, iterator._play.handlers) + if target_handler is None: + raise AnsibleError("The requested handler '%s' was not found in any of the known handlers" % handler_name) + if target_handler in self._notified_handlers: + if original_host not in self._notified_handlers[target_handler]: + self._notified_handlers[target_handler].append(original_host) + # FIXME: should this be a callback? + display.vv("NOTIFIED HANDLER %s" % (handler_name,)) + else: + raise AnsibleError("The requested handler '%s' was found in neither the main handlers list nor the listening handlers list" % handler_name) + + if 'add_host' in result_item: + # this task added a new host (add_host module) + new_host_info = result_item.get('add_host', dict()) + self._add_host(new_host_info, iterator) + + elif 'add_group' in result_item: + # this task added a new group (group_by module) + self._add_group(original_host, result_item) + + elif 'ansible_facts' in result_item: + loop_var = 'item' + if original_task.loop_control: + loop_var = original_task.loop_control.loop_var or 'item' + + item = result_item.get(loop_var, None) + + if original_task.action == 'include_vars': + for (var_name, var_value) in iteritems(result_item['ansible_facts']): + # find the host we're actually refering too here, which may + # be a host that is not really in inventory at all + if original_task.delegate_to is not None and original_task.delegate_facts: + task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task) + self.add_tqm_variables(task_vars, play=iterator._play) + if item is not None: + task_vars[loop_var] = item + templar = Templar(loader=self._loader, variables=task_vars) + host_name = templar.template(original_task.delegate_to) + actual_host = self._inventory.get_host(host_name) + if actual_host is None: + actual_host = Host(name=host_name) + else: + actual_host = original_host + + if original_task.run_once: + host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] + else: + host_list = [actual_host] + + for target_host in host_list: + self._variable_manager.set_host_variable(target_host, var_name, var_value) else: - self._variable_manager.set_host_facts(target_host, facts.copy()) - elif result[0].startswith('v2_runner_item') or result[0] == 'v2_runner_retry': - self._tqm.send_callback(result[0], result[1]) - elif result[0] == 'v2_on_file_diff': - if self._diff: - self._tqm.send_callback('v2_on_file_diff', result[1]) - else: - raise AnsibleError("unknown result message received: %s" % result[0]) + if original_task.run_once: + host_list = [host for host in self._inventory.get_hosts(iterator._play.hosts) if host.name not in self._tqm._unreachable_hosts] + else: + host_list = [original_host] + + for target_host in host_list: + if original_task.action == 'set_fact': + self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy()) + else: + self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy()) + + if 'diff' in task_result._result: + if self._diff: + self._tqm.send_callback('v2_on_file_diff', task_result) + + if original_task.action != 'include': + self._tqm._stats.increment('ok', original_host.name) + if 'changed' in task_result._result and task_result._result['changed']: + self._tqm._stats.increment('changed', original_host.name) + + # finally, send the ok for this task + self._tqm.send_callback('v2_runner_on_ok', task_result) + + self._pending_results -= 1 + if original_host.name in self._blocked_hosts: + del self._blocked_hosts[original_host.name] + + # If this is a role task, mark the parent role as being run (if + # the task was ok or failed, but not skipped or unreachable) + if original_task._role is not None and role_ran: + # lookup the role in the ROLE_CACHE to make sure we're dealing + # with the correct object and mark it as executed + for (entry, role_obj) in iteritems(iterator._play.ROLE_CACHE[original_task._role._role_name]): + if role_obj._uuid == original_task._role._uuid: + role_obj._had_task_run[original_host.name] = True + + ret_results.append(task_result) except Queue.Empty: time.sleep(0.005) diff --git a/lib/ansible/plugins/strategy/free.py b/lib/ansible/plugins/strategy/free.py index 89287c49ab3..772db5cf5a7 100644 --- a/lib/ansible/plugins/strategy/free.py +++ b/lib/ansible/plugins/strategy/free.py @@ -146,6 +146,7 @@ class StrategyModule(StrategyBase): display.warning("Using any_errors_fatal with the free strategy is not supported, as tasks are executed independently on each host") self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False) self._queue_task(host, task, task_vars, play_context) + del task_vars else: display.debug("%s is blocked, skipping for now" % host_name) diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py index 50028fbe1c4..82532417928 100644 --- a/lib/ansible/plugins/strategy/linear.py +++ b/lib/ansible/plugins/strategy/linear.py @@ -253,6 +253,7 @@ class StrategyModule(StrategyBase): self._blocked_hosts[host.get_name()] = True self._queue_task(host, task, task_vars, play_context) + del task_vars # if we're bypassing the host loop, break out now if run_once: diff --git a/test/units/executor/test_task_executor.py b/test/units/executor/test_task_executor.py index 3f9a614dfb0..12d13302391 100644 --- a/test/units/executor/test_task_executor.py +++ b/test/units/executor/test_task_executor.py @@ -139,7 +139,7 @@ class TestTaskExecutor(unittest.TestCase): mock_host = MagicMock() - def _copy(): + def _copy(exclude_block=False, exclude_tasks=False): new_item = MagicMock() return new_item diff --git a/test/units/plugins/strategies/test_strategy_base.py b/test/units/plugins/strategies/test_strategy_base.py index 23c3ce40bb2..46c3729fd2e 100644 --- a/test/units/plugins/strategies/test_strategy_base.py +++ b/test/units/plugins/strategies/test_strategy_base.py @@ -19,6 +19,8 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type +import uuid + from ansible.compat.tests import unittest from ansible.compat.tests.mock import patch, MagicMock @@ -27,6 +29,7 @@ from ansible.plugins.strategy import StrategyBase from ansible.executor.process.worker import WorkerProcess from ansible.executor.task_queue_manager import TaskQueueManager from ansible.executor.task_result import TaskResult +from ansible.playbook.block import Block from ansible.playbook.handler import Handler from ansible.inventory.host import Host @@ -183,11 +186,6 @@ class TestStrategyBase(unittest.TestCase): mock_play = MagicMock() - mock_iterator = MagicMock() - mock_iterator._play = mock_play - mock_iterator.mark_host_failed.return_value = None - mock_iterator.get_next_task_for_host.return_value = (None, None) - mock_host = MagicMock() mock_host.name = 'test01' mock_host.vars = dict() @@ -196,6 +194,8 @@ class TestStrategyBase(unittest.TestCase): mock_task = MagicMock() mock_task._role = None mock_task.ignore_errors = False + mock_task._uuid = uuid.uuid4() + mock_task.loop = None mock_handler_task = MagicMock(Handler) mock_handler_task.name = 'test handler' @@ -203,8 +203,16 @@ class TestStrategyBase(unittest.TestCase): mock_handler_task.get_name.return_value = "test handler" mock_handler_task.has_triggered.return_value = False + mock_iterator = MagicMock() + mock_iterator._play = mock_play + mock_iterator.mark_host_failed.return_value = None + mock_iterator.get_next_task_for_host.return_value = (None, None) + mock_iterator.get_original_task.return_value = mock_task + mock_handler_block = MagicMock() mock_handler_block.block = [mock_handler_task] + mock_handler_block.rescue = [] + mock_handler_block.always = [] mock_play.handlers = [mock_handler_block] mock_tqm._notified_handlers = {mock_handler_task: []} @@ -245,8 +253,8 @@ class TestStrategyBase(unittest.TestCase): results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 0) - task_result = TaskResult(host=mock_host, task=mock_task, return_data=dict(changed=True)) - queue_items.append(('host_task_ok', task_result)) + task_result = TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(changed=True)) + queue_items.append(task_result) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 results = strategy_base._wait_on_pending_results(iterator=mock_iterator) @@ -255,10 +263,11 @@ class TestStrategyBase(unittest.TestCase): self.assertEqual(strategy_base._pending_results, 0) self.assertNotIn('test01', strategy_base._blocked_hosts) - task_result = TaskResult(host=mock_host, task=mock_task, return_data='{"failed":true}') - queue_items.append(('host_task_failed', task_result)) + task_result = TaskResult(host=mock_host.name, task=mock_task._uuid, return_data='{"failed":true}') + queue_items.append(task_result) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 + mock_iterator.is_failed.return_value = True results = strategy_base._process_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(results[0], task_result) @@ -266,9 +275,10 @@ class TestStrategyBase(unittest.TestCase): self.assertNotIn('test01', strategy_base._blocked_hosts) self.assertIn('test01', mock_tqm._failed_hosts) del mock_tqm._failed_hosts['test01'] + mock_iterator.is_failed.return_value = False - task_result = TaskResult(host=mock_host, task=mock_task, return_data='{}') - queue_items.append(('host_unreachable', task_result)) + task_result = TaskResult(host=mock_host.name, task=mock_task._uuid, return_data='{"unreachable": true}') + queue_items.append(task_result) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 results = strategy_base._wait_on_pending_results(iterator=mock_iterator) @@ -279,8 +289,8 @@ class TestStrategyBase(unittest.TestCase): self.assertIn('test01', mock_tqm._unreachable_hosts) del mock_tqm._unreachable_hosts['test01'] - task_result = TaskResult(host=mock_host, task=mock_task, return_data='{}') - queue_items.append(('host_task_skipped', task_result)) + task_result = TaskResult(host=mock_host.name, task=mock_task._uuid, return_data='{"skipped": true}') + queue_items.append(task_result) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 results = strategy_base._wait_on_pending_results(iterator=mock_iterator) @@ -289,42 +299,44 @@ class TestStrategyBase(unittest.TestCase): self.assertEqual(strategy_base._pending_results, 0) self.assertNotIn('test01', strategy_base._blocked_hosts) + queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_host=dict(host_name='newhost01', new_groups=['foo'])))) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 - - queue_items.append(('add_host', dict(add_host=dict(host_name='newhost01', new_groups=['foo'])))) results = strategy_base._process_pending_results(iterator=mock_iterator) - self.assertEqual(len(results), 0) - self.assertEqual(strategy_base._pending_results, 1) - self.assertIn('test01', strategy_base._blocked_hosts) + self.assertEqual(len(results), 1) + self.assertEqual(strategy_base._pending_results, 0) + self.assertNotIn('test01', strategy_base._blocked_hosts) - queue_items.append(('add_group', mock_host, dict(add_group=dict(group_name='foo')))) + queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_group=dict(group_name='foo')))) + strategy_base._blocked_hosts['test01'] = True + strategy_base._pending_results = 1 results = strategy_base._process_pending_results(iterator=mock_iterator) - self.assertEqual(len(results), 0) - self.assertEqual(strategy_base._pending_results, 1) - self.assertIn('test01', strategy_base._blocked_hosts) + self.assertEqual(len(results), 1) + self.assertEqual(strategy_base._pending_results, 0) + self.assertNotIn('test01', strategy_base._blocked_hosts) - task_result = TaskResult(host=mock_host, task=mock_task, return_data=dict(changed=True)) - queue_items.append(('notify_handler', task_result, 'test handler')) + queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(changed=True, _ansible_notify=['test handler']))) + strategy_base._blocked_hosts['test01'] = True + strategy_base._pending_results = 1 results = strategy_base._process_pending_results(iterator=mock_iterator) - self.assertEqual(len(results), 0) - self.assertEqual(strategy_base._pending_results, 1) - self.assertIn('test01', strategy_base._blocked_hosts) + self.assertEqual(len(results), 1) + self.assertEqual(strategy_base._pending_results, 0) + self.assertNotIn('test01', strategy_base._blocked_hosts) self.assertIn(mock_handler_task, strategy_base._notified_handlers) self.assertIn(mock_host, strategy_base._notified_handlers[mock_handler_task]) - queue_items.append(('set_host_var', mock_host, mock_task, None, 'foo', 'bar')) - results = strategy_base._process_pending_results(iterator=mock_iterator) - self.assertEqual(len(results), 0) - self.assertEqual(strategy_base._pending_results, 1) + #queue_items.append(('set_host_var', mock_host, mock_task, None, 'foo', 'bar')) + #results = strategy_base._process_pending_results(iterator=mock_iterator) + #self.assertEqual(len(results), 0) + #self.assertEqual(strategy_base._pending_results, 1) - queue_items.append(('set_host_facts', mock_host, mock_task, None, 'foo', dict())) - results = strategy_base._process_pending_results(iterator=mock_iterator) - self.assertEqual(len(results), 0) - self.assertEqual(strategy_base._pending_results, 1) + #queue_items.append(('set_host_facts', mock_host, mock_task, None, 'foo', dict())) + #results = strategy_base._process_pending_results(iterator=mock_iterator) + #self.assertEqual(len(results), 0) + #self.assertEqual(strategy_base._pending_results, 1) - queue_items.append(('bad')) - self.assertRaises(AnsibleError, strategy_base._process_pending_results, iterator=mock_iterator) + #queue_items.append(('bad')) + #self.assertRaises(AnsibleError, strategy_base._process_pending_results, iterator=mock_iterator) def test_strategy_base_load_included_file(self): fake_loader = DictDataLoader({ @@ -377,6 +389,8 @@ class TestStrategyBase(unittest.TestCase): mock_handler_task.action = 'foo' mock_handler_task.get_name.return_value = "test handler" mock_handler_task.has_triggered.return_value = False + mock_handler_task.listen = None + mock_handler_task._role = None mock_handler = MagicMock() mock_handler.block = [mock_handler_task] @@ -395,8 +409,9 @@ class TestStrategyBase(unittest.TestCase): mock_var_mgr = MagicMock() mock_var_mgr.get_vars.return_value = dict() - mock_iterator = MagicMock + mock_iterator = MagicMock() mock_iterator._play = mock_play + mock_iterator.get_original_task.return_value = mock_handler_task fake_loader = DictDataLoader() mock_options = MagicMock() @@ -420,7 +435,7 @@ class TestStrategyBase(unittest.TestCase): strategy_base._notified_handlers = {mock_handler_task: [mock_host]} task_result = TaskResult(Host('host01'), Handler(), dict(changed=False)) - tqm._final_q.put(('host_task_ok', task_result)) + tqm._final_q.put(task_result) result = strategy_base.run_handlers(iterator=mock_iterator, play_context=mock_play_context) finally: