[linear2] simplify ClassicWorkerModel and fix repeat initialization

"self.initialized = False" slipped in a few days ago, on second thoughts
that flag is not needed at all, by simply rearranging ClassicWorkerModel
to have a regular constructor.

This hierarchy is still squishy, it needs more love. Remaining
MuxProcess class attributes should eliminated.
pull/612/head
David Wilson 5 years ago
parent cebccf6f41
commit 3ceac2c9ed

@ -399,6 +399,18 @@ class ClassicBinding(Binding):
class ClassicWorkerModel(WorkerModel):
#: In the top-level process, this references one end of a socketpair(),
#: whose other end child MuxProcesses block reading from to determine when
#: the master process dies. When the top-level exits abnormally, or
#: normally but where :func:`_on_process_exit` has been called, this socket
#: will be closed, causing all the children to wake.
parent_sock = None
#: In the mux process, this is the other end of :attr:`cls_parent_sock`.
#: The main thread blocks on a read from it until :attr:`cls_parent_sock`
#: is closed.
child_sock = None
#: mitogen.master.Router for this worker.
router = None
@ -414,8 +426,34 @@ class ClassicWorkerModel(WorkerModel):
parent = None
def __init__(self, _init_logging=True):
self._init_logging = _init_logging
self.initialized = False
"""
Arrange for classic model multiplexers to be started, if they are not
already running.
The parent process picks a UNIX socket path each child will use prior
to fork, creates a socketpair used essentially as a semaphore, then
blocks waiting for the child to indicate the UNIX socket is ready for
use.
:param bool _init_logging:
For testing, if :data:`False`, don't initialize logging.
"""
common_setup(_init_logging=_init_logging)
self.parent_sock, self.child_sock = socket.socketpair()
mitogen.core.set_cloexec(self.parent_sock.fileno())
mitogen.core.set_cloexec(self.child_sock.fileno())
self._muxes = [
MuxProcess(self, index)
for index in range(get_cpu_count(default=1))
]
for mux in self._muxes:
mux.start()
atexit.register(self._on_process_exit, self.parent_sock)
self.child_sock.close()
self.child_sock = None
def _listener_for_name(self, name):
"""
@ -449,7 +487,7 @@ class ClassicWorkerModel(WorkerModel):
self.listener_path = path
def on_process_exit(self, sock):
def _on_process_exit(self, sock):
"""
This is an :mod:`atexit` handler installed in the top-level process.
@ -467,7 +505,7 @@ class ClassicWorkerModel(WorkerModel):
sock.shutdown(socket.SHUT_WR)
except socket.error:
# Already closed. This is possible when tests are running.
LOG.debug('on_process_exit: ignoring duplicate call')
LOG.debug('_on_process_exit: ignoring duplicate call')
return
mitogen.core.io_op(sock.recv, 1)
@ -479,46 +517,15 @@ class ClassicWorkerModel(WorkerModel):
LOG.debug('mux %d PID %d %s', mux.index, mux.pid,
mitogen.parent.returncode_to_str(status))
def _initialize(self):
"""
Arrange for classic model multiplexers to be started, if they are not
already running.
The parent process picks a UNIX socket path each child will use prior
to fork, creates a socketpair used essentially as a semaphore, then
blocks waiting for the child to indicate the UNIX socket is ready for
use.
:param bool _init_logging:
For testing, if :data:`False`, don't initialize logging.
"""
common_setup(_init_logging=self._init_logging)
MuxProcess.cls_parent_sock, \
MuxProcess.cls_child_sock = socket.socketpair()
mitogen.core.set_cloexec(MuxProcess.cls_parent_sock.fileno())
mitogen.core.set_cloexec(MuxProcess.cls_child_sock.fileno())
self._muxes = [
MuxProcess(index)
for index in range(get_cpu_count(default=1))
]
for mux in self._muxes:
mux.start()
atexit.register(self.on_process_exit, MuxProcess.cls_parent_sock)
MuxProcess.cls_child_sock.close()
MuxProcess.cls_child_sock = None
def _test_reset(self):
"""
Used to clean up in unit tests.
"""
# TODO: split this up a bit.
global _classic_worker_model
assert MuxProcess.cls_parent_sock is not None
MuxProcess.cls_parent_sock.close()
MuxProcess.cls_parent_sock = None
assert self.parent_sock is not None
self.parent_sock.close()
self.parent_sock = None
self.listener_path = None
self.router = None
self.parent = None
@ -536,9 +543,6 @@ class ClassicWorkerModel(WorkerModel):
"""
See WorkerModel.on_strategy_start().
"""
if not self.initialized:
self._initialize()
self.initialized = True
def on_strategy_complete(self):
"""
@ -567,7 +571,6 @@ class ClassicWorkerModel(WorkerModel):
self.router = None
self.broker = None
self.listener_path = None
self.initialized = False
# #420: Ansible executes "meta" actions in the top-level process,
# meaning "reset_connection" will cause :class:`mitogen.core.Latch` FDs
@ -598,25 +601,16 @@ class MuxProcess(object):
See https://bugs.python.org/issue6721 for a thorough description of the
class of problems this worker is intended to avoid.
"""
#: In the top-level process, this references one end of a socketpair(),
#: whose other end child MuxProcesses block reading from to determine when
#: the master process dies. When the top-level exits abnormally, or
#: normally but where :func:`on_process_exit` has been called, this socket
#: will be closed, causing all the children to wake.
cls_parent_sock = None
#: In the mux process, this is the other end of :attr:`cls_parent_sock`.
#: The main thread blocks on a read from it until :attr:`cls_parent_sock`
#: is closed.
cls_child_sock = None
#: A copy of :data:`os.environ` at the time the multiplexer process was
#: started. It's used by mitogen_local.py to find changes made to the
#: top-level environment (e.g. vars plugins -- issue #297) that must be
#: applied to locally executed commands and modules.
cls_original_env = None
def __init__(self, index):
def __init__(self, model, index):
#: :class:`ClassicWorkerModel` instance we were created by.
self.model = model
#: MuxProcess CPU index.
self.index = index
#: Individual path of this process.
self.path = mitogen.unix.make_socket_path()
@ -625,7 +619,7 @@ class MuxProcess(object):
self.pid = os.fork()
if self.pid:
# Wait for child to boot before continuing.
mitogen.core.io_op(MuxProcess.cls_parent_sock.recv, 1)
mitogen.core.io_op(self.model.parent_sock.recv, 1)
return
ansible_mitogen.logging.set_process_name('mux:' + str(self.index))
@ -635,8 +629,8 @@ class MuxProcess(object):
os.path.basename(self.path),
))
MuxProcess.cls_parent_sock.close()
MuxProcess.cls_parent_sock = None
self.model.parent_sock.close()
self.model.parent_sock = None
try:
try:
self.worker_main()
@ -660,9 +654,9 @@ class MuxProcess(object):
try:
# Let the parent know our listening socket is ready.
mitogen.core.io_op(self.cls_child_sock.send, b('1'))
mitogen.core.io_op(self.model.child_sock.send, b('1'))
# Block until the socket is closed, which happens on parent exit.
mitogen.core.io_op(self.cls_child_sock.recv, 1)
mitogen.core.io_op(self.model.child_sock.recv, 1)
finally:
self.broker.shutdown()
self.broker.join()

Loading…
Cancel
Save