ansible: abstract worker process model.

Move all details of broker/router setup out of connection.py, instead
deferring it to a WorkerModel class exported by process.py via
get_worker_model(). The running strategy can override the configured
worker model via _get_worker_model().

ClassicWorkerModel is installed by default, which implements the
extension's existing process model.

Add optional support for the third party setproctitle module, so
children have pretty names in ps output.

Add optional support for per-CPU multiplexers to classic runs.
pull/607/head
David Wilson 6 years ago
parent 6b8a7cbcc4
commit 9035884c77

@ -38,6 +38,7 @@ import sys
import time
import jinja2.runtime
from ansible.module_utils import six
import ansible.constants as C
import ansible.errors
import ansible.plugins.connection
@ -459,15 +460,10 @@ class CallChain(mitogen.parent.CallChain):
class Connection(ansible.plugins.connection.ConnectionBase):
#: mitogen.master.Broker for this worker.
broker = None
#: mitogen.master.Router for this worker.
router = None
#: mitogen.parent.Context representing the parent Context, which is
#: presently always the connection multiplexer process.
parent = None
#: The :class:`ansible_mitogen.process.Binding` representing the connection
#: multiplexer this connection's target is assigned to. :data:`None` when
#: disconnected.
binding = None
#: mitogen.parent.Context for the target account on the target, possibly
#: reached via become.
@ -518,13 +514,6 @@ class Connection(ansible.plugins.connection.ConnectionBase):
#: matching vanilla Ansible behaviour.
loader_basedir = None
def __init__(self, play_context, new_stdin, **kwargs):
assert ansible_mitogen.process.MuxProcess.unix_listener_path, (
'Mitogen connection types may only be instantiated '
'while the "mitogen" strategy is active.'
)
super(Connection, self).__init__(play_context, new_stdin)
def __del__(self):
"""
Ansible cannot be trusted to always call close() e.g. the synchronize
@ -585,6 +574,15 @@ class Connection(ansible.plugins.connection.ConnectionBase):
self._connect()
return self.init_child_result['home_dir']
def get_binding(self):
"""
Return the :class:`ansible_mitogen.process.Binding` representing the
process that hosts the physical connection and services (context
establishment, file transfer, ..) for our desired target.
"""
assert self.binding is not None
return self.binding
@property
def connected(self):
return self.context is not None
@ -672,18 +670,6 @@ class Connection(ansible.plugins.connection.ConnectionBase):
return stack
def _connect_broker(self):
"""
Establish a reference to the Broker, Router and parent context used for
connections.
"""
if not self.broker:
self.broker = mitogen.master.Broker()
self.router, self.parent = mitogen.unix.connect(
path=ansible_mitogen.process.MuxProcess.unix_listener_path,
broker=self.broker,
)
def _build_stack(self):
"""
Construct a list of dictionaries representing the connection
@ -691,14 +677,14 @@ class Connection(ansible.plugins.connection.ConnectionBase):
additionally used by the integration tests "mitogen_get_stack" action
to fetch the would-be connection configuration.
"""
return self._stack_from_spec(
ansible_mitogen.transport_config.PlayContextSpec(
connection=self,
play_context=self._play_context,
transport=self.transport,
inventory_name=self.inventory_hostname,
)
spec = ansible_mitogen.transport_config.PlayContextSpec(
connection=self,
play_context=self._play_context,
transport=self.transport,
inventory_name=self.inventory_hostname,
)
stack = self._stack_from_spec(spec)
return spec.inventory_name(), stack
def _connect_stack(self, stack):
"""
@ -711,7 +697,8 @@ class Connection(ansible.plugins.connection.ConnectionBase):
description of the returned dictionary.
"""
try:
dct = self.parent.call_service(
dct = mitogen.service.call(
call_context=self.binding.get_service_context(),
service_name='ansible_mitogen.services.ContextService',
method_name='get',
stack=mitogen.utils.cast(list(stack)),
@ -758,8 +745,9 @@ class Connection(ansible.plugins.connection.ConnectionBase):
if self.connected:
return
self._connect_broker()
stack = self._build_stack()
inventory_name, stack = self._build_stack()
worker_model = ansible_mitogen.process.get_worker_model()
self.binding = worker_model.get_binding(inventory_name)
self._connect_stack(stack)
def _mitogen_reset(self, mode):
@ -776,9 +764,10 @@ class Connection(ansible.plugins.connection.ConnectionBase):
return
self.chain.reset()
self.parent.call_service(
mitogen.service.call(
call_context=self.binding.get_service_context(),
service_name='ansible_mitogen.services.ContextService',
method_name=mode,
method_name='put',
context=self.context
)
@ -787,27 +776,6 @@ class Connection(ansible.plugins.connection.ConnectionBase):
self.init_child_result = None
self.chain = None
def _shutdown_broker(self):
"""
Shutdown the broker thread during :meth:`close` or :meth:`reset`.
"""
if self.broker:
self.broker.shutdown()
self.broker.join()
self.broker = None
self.router = None
# #420: Ansible executes "meta" actions in the top-level process,
# meaning "reset_connection" will cause :class:`mitogen.core.Latch`
# FDs to be cached and erroneously shared by children on subsequent
# WorkerProcess forks. To handle that, call on_fork() to ensure any
# shared state is discarded.
# #490: only attempt to clean up when it's known that some
# resources exist to cleanup, otherwise later __del__ double-call
# to close() due to GC at random moment may obliterate an unrelated
# Connection's resources.
mitogen.fork.on_fork()
def close(self):
"""
Arrange for the mitogen.master.Router running in the worker to
@ -815,7 +783,9 @@ class Connection(ansible.plugins.connection.ConnectionBase):
multiple times.
"""
self._mitogen_reset(mode='put')
self._shutdown_broker()
if self.binding:
self.binding.close()
self.binding = None
def _reset_find_task_vars(self):
"""
@ -853,7 +823,8 @@ class Connection(ansible.plugins.connection.ConnectionBase):
self._connect()
self._mitogen_reset(mode='reset')
self._shutdown_broker()
self.binding.close()
self.binding = None
# Compatibility with Ansible 2.4 wait_for_connection plug-in.
_reset = reset
@ -1024,7 +995,8 @@ class Connection(ansible.plugins.connection.ConnectionBase):
utimes=(st.st_atime, st.st_mtime))
self._connect()
self.parent.call_service(
mitogen.service.call(
call_context=self.binding.get_service_context(),
service_name='mitogen.service.FileService',
method_name='register',
path=mitogen.utils.cast(in_path)
@ -1036,7 +1008,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
# file alive, but that requires more work.
self.get_chain().call(
ansible_mitogen.target.transfer_file,
context=self.parent,
context=self.binding.get_child_service_context(),
in_path=in_path,
out_path=out_path
)

@ -148,6 +148,8 @@ class Planner(object):
# named by `runner_name`.
}
"""
binding = self._inv.connection.get_binding()
new = dict((mitogen.core.UnicodeType(k), kwargs[k])
for k in kwargs)
new.setdefault('good_temp_dir',
@ -155,7 +157,7 @@ class Planner(object):
new.setdefault('cwd', self._inv.connection.get_default_cwd())
new.setdefault('extra_env', self._inv.connection.get_default_env())
new.setdefault('emulate_tty', True)
new.setdefault('service_context', self._inv.connection.parent)
new.setdefault('service_context', binding.get_child_service_context())
return new
def __repr__(self):
@ -328,7 +330,9 @@ class NewStylePlanner(ScriptPlanner):
def get_module_map(self):
if self._module_map is None:
self._module_map = self._inv.connection.parent.call_service(
binding = self._inv.connection.get_binding()
self._module_map = mitogen.service.call(
call_context=binding.get_service_context(),
service_name='ansible_mitogen.services.ModuleDepService',
method_name='scan',
@ -405,9 +409,12 @@ def get_module_data(name):
def _propagate_deps(invocation, planner, context):
invocation.connection.parent.call_service(
binding = invocation.connection.get_binding()
mitogen.service.call(
call_context=binding.get_service_context(),
service_name='mitogen.service.PushFileService',
method_name='propagate_paths_and_modules',
context=context,
paths=planner.get_push_files(),
modules=planner.get_module_deps(),

@ -81,6 +81,6 @@ class Connection(ansible_mitogen.connection.Connection):
from WorkerProcess, we must emulate that.
"""
return dict_diff(
old=ansible_mitogen.process.MuxProcess.original_env,
old=ansible_mitogen.process.MuxProcess.cls_original_env,
new=os.environ,
)

@ -30,6 +30,7 @@ from __future__ import absolute_import
import atexit
import errno
import logging
import multiprocessing
import os
import signal
import socket
@ -41,9 +42,15 @@ try:
except ImportError:
faulthandler = None
try:
import setproctitle
except ImportError:
setproctitle = None
import mitogen
import mitogen.core
import mitogen.debug
import mitogen.fork
import mitogen.master
import mitogen.parent
import mitogen.service
@ -52,6 +59,7 @@ import mitogen.utils
import ansible
import ansible.constants as C
import ansible.errors
import ansible_mitogen.logging
import ansible_mitogen.services
@ -66,28 +74,55 @@ ANSIBLE_PKG_OVERRIDE = (
u"__author__ = %r\n"
)
worker_model_msg = (
'Mitogen connection types may only be instantiated when one of the '
'"mitogen_*" or "operon_*" strategies are active.'
)
#: The worker model as configured by the currently running strategy. This is
#: managed via :func:`get_worker_model` / :func:`set_worker_model` functions by
#: :class:`StrategyMixin`.
_worker_model = None
#: A copy of the sole :class:`ClassicWorkerModel` that ever exists during a
#: classic run, as return by :func:`get_classic_worker_model`.
_classic_worker_model = None
def set_worker_model(model):
"""
To remove process model-wiring from
:class:`ansible_mitogen.connection.Connection`, it is necessary to track
some idea of the configured execution environment outside the connection
plug-in.
def clean_shutdown(sock):
That is what :func:`set_worker_model` and :func:`get_worker_model` are for.
"""
Shut the write end of `sock`, causing `recv` in the worker process to wake
up with a 0-byte read and initiate mux process exit, then wait for a 0-byte
read from the read end, which will occur after the the child closes the
descriptor on exit.
This is done using :mod:`atexit` since Ansible lacks any more sensible hook
to run code during exit, and unless some synchronization exists with
MuxProcess, debug logs may appear on the user's terminal *after* the prompt
has been printed.
global _worker_model
assert model is None or _worker_model is None
_worker_model = model
def get_worker_model():
"""
try:
sock.shutdown(socket.SHUT_WR)
except socket.error:
# Already closed. This is possible when tests are running.
LOG.debug('clean_shutdown: ignoring duplicate call')
return
Return the :class:`WorkerModel` currently configured by the running
strategy.
"""
if _worker_model is None:
raise ansible.errors.AnsibleConnectionFailure(worker_model_msg)
return _worker_model
sock.recv(1)
sock.close()
def get_classic_worker_model():
"""
Return the single :class:`ClassicWorkerModel` instance, constructing it if
necessary.
"""
global _classic_worker_model
if _classic_worker_model is None:
_classic_worker_model = ClassicWorkerModel()
return _classic_worker_model
def getenv_int(key, default=0):
@ -119,6 +154,330 @@ def save_pid(name):
fp.write(str(os.getpid()))
def setup_pool(pool):
"""
Configure a connection multiplexer's :class:`mitogen.service.Pool` with
services accessed by clients and WorkerProcesses.
"""
pool.add(mitogen.service.FileService(router=pool.router))
pool.add(mitogen.service.PushFileService(router=pool.router))
pool.add(ansible_mitogen.services.ContextService(router=pool.router))
pool.add(ansible_mitogen.services.ModuleDepService(pool.router))
LOG.debug('Service pool configured: size=%d', pool.size)
def _setup_simplejson(responder):
"""
We support serving simplejson for Python 2.4 targets on Ansible 2.3, at
least so the package's own CI Docker scripts can run without external
help, however newer versions of simplejson no longer support Python
2.4. Therefore override any installed/loaded version with a
2.4-compatible version we ship in the compat/ directory.
"""
responder.whitelist_prefix('simplejson')
# issue #536: must be at end of sys.path, in case existing newer
# version is already loaded.
compat_path = os.path.join(os.path.dirname(__file__), 'compat')
sys.path.append(compat_path)
for fullname, is_pkg, suffix in (
(u'simplejson', True, '__init__.py'),
(u'simplejson.decoder', False, 'decoder.py'),
(u'simplejson.encoder', False, 'encoder.py'),
(u'simplejson.scanner', False, 'scanner.py'),
):
path = os.path.join(compat_path, 'simplejson', suffix)
fp = open(path, 'rb')
try:
source = fp.read()
finally:
fp.close()
responder.add_source_override(
fullname=fullname,
path=path,
source=source,
is_pkg=is_pkg,
)
def _setup_responder(responder):
"""
Configure :class:`mitogen.master.ModuleResponder` to only permit
certain packages, and to generate custom responses for certain modules.
"""
responder.whitelist_prefix('ansible')
responder.whitelist_prefix('ansible_mitogen')
_setup_simplejson(responder)
# Ansible 2.3 is compatible with Python 2.4 targets, however
# ansible/__init__.py is not. Instead, executor/module_common.py writes
# out a 2.4-compatible namespace package for unknown reasons. So we
# copy it here.
responder.add_source_override(
fullname='ansible',
path=ansible.__file__,
source=(ANSIBLE_PKG_OVERRIDE % (
ansible.__version__,
ansible.__author__,
)).encode(),
is_pkg=True,
)
def common_setup(_init_logging=True):
save_pid('controller')
ansible_mitogen.logging.set_process_name('top')
ansible_mitogen.affinity.policy.assign_controller()
mitogen.utils.setup_gil()
if _init_logging:
ansible_mitogen.logging.setup()
if faulthandler is not None:
faulthandler.enable()
MuxProcess.profiling = getenv_int('MITOGEN_PROFILING') > 0
if MuxProcess.profiling:
mitogen.core.enable_profiling()
MuxProcess.cls_original_env = dict(os.environ)
def get_cpu_count(default=None):
"""
Get the multiplexer CPU count from the MITOGEN_CPU_COUNT environment
variable, returning `default` if one isn't set, or is out of range.
:param int default:
Default CPU, or :data:`None` to use all available CPUs.
"""
max_cpus = multiprocessing.cpu_count()
if default is None:
default = max_cpus
cpu_count = getenv_int('MITOGEN_CPU_COUNT', default=default)
if cpu_count < 1 or cpu_count > max_cpus:
cpu_count = default
return cpu_count
class Binding(object):
def get_child_service_context(self):
"""
Return the :class:`mitogen.core.Context` to which children should
direct ContextService requests, or :data:`None` for the local process.
"""
raise NotImplementedError()
def get_service_context(self):
"""
Return the :class:`mitogen.core.Context` to which this process should
direct ContextService requests, or :data:`None` for the local process.
"""
raise NotImplementedError()
def close(self):
"""
Finalize any associated resources.
"""
raise NotImplementedError()
class WorkerModel(object):
def on_strategy_start(self):
"""
Called prior to strategy start in the top-level process. Responsible
for preparing any worker/connection multiplexer state.
"""
raise NotImplementedError()
def on_strategy_complete(self):
"""
Called after strategy completion in the top-level process. Must place
Ansible back in a "compatible" state where any other strategy plug-in
may execute.
"""
raise NotImplementedError()
def get_binding(self, inventory_name):
raise NotImplementedError()
class ClassicBinding(Binding):
"""
Only one connection may be active at a time in a classic worker, so its
binding just provides forwarders back to :class:`ClassicWorkerModel`.
"""
def __init__(self, model):
self.model = model
def get_service_context(self):
"""
See Binding.get_service_context().
"""
return self.model.parent
def get_child_service_context(self):
"""
See Binding.get_child_service_context().
"""
return self.model.parent
def close(self):
"""
See Binding.close().
"""
self.model.on_binding_close()
class ClassicWorkerModel(WorkerModel):
#: mitogen.master.Router for this worker.
router = None
#: mitogen.master.Broker for this worker.
broker = None
#: Name of multiplexer process socket we are currently connected to.
listener_path = None
#: mitogen.parent.Context representing the parent Context, which is the
#: connection multiplexer process when running in classic mode, or the
#: top-level process when running a new-style mode.
parent = None
def __init__(self, _init_logging=True):
self._init_logging = _init_logging
self.initialized = False
def _listener_for_name(self, name):
"""
Given a connection stack, return the UNIX listener that should be used
to communicate with it. This is a simple hash of the inventory name.
"""
if len(self._muxes) == 1:
return self._muxes[0].path
idx = abs(hash(name)) % len(self._muxes)
LOG.debug('Picked worker %d: %s', idx, self._muxes[idx].path)
return self._muxes[idx].path
def _reconnect(self, path):
if self.router is not None:
# Router can just be overwritten, but the previous parent
# connection must explicitly be removed from the broker first.
self.router.disconnect(self.parent)
self.parent = None
self.router = None
self.router, self.parent = mitogen.unix.connect(
path=path,
broker=self.broker,
)
self.listener_path = path
def on_process_exit(self, sock):
"""
This is an :mod:`atexit` handler installed in the top-level process.
Shut the write end of `sock`, causing the receive side of the socket in
every worker process to wake up with a 0-byte reads, and causing their
main threads to wake up and initiate shutdown. After shutting the
socket down, wait for a 0-byte read from the read end, which will occur
after the last child closes the descriptor on exit.
This is done using :mod:`atexit` since Ansible lacks any better hook to
run code during exit, and unless some synchronization exists with
MuxProcess, debug logs may appear on the user's terminal *after* the
prompt has been printed.
"""
try:
sock.shutdown(socket.SHUT_WR)
except socket.error:
# Already closed. This is possible when tests are running.
LOG.debug('on_process_exit: ignoring duplicate call')
return
mitogen.core.io_op(sock.recv, 1)
sock.close()
def _initialize(self):
"""
Arrange for classic process model connection multiplexer child
processes to be started, if they are not already running.
The parent process picks a UNIX socket path the child will use prior to
fork, creates a socketpair used essentially as a semaphore, then blocks
waiting for the child to indicate the UNIX socket is ready for use.
:param bool _init_logging:
For testing, if :data:`False`, don't initialize logging.
"""
common_setup(_init_logging=self._init_logging)
MuxProcess.cls_parent_sock, \
MuxProcess.cls_child_sock = socket.socketpair()
mitogen.core.set_cloexec(MuxProcess.cls_parent_sock.fileno())
mitogen.core.set_cloexec(MuxProcess.cls_child_sock.fileno())
self._muxes = [
MuxProcess(index)
for index in range(get_cpu_count(default=1))
]
for mux in self._muxes:
mux.start()
atexit.register(self.on_process_exit, MuxProcess.cls_parent_sock)
MuxProcess.cls_child_sock.close()
MuxProcess.cls_child_sock = None
def on_strategy_start(self):
"""
See WorkerModel.on_strategy_start().
"""
if not self.initialized:
self._initialize()
self.initialized = True
def on_strategy_complete(self):
"""
See WorkerModel.on_strategy_complete().
"""
def get_binding(self, inventory_name):
"""
See WorkerModel.get_binding().
"""
if self.broker is None:
self.broker = mitogen.master.Broker()
path = self._listener_for_name(inventory_name)
if path != self.listener_path:
self._reconnect(path)
return ClassicBinding(self)
def on_binding_close(self):
if self.broker:
self.broker.shutdown()
self.broker.join()
self.router = None
self.broker = None
# #420: Ansible executes "meta" actions in the top-level process,
# meaning "reset_connection" will cause :class:`mitogen.core.Latch`
# FDs to be cached and erroneously shared by children on subsequent
# WorkerProcess forks. To handle that, call on_fork() to ensure any
# shared state is discarded.
# #490: only attempt to clean up when it's known that some
# resources exist to cleanup, otherwise later __del__ double-call
# to close() due to GC at random moment may obliterate an unrelated
# Connection's related resources.
mitogen.fork.on_fork()
class MuxProcess(object):
"""
Implement a subprocess forked from the Ansible top-level, as a safe place
@ -136,30 +495,27 @@ class MuxProcess(object):
See https://bugs.python.org/issue6721 for a thorough description of the
class of problems this worker is intended to avoid.
"""
#: In the top-level process, this references one end of a socketpair(),
#: which the MuxProcess blocks reading from in order to determine when
#: the master process dies. Once the read returns, the MuxProcess will
#: begin shutting itself down.
worker_sock = None
#: In the worker process, this references the other end of
#: :py:attr:`worker_sock`.
child_sock = None
#: whose other end child MuxProcesses block reading from to determine when
#: the master process dies. When the top-level exits abnormally, or
#: normally but where :func:`on_process_exit` has been called, this socket
#: will be closed, causing all the children to wake.
cls_parent_sock = None
#: In the top-level process, this is the PID of the single MuxProcess
#: that was spawned.
worker_pid = None
#: In the mux process, this is the other end of :attr:`cls_parent_sock`.
#: The main thread blocks on a read from it until :attr:`cls_parent_sock`
#: is closed.
cls_child_sock = None
#: A copy of :data:`os.environ` at the time the multiplexer process was
#: started. It's used by mitogen_local.py to find changes made to the
#: top-level environment (e.g. vars plugins -- issue #297) that must be
#: applied to locally executed commands and modules.
original_env = None
cls_original_env = None
#: In both processes, this is the temporary UNIX socket used for
#: forked WorkerProcesses to contact the MuxProcess
unix_listener_path = None
#: In both processes, this a list of the temporary UNIX sockets used for
#: forked WorkerProcesses to contact the forked mux processes.
cls_listener_paths = None
@classmethod
def _reset(cls):
@ -171,69 +527,54 @@ class MuxProcess(object):
cls.worker_sock = None
os.waitpid(cls.worker_pid, 0)
@classmethod
def start(cls, _init_logging=True):
"""
Arrange for the subprocess to be started, if it is not already running.
def __init__(self, index):
self.index = index
#: Individual path of this process.
self.path = mitogen.unix.make_socket_path()
The parent process picks a UNIX socket path the child will use prior to
fork, creates a socketpair used essentially as a semaphore, then blocks
waiting for the child to indicate the UNIX socket is ready for use.
:param bool _init_logging:
For testing, if :data:`False`, don't initialize logging.
"""
if cls.worker_sock is not None:
def start(self):
pid = os.fork()
if pid:
# Wait for child to boot before continuing.
mitogen.core.io_op(MuxProcess.cls_parent_sock.recv, 1)
return
if faulthandler is not None:
faulthandler.enable()
mitogen.utils.setup_gil()
cls.unix_listener_path = mitogen.unix.make_socket_path()
cls.worker_sock, cls.child_sock = socket.socketpair()
atexit.register(clean_shutdown, cls.worker_sock)
mitogen.core.set_cloexec(cls.worker_sock.fileno())
mitogen.core.set_cloexec(cls.child_sock.fileno())
cls.profiling = os.environ.get('MITOGEN_PROFILING') is not None
if cls.profiling:
mitogen.core.enable_profiling()
if _init_logging:
ansible_mitogen.logging.setup()
cls.original_env = dict(os.environ)
cls.worker_pid = os.fork()
if cls.worker_pid:
save_pid('controller')
ansible_mitogen.logging.set_process_name('top')
ansible_mitogen.affinity.policy.assign_controller()
cls.child_sock.close()
cls.child_sock = None
mitogen.core.io_op(cls.worker_sock.recv, 1)
else:
save_pid('mux')
ansible_mitogen.logging.set_process_name('mux')
ansible_mitogen.affinity.policy.assign_muxprocess()
cls.worker_sock.close()
cls.worker_sock = None
self = cls()
self.worker_main()
save_pid('mux')
ansible_mitogen.logging.set_process_name('mux:' + str(self.index))
if setproctitle:
setproctitle.setproctitle('mitogen mux:%s (%s)' % (
self.index,
os.path.basename(self.path),
))
MuxProcess.cls_parent_sock.close()
MuxProcess.cls_parent_sock = None
try:
try:
self.worker_main()
except Exception:
LOG.exception('worker_main() crashed')
finally:
sys.exit()
def worker_main(self):
"""
The main function of for the mux process: setup the Mitogen broker
thread and ansible_mitogen services, then sleep waiting for the socket
The main function of the mux process: setup the Mitogen broker thread
and ansible_mitogen services, then sleep waiting for the socket
connected to the parent to be closed (indicating the parent has died).
"""
save_pid('mux')
ansible_mitogen.logging.set_process_name('mux')
ansible_mitogen.affinity.policy.assign_muxprocess()
self._setup_master()
self._setup_services()
try:
# Let the parent know our listening socket is ready.
mitogen.core.io_op(self.child_sock.send, b('1'))
mitogen.core.io_op(self.cls_child_sock.send, b('1'))
# Block until the socket is closed, which happens on parent exit.
mitogen.core.io_op(self.child_sock.recv, 1)
mitogen.core.io_op(self.cls_child_sock.recv, 1)
finally:
self.broker.shutdown()
self.broker.join()
@ -252,64 +593,6 @@ class MuxProcess(object):
if secs:
mitogen.debug.dump_to_logger(secs=secs)
def _setup_simplejson(self, responder):
"""
We support serving simplejson for Python 2.4 targets on Ansible 2.3, at
least so the package's own CI Docker scripts can run without external
help, however newer versions of simplejson no longer support Python
2.4. Therefore override any installed/loaded version with a
2.4-compatible version we ship in the compat/ directory.
"""
responder.whitelist_prefix('simplejson')
# issue #536: must be at end of sys.path, in case existing newer
# version is already loaded.
compat_path = os.path.join(os.path.dirname(__file__), 'compat')
sys.path.append(compat_path)
for fullname, is_pkg, suffix in (
(u'simplejson', True, '__init__.py'),
(u'simplejson.decoder', False, 'decoder.py'),
(u'simplejson.encoder', False, 'encoder.py'),
(u'simplejson.scanner', False, 'scanner.py'),
):
path = os.path.join(compat_path, 'simplejson', suffix)
fp = open(path, 'rb')
try:
source = fp.read()
finally:
fp.close()
responder.add_source_override(
fullname=fullname,
path=path,
source=source,
is_pkg=is_pkg,
)
def _setup_responder(self, responder):
"""
Configure :class:`mitogen.master.ModuleResponder` to only permit
certain packages, and to generate custom responses for certain modules.
"""
responder.whitelist_prefix('ansible')
responder.whitelist_prefix('ansible_mitogen')
self._setup_simplejson(responder)
# Ansible 2.3 is compatible with Python 2.4 targets, however
# ansible/__init__.py is not. Instead, executor/module_common.py writes
# out a 2.4-compatible namespace package for unknown reasons. So we
# copy it here.
responder.add_source_override(
fullname='ansible',
path=ansible.__file__,
source=(ANSIBLE_PKG_OVERRIDE % (
ansible.__version__,
ansible.__author__,
)).encode(),
is_pkg=True,
)
def _setup_master(self):
"""
Construct a Router, Broker, and mitogen.unix listener
@ -319,12 +602,12 @@ class MuxProcess(object):
broker=self.broker,
max_message_size=4096 * 1048576,
)
self._setup_responder(self.router.responder)
_setup_responder(self.router.responder)
mitogen.core.listen(self.broker, 'shutdown', self.on_broker_shutdown)
mitogen.core.listen(self.broker, 'exit', self.on_broker_exit)
self.listener = mitogen.unix.Listener.build_stream(
router=self.router,
path=self.unix_listener_path,
path=self.path,
backlog=C.DEFAULT_FORKS,
)
self._enable_router_debug()
@ -337,15 +620,9 @@ class MuxProcess(object):
"""
self.pool = mitogen.service.Pool(
router=self.router,
services=[
mitogen.service.FileService(router=self.router),
mitogen.service.PushFileService(router=self.router),
ansible_mitogen.services.ContextService(self.router),
ansible_mitogen.services.ModuleDepService(self.router),
],
size=getenv_int('MITOGEN_POOL_SIZE', default=32),
)
LOG.debug('Service pool configured: size=%d', self.pool.size)
setup_pool(self.pool)
def on_broker_shutdown(self):
"""
@ -364,7 +641,7 @@ class MuxProcess(object):
ourself. In future this should gracefully join the pool, but TERM is
fine for now.
"""
if not self.profiling:
if not os.environ.get('MITOGEN_PROFILING'):
# In normal operation we presently kill the process because there is
# not yet any way to cancel connect(). When profiling, threads
# including the broker must shut down gracefully, otherwise pstats

@ -326,6 +326,7 @@ class ContextService(mitogen.service.Service):
)
def _send_module_forwards(self, context):
return
self.router.responder.forward_modules(context, self.ALWAYS_PRELOAD)
_candidate_temp_dirs = None
@ -372,7 +373,7 @@ class ContextService(mitogen.service.Service):
try:
method = getattr(self.router, spec['method'])
except AttributeError:
raise Error('unsupported method: %(transport)s' % spec)
raise Error('unsupported method: %(method)s' % spec)
context = method(via=via, unidirectional=True, **spec['kwargs'])
if via and spec.get('enable_lru'):
@ -382,6 +383,7 @@ class ContextService(mitogen.service.Service):
mitogen.core.listen(context, 'disconnect',
lambda: self._on_context_disconnect(context))
#self._send_module_forwards(context) TODO
self._send_module_forwards(context)
init_child_result = context.call(
ansible_mitogen.target.init_child,
@ -443,7 +445,7 @@ class ContextService(mitogen.service.Service):
@mitogen.service.arg_spec({
'stack': list
})
def get(self, msg, stack):
def get(self, stack):
"""
Return a Context referring to an established connection with the given
configuration, establishing new connections as necessary.

@ -31,6 +31,11 @@ import os
import signal
import threading
try:
import setproctitle
except ImportError:
setproctitle = None
import mitogen.core
import ansible_mitogen.affinity
import ansible_mitogen.loaders
@ -145,11 +150,17 @@ def wrap_connection_loader__get(name, *args, **kwargs):
return connection_loader__get(name, *args, **kwargs)
def wrap_worker__run(*args, **kwargs):
def wrap_worker__run(self):
"""
While the strategy is active, rewrite connection_loader.get() calls for
some transports into requests for a compatible Mitogen transport.
"""
if setproctitle:
setproctitle.setproctitle('worker:%s task:%s' % (
self._host.name,
self._task.action,
))
# Ignore parent's attempts to murder us when we still need to write
# profiling output.
if mitogen.core._profile_hook.__name__ != '_profile_hook':
@ -158,10 +169,60 @@ def wrap_worker__run(*args, **kwargs):
ansible_mitogen.logging.set_process_name('task')
ansible_mitogen.affinity.policy.assign_worker()
return mitogen.core._profile_hook('WorkerProcess',
lambda: worker__run(*args, **kwargs)
lambda: worker__run(self)
)
class AnsibleWrappers(object):
"""
Manage add/removal of various Ansible runtime hooks.
"""
def _add_plugin_paths(self):
"""
Add the Mitogen plug-in directories to the ModuleLoader path, avoiding
the need for manual configuration.
"""
base_dir = os.path.join(os.path.dirname(__file__), 'plugins')
ansible_mitogen.loaders.connection_loader.add_directory(
os.path.join(base_dir, 'connection')
)
ansible_mitogen.loaders.action_loader.add_directory(
os.path.join(base_dir, 'action')
)
def _install_wrappers(self):
"""
Install our PluginLoader monkey patches and update global variables
with references to the real functions.
"""
global action_loader__get
action_loader__get = ansible_mitogen.loaders.action_loader.get
ansible_mitogen.loaders.action_loader.get = wrap_action_loader__get
global connection_loader__get
connection_loader__get = ansible_mitogen.loaders.connection_loader.get
ansible_mitogen.loaders.connection_loader.get = wrap_connection_loader__get
global worker__run
worker__run = ansible.executor.process.worker.WorkerProcess.run
ansible.executor.process.worker.WorkerProcess.run = wrap_worker__run
def _remove_wrappers(self):
"""
Uninstall the PluginLoader monkey patches.
"""
ansible_mitogen.loaders.action_loader.get = action_loader__get
ansible_mitogen.loaders.connection_loader.get = connection_loader__get
ansible.executor.process.worker.WorkerProcess.run = worker__run
def install(self):
self._add_plugin_paths()
self._install_wrappers()
def remove(self):
self._remove_wrappers()
class StrategyMixin(object):
"""
This mix-in enhances any built-in strategy by arranging for various Mitogen
@ -223,43 +284,6 @@ class StrategyMixin(object):
remote process, all the heavy lifting of transferring the action module
and its dependencies are automatically handled by Mitogen.
"""
def _install_wrappers(self):
"""
Install our PluginLoader monkey patches and update global variables
with references to the real functions.
"""
global action_loader__get
action_loader__get = ansible_mitogen.loaders.action_loader.get
ansible_mitogen.loaders.action_loader.get = wrap_action_loader__get
global connection_loader__get
connection_loader__get = ansible_mitogen.loaders.connection_loader.get
ansible_mitogen.loaders.connection_loader.get = wrap_connection_loader__get
global worker__run
worker__run = ansible.executor.process.worker.WorkerProcess.run
ansible.executor.process.worker.WorkerProcess.run = wrap_worker__run
def _remove_wrappers(self):
"""
Uninstall the PluginLoader monkey patches.
"""
ansible_mitogen.loaders.action_loader.get = action_loader__get
ansible_mitogen.loaders.connection_loader.get = connection_loader__get
ansible.executor.process.worker.WorkerProcess.run = worker__run
def _add_plugin_paths(self):
"""
Add the Mitogen plug-in directories to the ModuleLoader path, avoiding
the need for manual configuration.
"""
base_dir = os.path.join(os.path.dirname(__file__), 'plugins')
ansible_mitogen.loaders.connection_loader.add_directory(
os.path.join(base_dir, 'connection')
)
ansible_mitogen.loaders.action_loader.add_directory(
os.path.join(base_dir, 'action')
)
def _queue_task(self, host, task, task_vars, play_context):
"""
@ -290,20 +314,35 @@ class StrategyMixin(object):
play_context=play_context,
)
def _get_worker_model(self):
"""
In classic mode a single :class:`WorkerModel` exists, which manages
references and configuration of the associated connection multiplexer
process.
"""
return ansible_mitogen.process.get_classic_worker_model()
def run(self, iterator, play_context, result=0):
"""
Arrange for a mitogen.master.Router to be available for the duration of
the strategy's real run() method.
Wrap :meth:`run` to ensure requisite infrastructure and modifications
are configured for the duration of the call.
"""
_assert_supported_release()
ansible_mitogen.process.MuxProcess.start()
run = super(StrategyMixin, self).run
self._add_plugin_paths()
self._install_wrappers()
wrappers = AnsibleWrappers()
self._worker_model = self._get_worker_model()
ansible_mitogen.process.set_worker_model(self._worker_model)
try:
return mitogen.core._profile_hook('Strategy',
lambda: run(iterator, play_context)
)
self._worker_model.on_strategy_start()
try:
wrappers.install()
try:
run = super(StrategyMixin, self).run
return mitogen.core._profile_hook('Strategy',
lambda: run(iterator, play_context)
)
finally:
wrappers.remove()
finally:
self._worker_model.on_strategy_complete()
finally:
self._remove_wrappers()
ansible_mitogen.process.set_worker_model(None)

