diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index 6cf3a968..cb130abc 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -114,14 +114,17 @@ def get_worker_model(): return _worker_model -def get_classic_worker_model(): +def get_classic_worker_model(**kwargs): """ Return the single :class:`ClassicWorkerModel` instance, constructing it if necessary. """ global _classic_worker_model + assert _classic_worker_model is None or (not kwargs), \ + "ClassicWorkerModel kwargs supplied but model already constructed" + if _classic_worker_model is None: - _classic_worker_model = ClassicWorkerModel() + _classic_worker_model = ClassicWorkerModel(**kwargs) return _classic_worker_model @@ -417,9 +420,9 @@ class ClassicWorkerModel(WorkerModel): For testing, if :data:`False`, don't initialize logging. """ common_setup(_init_logging=self._init_logging) + MuxProcess.cls_parent_sock, \ MuxProcess.cls_child_sock = socket.socketpair() - mitogen.core.set_cloexec(MuxProcess.cls_parent_sock.fileno()) mitogen.core.set_cloexec(MuxProcess.cls_child_sock.fileno()) @@ -434,6 +437,28 @@ class ClassicWorkerModel(WorkerModel): MuxProcess.cls_child_sock.close() MuxProcess.cls_child_sock = None + def _test_reset(self): + """ + Used to clean up in unit tests. + """ + # TODO: split this up a bit. + global _classic_worker_model + assert MuxProcess.cls_parent_sock is not None + MuxProcess.cls_parent_sock.close() + MuxProcess.cls_parent_sock = None + self.listener_path = None + self.router = None + self.parent = None + + for mux in self._muxes: + pid, status = os.waitpid(mux.pid, 0) + status = mitogen.fork._convert_exit_status(status) + LOG.debug('mux PID %d %s', pid, + mitogen.parent.returncode_to_str(status)) + + _classic_worker_model = None + set_worker_model(None) + def on_strategy_start(self): """ See WorkerModel.on_strategy_start(). @@ -461,22 +486,26 @@ class ClassicWorkerModel(WorkerModel): return ClassicBinding(self) def on_binding_close(self): - if self.broker: - self.broker.shutdown() - self.broker.join() - self.router = None - self.broker = None + if not self.broker: + return + + self.broker.shutdown() + self.broker.join() + self.router = None + self.broker = None + self.listener_path = None + self.initialized = False - # #420: Ansible executes "meta" actions in the top-level process, - # meaning "reset_connection" will cause :class:`mitogen.core.Latch` - # FDs to be cached and erroneously shared by children on subsequent - # WorkerProcess forks. To handle that, call on_fork() to ensure any - # shared state is discarded. - # #490: only attempt to clean up when it's known that some - # resources exist to cleanup, otherwise later __del__ double-call - # to close() due to GC at random moment may obliterate an unrelated - # Connection's related resources. - mitogen.fork.on_fork() + # #420: Ansible executes "meta" actions in the top-level process, + # meaning "reset_connection" will cause :class:`mitogen.core.Latch` FDs + # to be cached and erroneously shared by children on subsequent + # WorkerProcess forks. To handle that, call on_fork() to ensure any + # shared state is discarded. + # #490: only attempt to clean up when it's known that some resources + # exist to cleanup, otherwise later __del__ double-call to close() due + # to GC at random moment may obliterate an unrelated Connection's + # related resources. + mitogen.fork.on_fork() class MuxProcess(object): @@ -514,28 +543,14 @@ class MuxProcess(object): #: applied to locally executed commands and modules. cls_original_env = None - #: In both processes, this a list of the temporary UNIX sockets used for - #: forked WorkerProcesses to contact the forked mux processes. - cls_listener_paths = None - - @classmethod - def _reset(cls): - """ - Used to clean up in unit tests. - """ - assert cls.worker_sock is not None - cls.worker_sock.close() - cls.worker_sock = None - os.waitpid(cls.worker_pid, 0) - def __init__(self, index): self.index = index #: Individual path of this process. self.path = mitogen.unix.make_socket_path() def start(self): - pid = os.fork() - if pid: + self.pid = os.fork() + if self.pid: # Wait for child to boot before continuing. mitogen.core.io_op(MuxProcess.cls_parent_sock.recv, 1) return diff --git a/mitogen/fork.py b/mitogen/fork.py index e2075fc3..ee990262 100644 --- a/mitogen/fork.py +++ b/mitogen/fork.py @@ -121,6 +121,19 @@ def handle_child_crash(): os._exit(1) +def _convert_exit_status(status): + """ + Convert a :func:`os.waitpid`-style exit status to a :mod:`subprocess` style + exit status. + """ + if os.WIFEXITED(status): + return os.WEXITSTATUS(status) + elif os.WIFSIGNALED(status): + return -os.WTERMSIG(status) + elif os.WIFSTOPPED(status): + return -os.WSTOPSIG(status) + + class Process(mitogen.parent.Process): def poll(self): try: @@ -134,12 +147,7 @@ class Process(mitogen.parent.Process): if not pid: return - if os.WIFEXITED(status): - return os.WEXITSTATUS(status) - elif os.WIFSIGNALED(status): - return -os.WTERMSIG(status) - elif os.WIFSTOPPED(status): - return -os.WSTOPSIG(status) + return _convert_exit_status(status) class Options(mitogen.parent.Options): diff --git a/tests/ansible/tests/connection_test.py b/tests/ansible/tests/connection_test.py index d663ecc5..73958185 100644 --- a/tests/ansible/tests/connection_test.py +++ b/tests/ansible/tests/connection_test.py @@ -26,13 +26,17 @@ class MuxProcessMixin(object): @classmethod def setUpClass(cls): #mitogen.utils.log_to_file() - ansible_mitogen.process.MuxProcess.start(_init_logging=False) + cls.model = ansible_mitogen.process.get_classic_worker_model( + _init_logging=False + ) + ansible_mitogen.process.set_worker_model(cls.model) + cls.model.on_strategy_start() super(MuxProcessMixin, cls).setUpClass() @classmethod def tearDownClass(cls): + cls.model._test_reset() super(MuxProcessMixin, cls).tearDownClass() - ansible_mitogen.process.MuxProcess._reset() class ConnectionMixin(MuxProcessMixin):