[linear2] fix MuxProcess test fixture and some merge fallout

pull/607/head
David Wilson 5 years ago
parent e93762b3db
commit 1fca0b7a94

@ -114,14 +114,17 @@ def get_worker_model():
return _worker_model return _worker_model
def get_classic_worker_model(): def get_classic_worker_model(**kwargs):
""" """
Return the single :class:`ClassicWorkerModel` instance, constructing it if Return the single :class:`ClassicWorkerModel` instance, constructing it if
necessary. necessary.
""" """
global _classic_worker_model global _classic_worker_model
assert _classic_worker_model is None or (not kwargs), \
"ClassicWorkerModel kwargs supplied but model already constructed"
if _classic_worker_model is None: if _classic_worker_model is None:
_classic_worker_model = ClassicWorkerModel() _classic_worker_model = ClassicWorkerModel(**kwargs)
return _classic_worker_model return _classic_worker_model
@ -417,9 +420,9 @@ class ClassicWorkerModel(WorkerModel):
For testing, if :data:`False`, don't initialize logging. For testing, if :data:`False`, don't initialize logging.
""" """
common_setup(_init_logging=self._init_logging) common_setup(_init_logging=self._init_logging)
MuxProcess.cls_parent_sock, \ MuxProcess.cls_parent_sock, \
MuxProcess.cls_child_sock = socket.socketpair() MuxProcess.cls_child_sock = socket.socketpair()
mitogen.core.set_cloexec(MuxProcess.cls_parent_sock.fileno()) mitogen.core.set_cloexec(MuxProcess.cls_parent_sock.fileno())
mitogen.core.set_cloexec(MuxProcess.cls_child_sock.fileno()) mitogen.core.set_cloexec(MuxProcess.cls_child_sock.fileno())
@ -434,6 +437,28 @@ class ClassicWorkerModel(WorkerModel):
MuxProcess.cls_child_sock.close() MuxProcess.cls_child_sock.close()
MuxProcess.cls_child_sock = None MuxProcess.cls_child_sock = None
def _test_reset(self):
"""
Used to clean up in unit tests.
"""
# TODO: split this up a bit.
global _classic_worker_model
assert MuxProcess.cls_parent_sock is not None
MuxProcess.cls_parent_sock.close()
MuxProcess.cls_parent_sock = None
self.listener_path = None
self.router = None
self.parent = None
for mux in self._muxes:
pid, status = os.waitpid(mux.pid, 0)
status = mitogen.fork._convert_exit_status(status)
LOG.debug('mux PID %d %s', pid,
mitogen.parent.returncode_to_str(status))
_classic_worker_model = None
set_worker_model(None)
def on_strategy_start(self): def on_strategy_start(self):
""" """
See WorkerModel.on_strategy_start(). See WorkerModel.on_strategy_start().
@ -461,22 +486,26 @@ class ClassicWorkerModel(WorkerModel):
return ClassicBinding(self) return ClassicBinding(self)
def on_binding_close(self): def on_binding_close(self):
if self.broker: if not self.broker:
self.broker.shutdown() return
self.broker.join()
self.router = None self.broker.shutdown()
self.broker = None self.broker.join()
self.router = None
self.broker = None
self.listener_path = None
self.initialized = False
# #420: Ansible executes "meta" actions in the top-level process, # #420: Ansible executes "meta" actions in the top-level process,
# meaning "reset_connection" will cause :class:`mitogen.core.Latch` # meaning "reset_connection" will cause :class:`mitogen.core.Latch` FDs
# FDs to be cached and erroneously shared by children on subsequent # to be cached and erroneously shared by children on subsequent
# WorkerProcess forks. To handle that, call on_fork() to ensure any # WorkerProcess forks. To handle that, call on_fork() to ensure any
# shared state is discarded. # shared state is discarded.
# #490: only attempt to clean up when it's known that some # #490: only attempt to clean up when it's known that some resources
# resources exist to cleanup, otherwise later __del__ double-call # exist to cleanup, otherwise later __del__ double-call to close() due
# to close() due to GC at random moment may obliterate an unrelated # to GC at random moment may obliterate an unrelated Connection's
# Connection's related resources. # related resources.
mitogen.fork.on_fork() mitogen.fork.on_fork()
class MuxProcess(object): class MuxProcess(object):
@ -514,28 +543,14 @@ class MuxProcess(object):
#: applied to locally executed commands and modules. #: applied to locally executed commands and modules.
cls_original_env = None cls_original_env = None
#: In both processes, this a list of the temporary UNIX sockets used for
#: forked WorkerProcesses to contact the forked mux processes.
cls_listener_paths = None
@classmethod
def _reset(cls):
"""
Used to clean up in unit tests.
"""
assert cls.worker_sock is not None
cls.worker_sock.close()
cls.worker_sock = None
os.waitpid(cls.worker_pid, 0)
def __init__(self, index): def __init__(self, index):
self.index = index self.index = index
#: Individual path of this process. #: Individual path of this process.
self.path = mitogen.unix.make_socket_path() self.path = mitogen.unix.make_socket_path()
def start(self): def start(self):
pid = os.fork() self.pid = os.fork()
if pid: if self.pid:
# Wait for child to boot before continuing. # Wait for child to boot before continuing.
mitogen.core.io_op(MuxProcess.cls_parent_sock.recv, 1) mitogen.core.io_op(MuxProcess.cls_parent_sock.recv, 1)
return return

@ -121,6 +121,19 @@ def handle_child_crash():
os._exit(1) os._exit(1)
def _convert_exit_status(status):
"""
Convert a :func:`os.waitpid`-style exit status to a :mod:`subprocess` style
exit status.
"""
if os.WIFEXITED(status):
return os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
return -os.WTERMSIG(status)
elif os.WIFSTOPPED(status):
return -os.WSTOPSIG(status)
class Process(mitogen.parent.Process): class Process(mitogen.parent.Process):
def poll(self): def poll(self):
try: try:
@ -134,12 +147,7 @@ class Process(mitogen.parent.Process):
if not pid: if not pid:
return return
if os.WIFEXITED(status): return _convert_exit_status(status)
return os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
return -os.WTERMSIG(status)
elif os.WIFSTOPPED(status):
return -os.WSTOPSIG(status)
class Options(mitogen.parent.Options): class Options(mitogen.parent.Options):

@ -26,13 +26,17 @@ class MuxProcessMixin(object):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
#mitogen.utils.log_to_file() #mitogen.utils.log_to_file()
ansible_mitogen.process.MuxProcess.start(_init_logging=False) cls.model = ansible_mitogen.process.get_classic_worker_model(
_init_logging=False
)
ansible_mitogen.process.set_worker_model(cls.model)
cls.model.on_strategy_start()
super(MuxProcessMixin, cls).setUpClass() super(MuxProcessMixin, cls).setUpClass()
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
cls.model._test_reset()
super(MuxProcessMixin, cls).tearDownClass() super(MuxProcessMixin, cls).tearDownClass()
ansible_mitogen.process.MuxProcess._reset()
class ConnectionMixin(MuxProcessMixin): class ConnectionMixin(MuxProcessMixin):

Loading…
Cancel
Save