@ -901,7 +901,11 @@ class Message(object):
unpickler.find_global = self._find_global
try:
# Must occur off the broker thread.
obj = unpickler.load()
try:
obj = unpickler.load()
except:
LOG.error('raw pickle was: %r', self.data)
raise
self._unpickled = obj
except (TypeError, ValueError):
e = sys.exc_info()[1]

@ -2159,6 +2159,23 @@ class Router(mitogen.core.Router):
finally:
self._write_lock.release()
def disconnect(self, context):
"""
Disconnect a context and forget its stream, assuming the context is
directly connected.
"""
stream = self.stream_by_id(context)
if stream.remote_id != context.context_id:
return
l = mitogen.core.Latch()
mitogen.core.listen(stream, 'disconnect', l.put)
def disconnect():
LOG.debug('Starting disconnect of %r', stream)
stream.on_disconnect(self.broker)
self.broker.defer(disconnect)
l.get()
def add_route(self, target_id, stream):
"""
Arrange for messages whose `dst_id` is `target_id` to be forwarded on

@ -92,6 +92,24 @@ def get_or_create_pool(size=None, router=None):
_pool_lock.release()
def call(service_name, method_name, call_context=None, **kwargs):
"""
Call a service registered with this pool, using the calling thread as a
host.
"""
if isinstance(service_name, mitogen.core.BytesType):
service_name = service_name.encode('utf-8')
elif not isinstance(service_name, mitogen.core.UnicodeType):
service_name = service_name.name() # Service.name()
if call_context:
return call_context.call_service(service_name, method_name, **kwargs)
else:
pool = get_or_create_pool()
invoker = pool.get_invoker(service_name, msg=None)
return getattr(invoker.service, method_name)(**kwargs)
def validate_arg_spec(spec, args):
for name in spec:
try:
@ -239,12 +257,13 @@ class Invoker(object):
if not policies:
raise mitogen.core.CallError('Method has no policies set.')
if not all(p.is_authorized(self.service, msg) for p in policies):
raise mitogen.core.CallError(
self.unauthorized_msg,
method_name,
self.service.name()
)
if msg is not None:
if not all(p.is_authorized(self.service, msg) for p in policies):
raise mitogen.core.CallError(
self.unauthorized_msg,
method_name,
self.service.name()
)
required = getattr(method, 'mitogen_service__arg_spec', {})
validate_arg_spec(required, kwargs)
@ -264,7 +283,7 @@ class Invoker(object):
except Exception:
if no_reply:
LOG.exception('While calling no-reply method %s.%s',
type(self.service).__name__,
self.service.name(),
func_name(method))
else:
raise
@ -690,7 +709,7 @@ class PushFileService(Service):
"""
for path in paths:
self.propagate_to(context, mitogen.core.to_text(path))
self.router.responder.forward_modules(context, modules)
#self.router.responder.forward_modules(context, modules) TODO
@expose(policy=AllowParents())
@arg_spec({

@ -51,7 +51,11 @@ else:
os.path.join(GIT_BASEDIR, 'tests/ansible/hosts')
)
args = ['ansible-playbook']
if 'ANSIBLE_ARGV' in os.environ:
args = eval(os.environ['ANSIBLE_ARGV'])
else:
args = ['ansible-playbook']
args += ['-e', json.dumps(extra)]
args += sys.argv[1:]
os.execvp(args[0], args)

Loading…
Cancel
Save