From 7c726d197b6db40ecb170c59e9e2203b2b9a01aa Mon Sep 17 00:00:00 2001 From: Brian Coca Date: Sat, 6 Feb 2016 00:53:01 -0500 Subject: [PATCH] correctly handle term signals - adhoc now terminates gracefully - avoid race condition on terminations by ignoring errors if worker might have been reaped between checking if active and termination call - ansible-playbook now properly exits on sigint/term - adhoc and playbook now give exceptions that we should not normally capture and rely on top level finally to reap children - handle systemexit breaks in workers - added debug to see at which frame we exit partial fix for #14346 --- lib/ansible/cli/adhoc.py | 9 +++++++++ lib/ansible/executor/playbook_executor.py | 9 +++++---- lib/ansible/executor/process/result.py | 2 +- lib/ansible/executor/process/worker.py | 4 ++-- lib/ansible/executor/task_queue_manager.py | 5 ++++- 5 files changed, 21 insertions(+), 8 deletions(-) diff --git a/lib/ansible/cli/adhoc.py b/lib/ansible/cli/adhoc.py index 97df8fcdbf0..faacd6c67bb 100644 --- a/lib/ansible/cli/adhoc.py +++ b/lib/ansible/cli/adhoc.py @@ -21,6 +21,7 @@ __metaclass__ = type ######################################################## import os +import signal from ansible import constants as C from ansible.cli import CLI @@ -88,6 +89,10 @@ class AdHocCLI(CLI): tasks = [ dict(action=dict(module=self.options.module_name, args=parse_kv(self.options.module_args)), async=async, poll=poll) ] ) + def _terminate(self, signum=None, framenum=None): + if signum is not None: + raise SystemExit("Interrupt detected, shutting down gracefully") + def run(self): ''' use Runner lib to do SSH things ''' @@ -170,6 +175,9 @@ class AdHocCLI(CLI): # now create a task queue manager to execute the play self._tqm = None try: + # Manage user interruptions + signal.signal(signal.SIGTERM, self._terminate) + self._tqm = TaskQueueManager( inventory=inventory, variable_manager=variable_manager, @@ -180,6 +188,7 @@ class AdHocCLI(CLI): run_additional_callbacks=C.DEFAULT_LOAD_CALLBACK_PLUGINS, run_tree=run_tree, ) + result = self._tqm.run(play) finally: if self._tqm: diff --git a/lib/ansible/executor/playbook_executor.py b/lib/ansible/executor/playbook_executor.py index 30d9ad6d6b8..c1a1303a9c7 100644 --- a/lib/ansible/executor/playbook_executor.py +++ b/lib/ansible/executor/playbook_executor.py @@ -69,7 +69,7 @@ class PlaybookExecutor: may limit the runs to serialized groups, etc. ''' - signal.signal(signal.SIGINT, self._cleanup) + signal.signal(signal.SIGTERM, self._terminate) result = 0 entrylist = [] @@ -199,7 +199,7 @@ class PlaybookExecutor: finally: if self._tqm is not None: - self._cleanup() + self._tqm.cleanup() if self._options.syntax: display.display("No issues encountered") @@ -207,8 +207,9 @@ class PlaybookExecutor: return result - def _cleanup(self, signum=None, framenum=None): - return self._tqm.cleanup() + def _terminate(self, signum=None, framenum=None): + display.debug(framenum) + raise SystemExit("Terminating run due to external signal") def _get_serialized_batches(self, play): ''' diff --git a/lib/ansible/executor/process/result.py b/lib/ansible/executor/process/result.py index 13c91b3ba77..bb4c0dd0a39 100644 --- a/lib/ansible/executor/process/result.py +++ b/lib/ansible/executor/process/result.py @@ -163,7 +163,7 @@ class ResultProcess(multiprocessing.Process): except queue.Empty: pass - except (KeyboardInterrupt, IOError, EOFError): + except (KeyboardInterrupt, SystemExit, IOError, EOFError): break except: # TODO: we should probably send a proper callback here instead of diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index 120bd8b1414..24b9b3e5e03 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -132,7 +132,7 @@ class WorkerProcess(multiprocessing.Process): self._rslt_q.put(task_result, block=False) except Exception as e: - if not isinstance(e, (IOError, EOFError, KeyboardInterrupt)) or isinstance(e, TemplateNotFound): + if not isinstance(e, (IOError, EOFError, KeyboardInterrupt, SystemExit)) or isinstance(e, TemplateNotFound): try: self._host.vars = dict() self._host.groups = [] @@ -140,7 +140,7 @@ class WorkerProcess(multiprocessing.Process): self._rslt_q.put(task_result, block=False) except: debug(u"WORKER EXCEPTION: %s" % to_unicode(e)) - debug(u"WORKER EXCEPTION: %s" % to_unicode(traceback.format_exc())) + debug(u"WORKER TRACEBACK: %s" % to_unicode(traceback.format_exc())) debug("WORKER PROCESS EXITING") diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index feb0ab526f0..bed9879c421 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -253,7 +253,10 @@ class TaskQueueManager: rslt_q.close() main_q.close() if worker_prc and worker_prc.is_alive(): - worker_prc.terminate() + try: + worker_prc.terminate() + except AttributeError: + pass def clear_failed_hosts(self): self._failed_hosts = dict()