diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index 4d3a9b5e..2b29904e 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -544,23 +544,12 @@ class ClassicWorkerModel(WorkerModel): """ Used to clean up in unit tests. """ - # TODO: split this up a bit. - global _classic_worker_model - assert self.parent_sock is not None - self.parent_sock.close() - self.parent_sock = None - self.listener_path = None - self.router = None - self.parent = None - - for mux in self._muxes: - pid, status = os.waitpid(mux.pid, 0) - status = mitogen.fork._convert_exit_status(status) - LOG.debug('mux PID %d %s', pid, - mitogen.parent.returncode_to_str(status)) + self.on_binding_close() + self._on_process_exit() + set_worker_model(None) + global _classic_worker_model _classic_worker_model = None - set_worker_model(None) def on_strategy_start(self): """ @@ -593,6 +582,7 @@ class ClassicWorkerModel(WorkerModel): self.broker.join() self.router = None self.broker = None + self.parent = None self.listener_path = None # #420: Ansible executes "meta" actions in the top-level process, @@ -708,8 +698,8 @@ class MuxProcess(object): max_message_size=4096 * 1048576, ) _setup_responder(self.router.responder) - mitogen.core.listen(self.broker, 'shutdown', self.on_broker_shutdown) - mitogen.core.listen(self.broker, 'exit', self.on_broker_exit) + mitogen.core.listen(self.broker, 'shutdown', self._on_broker_shutdown) + mitogen.core.listen(self.broker, 'exit', self._on_broker_exit) self.listener = mitogen.unix.Listener.build_stream( router=self.router, path=self.path, @@ -729,26 +719,20 @@ class MuxProcess(object): ) setup_pool(self.pool) - def on_broker_shutdown(self): + def _on_broker_shutdown(self): """ - Respond to broker shutdown by beginning service pool shutdown. Do not - join on the pool yet, since that would block the broker thread which - then cannot clean up pending handlers, which is required for the - threads to exit gracefully. + Respond to broker shutdown by shutting down the pool. Do not join on it + yet, since that would block the broker thread which then cannot clean + up pending handlers and connections, which is required for the threads + to exit gracefully. """ - # In normal operation we presently kill the process because there is - # not yet any way to cancel connect(). - self.pool.stop(join=self.profiling) + self.pool.stop(join=False) - def on_broker_exit(self): + def _on_broker_exit(self): """ - Respond to the broker thread about to exit by sending SIGTERM to - ourself. In future this should gracefully join the pool, but TERM is - fine for now. + Respond to the broker thread about to exit by finally joining on the + pool. This is safe since pools only block in connection attempts, and + connection attempts fail with CancelledError when broker shutdown + begins. """ - if not os.environ.get('MITOGEN_PROFILING'): - # In normal operation we presently kill the process because there is - # not yet any way to cancel connect(). When profiling, threads - # including the broker must shut down gracefully, otherwise pstats - # won't be written. - os.kill(os.getpid(), signal.SIGTERM) + self.pool.join()