Merge remote-tracking branch 'origin/linear2'

* origin/linear2:
  [linear2] fix another test relying on Connection.parent
  [linear2] more merge fallout, fix Connection._mitogen_reset(mode=)
  [linear2] update mitogen_get_stack for new _build_stack() return value
  [linear2] fix mitogen_shutdown_all service context access
  docs: changelog concision / additions
  add 363 to changelog
  docs: update Changelog
  docs: note fd usage has halved
  docs: more stream-refactor work
  docs: update Changelog for stream-refactor.
  docs: Add lineinfile bug to changelog.
  [linear2] fix MuxProcess test fixture and some merge fallout
  service: avoid taking another lock in the usual case
  service: don't acquire lock when pool already initialized
  profiler: marginal improvements
  core: ensure 'exit' signal fires even on Broker crash.
  core: wake Waker outside of lock.
  core: wake Latch outside of lock.
  core: remove old blocking call guard, it's in the wrong place
  Make setting affinity optional.
  ansible: abstract worker process model.
  [stream-refactor] parent: fix crash on graceful shutdown
  parent: tidy up create_socketpair()
  core: more concise Side.repr.
pull/607/head
David Wilson 5 years ago
commit 83b33a8fb1

@ -38,6 +38,7 @@ import sys
import time
import jinja2.runtime
from ansible.module_utils import six
import ansible.constants as C
import ansible.errors
import ansible.plugins.connection
@ -459,15 +460,10 @@ class CallChain(mitogen.parent.CallChain):
class Connection(ansible.plugins.connection.ConnectionBase):
#: mitogen.master.Broker for this worker.
broker = None
#: mitogen.master.Router for this worker.
router = None
#: mitogen.parent.Context representing the parent Context, which is
#: presently always the connection multiplexer process.
parent = None
#: The :class:`ansible_mitogen.process.Binding` representing the connection
#: multiplexer this connection's target is assigned to. :data:`None` when
#: disconnected.
binding = None
#: mitogen.parent.Context for the target account on the target, possibly
#: reached via become.
@ -518,13 +514,6 @@ class Connection(ansible.plugins.connection.ConnectionBase):
#: matching vanilla Ansible behaviour.
loader_basedir = None
def __init__(self, play_context, new_stdin, **kwargs):
assert ansible_mitogen.process.MuxProcess.unix_listener_path, (
'Mitogen connection types may only be instantiated '
'while the "mitogen" strategy is active.'
)
super(Connection, self).__init__(play_context, new_stdin)
def __del__(self):
"""
Ansible cannot be trusted to always call close() e.g. the synchronize
@ -585,6 +574,15 @@ class Connection(ansible.plugins.connection.ConnectionBase):
self._connect()
return self.init_child_result['home_dir']
def get_binding(self):
"""
Return the :class:`ansible_mitogen.process.Binding` representing the
process that hosts the physical connection and services (context
establishment, file transfer, ..) for our desired target.
"""
assert self.binding is not None
return self.binding
@property
def connected(self):
return self.context is not None
@ -672,18 +670,6 @@ class Connection(ansible.plugins.connection.ConnectionBase):
return stack
def _connect_broker(self):
"""
Establish a reference to the Broker, Router and parent context used for
connections.
"""
if not self.broker:
self.broker = mitogen.master.Broker()
self.router, self.parent = mitogen.unix.connect(
path=ansible_mitogen.process.MuxProcess.unix_listener_path,
broker=self.broker,
)
def _build_stack(self):
"""
Construct a list of dictionaries representing the connection
@ -691,14 +677,14 @@ class Connection(ansible.plugins.connection.ConnectionBase):
additionally used by the integration tests "mitogen_get_stack" action
to fetch the would-be connection configuration.
"""
return self._stack_from_spec(
ansible_mitogen.transport_config.PlayContextSpec(
spec = ansible_mitogen.transport_config.PlayContextSpec(
connection=self,
play_context=self._play_context,
transport=self.transport,
inventory_name=self.inventory_hostname,
)
)
stack = self._stack_from_spec(spec)
return spec.inventory_name(), stack
def _connect_stack(self, stack):
"""
@ -711,7 +697,8 @@ class Connection(ansible.plugins.connection.ConnectionBase):
description of the returned dictionary.
"""
try:
dct = self.parent.call_service(
dct = mitogen.service.call(
call_context=self.binding.get_service_context(),
service_name='ansible_mitogen.services.ContextService',
method_name='get',
stack=mitogen.utils.cast(list(stack)),
@ -758,8 +745,9 @@ class Connection(ansible.plugins.connection.ConnectionBase):
if self.connected:
return
self._connect_broker()
stack = self._build_stack()
inventory_name, stack = self._build_stack()
worker_model = ansible_mitogen.process.get_worker_model()
self.binding = worker_model.get_binding(inventory_name)
self._connect_stack(stack)
def _mitogen_reset(self, mode):
@ -776,7 +764,8 @@ class Connection(ansible.plugins.connection.ConnectionBase):
return
self.chain.reset()
self.parent.call_service(
mitogen.service.call(
call_context=self.binding.get_service_context(),
service_name='ansible_mitogen.services.ContextService',
method_name=mode,
context=self.context
@ -787,27 +776,6 @@ class Connection(ansible.plugins.connection.ConnectionBase):
self.init_child_result = None
self.chain = None
def _shutdown_broker(self):
"""
Shutdown the broker thread during :meth:`close` or :meth:`reset`.
"""
if self.broker:
self.broker.shutdown()
self.broker.join()
self.broker = None
self.router = None
# #420: Ansible executes "meta" actions in the top-level process,
# meaning "reset_connection" will cause :class:`mitogen.core.Latch`
# FDs to be cached and erroneously shared by children on subsequent
# WorkerProcess forks. To handle that, call on_fork() to ensure any
# shared state is discarded.
# #490: only attempt to clean up when it's known that some
# resources exist to clean up, otherwise later __del__ double-call
# to close() due to GC at random moment may obliterate an unrelated
# Connection's resources.
mitogen.fork.on_fork()
def close(self):
"""
Arrange for the mitogen.master.Router running in the worker to
@ -815,7 +783,9 @@ class Connection(ansible.plugins.connection.ConnectionBase):
multiple times.
"""
self._mitogen_reset(mode='put')
self._shutdown_broker()
if self.binding:
self.binding.close()
self.binding = None
def _reset_find_task_vars(self):
"""
@ -853,7 +823,8 @@ class Connection(ansible.plugins.connection.ConnectionBase):
self._connect()
self._mitogen_reset(mode='reset')
self._shutdown_broker()
self.binding.close()
self.binding = None
# Compatibility with Ansible 2.4 wait_for_connection plug-in.
_reset = reset
@ -1024,7 +995,8 @@ class Connection(ansible.plugins.connection.ConnectionBase):
utimes=(st.st_atime, st.st_mtime))
self._connect()
self.parent.call_service(
mitogen.service.call(
call_context=self.binding.get_service_context(),
service_name='mitogen.service.FileService',
method_name='register',
path=mitogen.utils.cast(in_path)
@ -1036,7 +1008,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
# file alive, but that requires more work.
self.get_chain().call(
ansible_mitogen.target.transfer_file,
context=self.parent,
context=self.binding.get_child_service_context(),
in_path=in_path,
out_path=out_path
)

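The recurring change in this file replaces direct use of Connection.parent with a Binding obtained from the active worker model, so the connection plug-in no longer owns its own Broker and Router. A minimal sketch of the new calling convention, condensed from the hunks above (example_call is a hypothetical helper, not part of the diff):

    import mitogen.service

    def example_call(connection, method_name, **kwargs):
        # Resolve the multiplexer assigned to this target, then route
        # the ContextService request through its service context.
        binding = connection.get_binding()
        return mitogen.service.call(
            call_context=binding.get_service_context(),
            service_name='ansible_mitogen.services.ContextService',
            method_name=method_name,
            **kwargs
        )
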
@ -148,6 +148,8 @@ class Planner(object):
# named by `runner_name`.
}
"""
binding = self._inv.connection.get_binding()
new = dict((mitogen.core.UnicodeType(k), kwargs[k])
for k in kwargs)
new.setdefault('good_temp_dir',
@ -155,7 +157,7 @@ class Planner(object):
new.setdefault('cwd', self._inv.connection.get_default_cwd())
new.setdefault('extra_env', self._inv.connection.get_default_env())
new.setdefault('emulate_tty', True)
new.setdefault('service_context', self._inv.connection.parent)
new.setdefault('service_context', binding.get_child_service_context())
return new
def __repr__(self):
@ -328,7 +330,9 @@ class NewStylePlanner(ScriptPlanner):
def get_module_map(self):
if self._module_map is None:
self._module_map = self._inv.connection.parent.call_service(
binding = self._inv.connection.get_binding()
self._module_map = mitogen.service.call(
call_context=binding.get_service_context(),
service_name='ansible_mitogen.services.ModuleDepService',
method_name='scan',
@ -405,9 +409,12 @@ def get_module_data(name):
def _propagate_deps(invocation, planner, context):
invocation.connection.parent.call_service(
binding = invocation.connection.get_binding()
mitogen.service.call(
call_context=binding.get_service_context(),
service_name='mitogen.service.PushFileService',
method_name='propagate_paths_and_modules',
context=context,
paths=planner.get_push_files(),
modules=planner.get_module_deps(),

@ -47,8 +47,9 @@ class ActionModule(ActionBase):
'skipped': True,
}
_, stack = self._connection._build_stack()
return {
'changed': True,
'result': self._connection._build_stack(),
'result': stack,
'_ansible_verbose_always': True,
}

@ -81,6 +81,6 @@ class Connection(ansible_mitogen.connection.Connection):
from WorkerProcess, we must emulate that.
"""
return dict_diff(
old=ansible_mitogen.process.MuxProcess.original_env,
old=ansible_mitogen.process.MuxProcess.cls_original_env,
new=os.environ,
)

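For context, dict_diff computes the environment delta that mitogen_local replays for locally executed commands and modules. A hedged sketch of the simplest semantics fitting this call site, keys added or changed relative to the snapshot (the real helper lives in ansible_mitogen.connection and may differ):

    def dict_diff(old, new):
        # Hypothetical equivalent: keys added or changed in `new`,
        # e.g. os.environ mutations made by vars plugins (issue #297).
        return dict(
            (k, v)
            for k, v in new.items()
            if k not in old or old[k] != v
        )
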
@ -30,6 +30,7 @@ from __future__ import absolute_import
import atexit
import errno
import logging
import multiprocessing
import os
import signal
import socket
@ -41,9 +42,15 @@ try:
except ImportError:
faulthandler = None
try:
import setproctitle
except ImportError:
setproctitle = None
import mitogen
import mitogen.core
import mitogen.debug
import mitogen.fork
import mitogen.master
import mitogen.parent
import mitogen.service
@ -52,6 +59,7 @@ import mitogen.utils
import ansible
import ansible.constants as C
import ansible.errors
import ansible_mitogen.logging
import ansible_mitogen.services
@ -66,28 +74,58 @@ ANSIBLE_PKG_OVERRIDE = (
u"__author__ = %r\n"
)
worker_model_msg = (
'Mitogen connection types may only be instantiated when one of the '
'"mitogen_*" or "operon_*" strategies are active.'
)
#: The worker model as configured by the currently running strategy. This is
#: managed via :func:`get_worker_model` / :func:`set_worker_model` functions by
#: :class:`StrategyMixin`.
_worker_model = None
def clean_shutdown(sock):
#: A copy of the sole :class:`ClassicWorkerModel` that ever exists during a
#: classic run, as returned by :func:`get_classic_worker_model`.
_classic_worker_model = None
def set_worker_model(model):
"""
Shut the write end of `sock`, causing `recv` in the worker process to wake
up with a 0-byte read and initiate mux process exit, then wait for a 0-byte
read from the read end, which will occur after the child closes the
descriptor on exit.
To remove process model-wiring from
:class:`ansible_mitogen.connection.Connection`, it is necessary to track
some idea of the configured execution environment outside the connection
plug-in.
This is done using :mod:`atexit` since Ansible lacks any more sensible hook
to run code during exit, and unless some synchronization exists with
MuxProcess, debug logs may appear on the user's terminal *after* the prompt
has been printed.
That is what :func:`set_worker_model` and :func:`get_worker_model` are for.
"""
try:
sock.shutdown(socket.SHUT_WR)
except socket.error:
# Already closed. This is possible when tests are running.
LOG.debug('clean_shutdown: ignoring duplicate call')
return
global _worker_model
assert model is None or _worker_model is None
_worker_model = model
sock.recv(1)
sock.close()
def get_worker_model():
"""
Return the :class:`WorkerModel` currently configured by the running
strategy.
"""
if _worker_model is None:
raise ansible.errors.AnsibleConnectionFailure(worker_model_msg)
return _worker_model
def get_classic_worker_model(**kwargs):
"""
Return the single :class:`ClassicWorkerModel` instance, constructing it if
necessary.
"""
global _classic_worker_model
assert _classic_worker_model is None or (not kwargs), \
"ClassicWorkerModel kwargs supplied but model already constructed"
if _classic_worker_model is None:
_classic_worker_model = ClassicWorkerModel(**kwargs)
return _classic_worker_model
def getenv_int(key, default=0):
@ -119,140 +157,19 @@ def save_pid(name):
fp.write(str(os.getpid()))
class MuxProcess(object):
"""
Implement a subprocess forked from the Ansible top-level, as a safe place
to contain the Mitogen IO multiplexer thread, keeping its use of the
logging package (and the logging package's heavy use of locks) far away
from the clutches of os.fork(), which is used continuously by the
multiprocessing package in the top-level process.
The problem with running the multiplexer in that process is that should the
multiplexer thread be in the process of emitting a log entry (and holding
its lock) at the point of fork, in the child, the first attempt to log any
log entry using the same handler will deadlock the child, as in the memory
image the child received, the lock will always be marked held.
See https://bugs.python.org/issue6721 for a thorough description of the
class of problems this worker is intended to avoid.
"""
#: In the top-level process, this references one end of a socketpair(),
#: which the MuxProcess blocks reading from in order to determine when
#: the master process dies. Once the read returns, the MuxProcess will
#: begin shutting itself down.
worker_sock = None
#: In the worker process, this references the other end of
#: :py:attr:`worker_sock`.
child_sock = None
#: In the top-level process, this is the PID of the single MuxProcess
#: that was spawned.
worker_pid = None
#: A copy of :data:`os.environ` at the time the multiplexer process was
#: started. It's used by mitogen_local.py to find changes made to the
#: top-level environment (e.g. vars plugins -- issue #297) that must be
#: applied to locally executed commands and modules.
original_env = None
#: In both processes, this is the temporary UNIX socket used for
#: forked WorkerProcesses to contact the MuxProcess
unix_listener_path = None
@classmethod
def _reset(cls):
def setup_pool(pool):
"""
Used to clean up in unit tests.
Configure a connection multiplexer's :class:`mitogen.service.Pool` with
services accessed by clients and WorkerProcesses.
"""
assert cls.worker_sock is not None
cls.worker_sock.close()
cls.worker_sock = None
os.waitpid(cls.worker_pid, 0)
pool.add(mitogen.service.FileService(router=pool.router))
pool.add(mitogen.service.PushFileService(router=pool.router))
pool.add(ansible_mitogen.services.ContextService(router=pool.router))
pool.add(ansible_mitogen.services.ModuleDepService(pool.router))
LOG.debug('Service pool configured: size=%d', pool.size)
@classmethod
def start(cls, _init_logging=True):
"""
Arrange for the subprocess to be started, if it is not already running.
The parent process picks a UNIX socket path the child will use prior to
fork, creates a socketpair used essentially as a semaphore, then blocks
waiting for the child to indicate the UNIX socket is ready for use.
:param bool _init_logging:
For testing, if :data:`False`, don't initialize logging.
"""
if cls.worker_sock is not None:
return
if faulthandler is not None:
faulthandler.enable()
mitogen.utils.setup_gil()
cls.unix_listener_path = mitogen.unix.make_socket_path()
cls.worker_sock, cls.child_sock = socket.socketpair()
atexit.register(clean_shutdown, cls.worker_sock)
mitogen.core.set_cloexec(cls.worker_sock.fileno())
mitogen.core.set_cloexec(cls.child_sock.fileno())
cls.profiling = os.environ.get('MITOGEN_PROFILING') is not None
if cls.profiling:
mitogen.core.enable_profiling()
if _init_logging:
ansible_mitogen.logging.setup()
cls.original_env = dict(os.environ)
cls.worker_pid = os.fork()
if cls.worker_pid:
save_pid('controller')
ansible_mitogen.logging.set_process_name('top')
ansible_mitogen.affinity.policy.assign_controller()
cls.child_sock.close()
cls.child_sock = None
mitogen.core.io_op(cls.worker_sock.recv, 1)
else:
save_pid('mux')
ansible_mitogen.logging.set_process_name('mux')
ansible_mitogen.affinity.policy.assign_muxprocess()
cls.worker_sock.close()
cls.worker_sock = None
self = cls()
self.worker_main()
def worker_main(self):
"""
The main function of the mux process: set up the Mitogen broker
thread and ansible_mitogen services, then sleep waiting for the socket
connected to the parent to be closed (indicating the parent has died).
"""
self._setup_master()
self._setup_services()
try:
# Let the parent know our listening socket is ready.
mitogen.core.io_op(self.child_sock.send, b('1'))
# Block until the socket is closed, which happens on parent exit.
mitogen.core.io_op(self.child_sock.recv, 1)
finally:
self.broker.shutdown()
self.broker.join()
# Test frameworks living somewhere higher on the stack of the
# original parent process may try to catch sys.exit(), so do a C
# level exit instead.
os._exit(0)
def _enable_router_debug(self):
if 'MITOGEN_ROUTER_DEBUG' in os.environ:
self.router.enable_debug()
def _enable_stack_dumps(self):
secs = getenv_int('MITOGEN_DUMP_THREAD_STACKS', default=0)
if secs:
mitogen.debug.dump_to_logger(secs=secs)
def _setup_simplejson(self, responder):
def _setup_simplejson(responder):
"""
We support serving simplejson for Python 2.4 targets on Ansible 2.3, at
least so the package's own CI Docker scripts can run without external
@ -287,14 +204,15 @@ class MuxProcess(object):
is_pkg=is_pkg,
)
def _setup_responder(self, responder):
def _setup_responder(responder):
"""
Configure :class:`mitogen.master.ModuleResponder` to only permit
certain packages, and to generate custom responses for certain modules.
"""
responder.whitelist_prefix('ansible')
responder.whitelist_prefix('ansible_mitogen')
self._setup_simplejson(responder)
_setup_simplejson(responder)
# Ansible 2.3 is compatible with Python 2.4 targets, however
# ansible/__init__.py is not. Instead, executor/module_common.py writes
@ -310,6 +228,387 @@ class MuxProcess(object):
is_pkg=True,
)
def common_setup(enable_affinity=True, _init_logging=True):
save_pid('controller')
ansible_mitogen.logging.set_process_name('top')
if enable_affinity:
ansible_mitogen.affinity.policy.assign_controller()
mitogen.utils.setup_gil()
if _init_logging:
ansible_mitogen.logging.setup()
if faulthandler is not None:
faulthandler.enable()
MuxProcess.profiling = getenv_int('MITOGEN_PROFILING') > 0
if MuxProcess.profiling:
mitogen.core.enable_profiling()
MuxProcess.cls_original_env = dict(os.environ)
def get_cpu_count(default=None):
"""
Get the multiplexer CPU count from the MITOGEN_CPU_COUNT environment
variable, returning `default` if one isn't set, or is out of range.
:param int default:
Default CPU count, or :data:`None` to use all available CPUs.
"""
max_cpus = multiprocessing.cpu_count()
if default is None:
default = max_cpus
cpu_count = getenv_int('MITOGEN_CPU_COUNT', default=default)
if cpu_count < 1 or cpu_count > max_cpus:
cpu_count = default
return cpu_count
class Binding(object):
def get_child_service_context(self):
"""
Return the :class:`mitogen.core.Context` to which children should
direct ContextService requests, or :data:`None` for the local process.
"""
raise NotImplementedError()
def get_service_context(self):
"""
Return the :class:`mitogen.core.Context` to which this process should
direct ContextService requests, or :data:`None` for the local process.
"""
raise NotImplementedError()
def close(self):
"""
Finalize any associated resources.
"""
raise NotImplementedError()
class WorkerModel(object):
def on_strategy_start(self):
"""
Called prior to strategy start in the top-level process. Responsible
for preparing any worker/connection multiplexer state.
"""
raise NotImplementedError()
def on_strategy_complete(self):
"""
Called after strategy completion in the top-level process. Must place
Ansible back in a "compatible" state where any other strategy plug-in
may execute.
"""
raise NotImplementedError()
def get_binding(self, inventory_name):
raise NotImplementedError()
class ClassicBinding(Binding):
"""
Only one connection may be active at a time in a classic worker, so its
binding just provides forwarders back to :class:`ClassicWorkerModel`.
"""
def __init__(self, model):
self.model = model
def get_service_context(self):
"""
See Binding.get_service_context().
"""
return self.model.parent
def get_child_service_context(self):
"""
See Binding.get_child_service_context().
"""
return self.model.parent
def close(self):
"""
See Binding.close().
"""
self.model.on_binding_close()
class ClassicWorkerModel(WorkerModel):
#: mitogen.master.Router for this worker.
router = None
#: mitogen.master.Broker for this worker.
broker = None
#: Name of multiplexer process socket we are currently connected to.
listener_path = None
#: mitogen.parent.Context representing the parent Context, which is the
#: connection multiplexer process when running in classic mode, or the
#: top-level process when running in a new-style mode.
parent = None
def __init__(self, _init_logging=True):
self._init_logging = _init_logging
self.initialized = False
def _listener_for_name(self, name):
"""
Given an inventory name, return the UNIX listener that should be used
to communicate with its multiplexer. The choice is a simple hash of the
inventory name.
"""
if len(self._muxes) == 1:
return self._muxes[0].path
idx = abs(hash(name)) % len(self._muxes)
LOG.debug('Picked worker %d: %s', idx, self._muxes[idx].path)
return self._muxes[idx].path
def _reconnect(self, path):
if self.router is not None:
# Router can just be overwritten, but the previous parent
# connection must explicitly be removed from the broker first.
self.router.disconnect(self.parent)
self.parent = None
self.router = None
self.router, self.parent = mitogen.unix.connect(
path=path,
broker=self.broker,
)
self.listener_path = path
def on_process_exit(self, sock):
"""
This is an :mod:`atexit` handler installed in the top-level process.
Shut the write end of `sock`, causing the receive side of the socket in
every worker process to wake up with a 0-byte read, causing their
main threads to wake up and initiate shutdown. After shutting the
socket down, wait for a 0-byte read from the read end, which will occur
after the last child closes the descriptor on exit.
This is done using :mod:`atexit` since Ansible lacks any better hook to
run code during exit, and unless some synchronization exists with
MuxProcess, debug logs may appear on the user's terminal *after* the
prompt has been printed.
"""
try:
sock.shutdown(socket.SHUT_WR)
except socket.error:
# Already closed. This is possible when tests are running.
LOG.debug('on_process_exit: ignoring duplicate call')
return
mitogen.core.io_op(sock.recv, 1)
sock.close()
def _initialize(self):
"""
Arrange for classic process model connection multiplexer child
processes to be started, if they are not already running.
The parent process picks a UNIX socket path the child will use prior to
fork, creates a socketpair used essentially as a semaphore, then blocks
waiting for the child to indicate the UNIX socket is ready for use.
:param bool _init_logging:
For testing, if :data:`False`, don't initialize logging.
"""
common_setup(_init_logging=self._init_logging)
MuxProcess.cls_parent_sock, \
MuxProcess.cls_child_sock = socket.socketpair()
mitogen.core.set_cloexec(MuxProcess.cls_parent_sock.fileno())
mitogen.core.set_cloexec(MuxProcess.cls_child_sock.fileno())
self._muxes = [
MuxProcess(index)
for index in range(get_cpu_count(default=1))
]
for mux in self._muxes:
mux.start()
atexit.register(self.on_process_exit, MuxProcess.cls_parent_sock)
MuxProcess.cls_child_sock.close()
MuxProcess.cls_child_sock = None
def _test_reset(self):
"""
Used to clean up in unit tests.
"""
# TODO: split this up a bit.
global _classic_worker_model
assert MuxProcess.cls_parent_sock is not None
MuxProcess.cls_parent_sock.close()
MuxProcess.cls_parent_sock = None
self.listener_path = None
self.router = None
self.parent = None
for mux in self._muxes:
pid, status = os.waitpid(mux.pid, 0)
status = mitogen.fork._convert_exit_status(status)
LOG.debug('mux PID %d %s', pid,
mitogen.parent.returncode_to_str(status))
_classic_worker_model = None
set_worker_model(None)
def on_strategy_start(self):
"""
See WorkerModel.on_strategy_start().
"""
if not self.initialized:
self._initialize()
self.initialized = True
def on_strategy_complete(self):
"""
See WorkerModel.on_strategy_complete().
"""
def get_binding(self, inventory_name):
"""
See WorkerModel.get_binding().
"""
if self.broker is None:
self.broker = mitogen.master.Broker()
path = self._listener_for_name(inventory_name)
if path != self.listener_path:
self._reconnect(path)
return ClassicBinding(self)
def on_binding_close(self):
if not self.broker:
return
self.broker.shutdown()
self.broker.join()
self.router = None
self.broker = None
self.listener_path = None
self.initialized = False
# #420: Ansible executes "meta" actions in the top-level process,
# meaning "reset_connection" will cause :class:`mitogen.core.Latch` FDs
# to be cached and erroneously shared by children on subsequent
# WorkerProcess forks. To handle that, call on_fork() to ensure any
# shared state is discarded.
# #490: only attempt to clean up when it's known that some resources
# exist to clean up, otherwise later __del__ double-call to close() due
# to GC at random moment may obliterate an unrelated Connection's
# related resources.
mitogen.fork.on_fork()
class MuxProcess(object):
"""
Implement a subprocess forked from the Ansible top-level, as a safe place
to contain the Mitogen IO multiplexer thread, keeping its use of the
logging package (and the logging package's heavy use of locks) far away
from the clutches of os.fork(), which is used continuously by the
multiprocessing package in the top-level process.
The problem with running the multiplexer in that process is that should the
multiplexer thread be in the process of emitting a log entry (and holding
its lock) at the point of fork, in the child, the first attempt to log any
log entry using the same handler will deadlock the child, as in the memory
image the child received, the lock will always be marked held.
See https://bugs.python.org/issue6721 for a thorough description of the
class of problems this worker is intended to avoid.
"""
#: In the top-level process, this references one end of a socketpair(),
#: whose other end child MuxProcesses block reading from to determine when
#: the master process dies. When the top-level process exits abnormally,
#: or exits normally after :func:`on_process_exit` has been called, this socket
#: will be closed, causing all the children to wake.
cls_parent_sock = None
#: In the mux process, this is the other end of :attr:`cls_parent_sock`.
#: The main thread blocks on a read from it until :attr:`cls_parent_sock`
#: is closed.
cls_child_sock = None
#: A copy of :data:`os.environ` at the time the multiplexer process was
#: started. It's used by mitogen_local.py to find changes made to the
#: top-level environment (e.g. vars plugins -- issue #297) that must be
#: applied to locally executed commands and modules.
cls_original_env = None
def __init__(self, index):
self.index = index
#: Individual path of this process.
self.path = mitogen.unix.make_socket_path()
def start(self):
self.pid = os.fork()
if self.pid:
# Wait for child to boot before continuing.
mitogen.core.io_op(MuxProcess.cls_parent_sock.recv, 1)
return
save_pid('mux')
ansible_mitogen.logging.set_process_name('mux:' + str(self.index))
if setproctitle:
setproctitle.setproctitle('mitogen mux:%s (%s)' % (
self.index,
os.path.basename(self.path),
))
MuxProcess.cls_parent_sock.close()
MuxProcess.cls_parent_sock = None
try:
try:
self.worker_main()
except Exception:
LOG.exception('worker_main() crashed')
finally:
sys.exit()
def worker_main(self):
"""
The main function of the mux process: set up the Mitogen broker thread
and ansible_mitogen services, then sleep waiting for the socket
connected to the parent to be closed (indicating the parent has died).
"""
save_pid('mux')
ansible_mitogen.logging.set_process_name('mux')
ansible_mitogen.affinity.policy.assign_muxprocess()
self._setup_master()
self._setup_services()
try:
# Let the parent know our listening socket is ready.
mitogen.core.io_op(self.cls_child_sock.send, b('1'))
# Block until the socket is closed, which happens on parent exit.
mitogen.core.io_op(self.cls_child_sock.recv, 1)
finally:
self.broker.shutdown()
self.broker.join()
# Test frameworks living somewhere higher on the stack of the
# original parent process may try to catch sys.exit(), so do a C
# level exit instead.
os._exit(0)
def _enable_router_debug(self):
if 'MITOGEN_ROUTER_DEBUG' in os.environ:
self.router.enable_debug()
def _enable_stack_dumps(self):
secs = getenv_int('MITOGEN_DUMP_THREAD_STACKS', default=0)
if secs:
mitogen.debug.dump_to_logger(secs=secs)
def _setup_master(self):
"""
Construct a Router, Broker, and mitogen.unix listener
@ -319,12 +618,12 @@ class MuxProcess(object):
broker=self.broker,
max_message_size=4096 * 1048576,
)
self._setup_responder(self.router.responder)
_setup_responder(self.router.responder)
mitogen.core.listen(self.broker, 'shutdown', self.on_broker_shutdown)
mitogen.core.listen(self.broker, 'exit', self.on_broker_exit)
self.listener = mitogen.unix.Listener.build_stream(
router=self.router,
path=self.unix_listener_path,
path=self.path,
backlog=C.DEFAULT_FORKS,
)
self._enable_router_debug()
@ -337,15 +636,9 @@ class MuxProcess(object):
"""
self.pool = mitogen.service.Pool(
router=self.router,
services=[
mitogen.service.FileService(router=self.router),
mitogen.service.PushFileService(router=self.router),
ansible_mitogen.services.ContextService(self.router),
ansible_mitogen.services.ModuleDepService(self.router),
],
size=getenv_int('MITOGEN_POOL_SIZE', default=32),
)
LOG.debug('Service pool configured: size=%d', self.pool.size)
setup_pool(self.pool)
def on_broker_shutdown(self):
"""
@ -364,7 +657,7 @@ class MuxProcess(object):
ourself. In future this should gracefully join the pool, but TERM is
fine for now.
"""
if not self.profiling:
if not os.environ.get('MITOGEN_PROFILING'):
# In normal operation we presently kill the process because there is
# not yet any way to cancel connect(). When profiling, threads
# including the broker must shut down gracefully, otherwise pstats

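Multiplexer selection in ClassicWorkerModel._listener_for_name() above is a stable modulo-hash of the inventory name, so every connection to a given host is serviced by the same mux within a run. The same arithmetic in isolation (mux_paths is an illustrative stand-in for self._muxes):

    def pick_listener(name, mux_paths):
        # A single multiplexer needs no hashing.
        if len(mux_paths) == 1:
            return mux_paths[0]
        # abs(hash(name)) % n yields a stable index within one process.
        # hash() is randomized per interpreter on Python 3, so the
        # host-to-mux mapping holds for a run, not across runs.
        return mux_paths[abs(hash(name)) % len(mux_paths)]

    pick_listener('web1', ['/tmp/mux-0.sock', '/tmp/mux-1.sock'])
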
@ -326,6 +326,7 @@ class ContextService(mitogen.service.Service):
)
def _send_module_forwards(self, context):
return
self.router.responder.forward_modules(context, self.ALWAYS_PRELOAD)
_candidate_temp_dirs = None
@ -372,7 +373,7 @@ class ContextService(mitogen.service.Service):
try:
method = getattr(self.router, spec['method'])
except AttributeError:
raise Error('unsupported method: %(transport)s' % spec)
raise Error('unsupported method: %(method)s' % spec)
context = method(via=via, unidirectional=True, **spec['kwargs'])
if via and spec.get('enable_lru'):
@ -382,6 +383,7 @@ class ContextService(mitogen.service.Service):
mitogen.core.listen(context, 'disconnect',
lambda: self._on_context_disconnect(context))
#self._send_module_forwards(context) TODO
self._send_module_forwards(context)
init_child_result = context.call(
ansible_mitogen.target.init_child,
@ -443,7 +445,7 @@ class ContextService(mitogen.service.Service):
@mitogen.service.arg_spec({
'stack': list
})
def get(self, msg, stack):
def get(self, stack):
"""
Return a Context referring to an established connection with the given
configuration, establishing new connections as necessary.

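The get() hunk above drops the unused msg parameter while keeping @mitogen.service.arg_spec validation; it pairs with the Invoker change later in this commit that skips policy checks when msg is None. A sketch of declaring a service method under this scheme (ExampleService is illustrative):

    import mitogen.service

    class ExampleService(mitogen.service.Service):
        @mitogen.service.expose(policy=mitogen.service.AllowParents())
        @mitogen.service.arg_spec({
            'stack': list,
        })
        def get(self, stack):
            # arg_spec has already rejected any call in which 'stack'
            # is not a list by the time the body runs.
            return {'size': len(stack)}
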
@ -31,6 +31,11 @@ import os
import signal
import threading
try:
import setproctitle
except ImportError:
setproctitle = None
import mitogen.core
import ansible_mitogen.affinity
import ansible_mitogen.loaders
@ -145,11 +150,17 @@ def wrap_connection_loader__get(name, *args, **kwargs):
return connection_loader__get(name, *args, **kwargs)
def wrap_worker__run(*args, **kwargs):
def wrap_worker__run(self):
"""
While the strategy is active, rewrite connection_loader.get() calls for
some transports into requests for a compatible Mitogen transport.
"""
if setproctitle:
setproctitle.setproctitle('worker:%s task:%s' % (
self._host.name,
self._task.action,
))
# Ignore parent's attempts to murder us when we still need to write
# profiling output.
if mitogen.core._profile_hook.__name__ != '_profile_hook':
@ -158,10 +169,60 @@ def wrap_worker__run(*args, **kwargs):
ansible_mitogen.logging.set_process_name('task')
ansible_mitogen.affinity.policy.assign_worker()
return mitogen.core._profile_hook('WorkerProcess',
lambda: worker__run(*args, **kwargs)
lambda: worker__run(self)
)
class AnsibleWrappers(object):
"""
Manage add/removal of various Ansible runtime hooks.
"""
def _add_plugin_paths(self):
"""
Add the Mitogen plug-in directories to the ModuleLoader path, avoiding
the need for manual configuration.
"""
base_dir = os.path.join(os.path.dirname(__file__), 'plugins')
ansible_mitogen.loaders.connection_loader.add_directory(
os.path.join(base_dir, 'connection')
)
ansible_mitogen.loaders.action_loader.add_directory(
os.path.join(base_dir, 'action')
)
def _install_wrappers(self):
"""
Install our PluginLoader monkey patches and update global variables
with references to the real functions.
"""
global action_loader__get
action_loader__get = ansible_mitogen.loaders.action_loader.get
ansible_mitogen.loaders.action_loader.get = wrap_action_loader__get
global connection_loader__get
connection_loader__get = ansible_mitogen.loaders.connection_loader.get
ansible_mitogen.loaders.connection_loader.get = wrap_connection_loader__get
global worker__run
worker__run = ansible.executor.process.worker.WorkerProcess.run
ansible.executor.process.worker.WorkerProcess.run = wrap_worker__run
def _remove_wrappers(self):
"""
Uninstall the PluginLoader monkey patches.
"""
ansible_mitogen.loaders.action_loader.get = action_loader__get
ansible_mitogen.loaders.connection_loader.get = connection_loader__get
ansible.executor.process.worker.WorkerProcess.run = worker__run
def install(self):
self._add_plugin_paths()
self._install_wrappers()
def remove(self):
self._remove_wrappers()
class StrategyMixin(object):
"""
This mix-in enhances any built-in strategy by arranging for various Mitogen
@ -223,43 +284,6 @@ class StrategyMixin(object):
remote process, all the heavy lifting of transferring the action module
and its dependencies are automatically handled by Mitogen.
"""
def _install_wrappers(self):
"""
Install our PluginLoader monkey patches and update global variables
with references to the real functions.
"""
global action_loader__get
action_loader__get = ansible_mitogen.loaders.action_loader.get
ansible_mitogen.loaders.action_loader.get = wrap_action_loader__get
global connection_loader__get
connection_loader__get = ansible_mitogen.loaders.connection_loader.get
ansible_mitogen.loaders.connection_loader.get = wrap_connection_loader__get
global worker__run
worker__run = ansible.executor.process.worker.WorkerProcess.run
ansible.executor.process.worker.WorkerProcess.run = wrap_worker__run
def _remove_wrappers(self):
"""
Uninstall the PluginLoader monkey patches.
"""
ansible_mitogen.loaders.action_loader.get = action_loader__get
ansible_mitogen.loaders.connection_loader.get = connection_loader__get
ansible.executor.process.worker.WorkerProcess.run = worker__run
def _add_plugin_paths(self):
"""
Add the Mitogen plug-in directories to the ModuleLoader path, avoiding
the need for manual configuration.
"""
base_dir = os.path.join(os.path.dirname(__file__), 'plugins')
ansible_mitogen.loaders.connection_loader.add_directory(
os.path.join(base_dir, 'connection')
)
ansible_mitogen.loaders.action_loader.add_directory(
os.path.join(base_dir, 'action')
)
def _queue_task(self, host, task, task_vars, play_context):
"""
@ -290,20 +314,35 @@ class StrategyMixin(object):
play_context=play_context,
)
def _get_worker_model(self):
"""
In classic mode a single :class:`WorkerModel` exists, which manages
references and configuration of the associated connection multiplexer
process.
"""
return ansible_mitogen.process.get_classic_worker_model()
def run(self, iterator, play_context, result=0):
"""
Arrange for a mitogen.master.Router to be available for the duration of
the strategy's real run() method.
Wrap :meth:`run` to ensure requisite infrastructure and modifications
are configured for the duration of the call.
"""
_assert_supported_release()
ansible_mitogen.process.MuxProcess.start()
run = super(StrategyMixin, self).run
self._add_plugin_paths()
self._install_wrappers()
wrappers = AnsibleWrappers()
self._worker_model = self._get_worker_model()
ansible_mitogen.process.set_worker_model(self._worker_model)
try:
self._worker_model.on_strategy_start()
try:
wrappers.install()
try:
run = super(StrategyMixin, self).run
return mitogen.core._profile_hook('Strategy',
lambda: run(iterator, play_context)
)
finally:
self._remove_wrappers()
wrappers.remove()
finally:
self._worker_model.on_strategy_complete()
finally:
ansible_mitogen.process.set_worker_model(None)

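The strategy hunks above adopt the optional setproctitle package so each process is identifiable in ps/top output. Usage sketch, guarded exactly as in the diff since the package may be absent (the title string is illustrative):

    try:
        import setproctitle
    except ImportError:
        setproctitle = None

    if setproctitle:
        # Replaces the generic interpreter name in process listings.
        setproctitle.setproctitle('worker:web1 task:shell')
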
@ -24,16 +24,45 @@ To avail of fixes in an unreleased version, please download a ZIP file
Enhancements
^^^^^^^^^^^^
* `#587 <https://github.com/dw/mitogen/issues/587>`_: partial support for
Ansible 2.8 is now available. This implementation does not yet support the
new `become plugins
<https://docs.ansible.com/ansible/latest/plugins/become.html>`_
functionality, which will be addressed in a future release.
* `#587 <https://github.com/dw/mitogen/issues/587>`_: Ansible 2.8 is partially
supported. `Become plugins
<https://docs.ansible.com/ansible/latest/plugins/become.html>`_ and
`interpreter discovery
<https://docs.ansible.com/ansible/latest/reference_appendices/interpreter_discovery.html>`_
are not yet handled.
* The ``MITOGEN_CPU_COUNT`` environment variable shards the connection
multiplexer into per-CPU worker processes. This improves throughput for large
runs, especially those involving file transfer, and is a prerequisite to future
in-process SSH support. To match the behaviour of older releases, only one
multiplexer is started by default.
* `#419 <https://github.com/dw/mitogen/issues/419>`_,
`#470 <https://github.com/dw/mitogen/issues/470>`_, file descriptor usage
during large runs is halved, as it is no longer necessary to manage read and
write sides distinctly in order to work around a design limitation.
* `#419 <https://github.com/dw/mitogen/issues/419>`_: almost all connection
setup happens on one thread, reducing GIL contention and context switching
early in a run.
* `#419 <https://github.com/dw/mitogen/issues/419>`_: 2 network round-trips
were removed from early connection setup.
* `? <https://github.com/dw/mitogen/commit/7ae926b3>`_,
`? <https://github.com/dw/mitogen/commit/7ae926b3>`_,
`? <https://github.com/dw/mitogen/commit/7ae926b3>`_,
`? <https://github.com/dw/mitogen/commit/7ae926b3>`_: locking is avoided in
some hot paths, and locks that must be taken are held for less time.
Fixes
^^^^^
* `#363 <https://github.com/dw/mitogen/issues/363>`_: fix an obscure race
when matching *Permission denied* errors from some versions of ``su`` running
on heavily loaded machines.
* `#578 <https://github.com/dw/mitogen/issues/578>`_: the extension could crash
while rendering an error message, due to an incorrect format string.
@ -53,6 +82,59 @@ Fixes
``mitogen_ssh_keepalive_count`` variables, and the default timeout for an SSH
server has been increased from `15*3` seconds to `30*10` seconds.
* `7ae926b3 <https://github.com/dw/mitogen/commit/7ae926b3>`_: the
``lineinfile`` module began leaking writable temporary file descriptors since
Ansible 2.7.0. When ``lineinfile`` was used to create or modify a script, and
that script was later executed, the execution could fail with "*text file
busy*" due to the leaked descriptor. Temporary descriptors are now tracked
and cleaned up on exit for all modules.
Core Library
~~~~~~~~~~~~
* Logs are more readable, and many :func:`repr` strings are more descriptive.
The old pseudo-function-call format is slowly migrating to human-readable
output where possible. For example, *"Stream(ssh:123).connect()"* might
be written *"connecting to ssh:123"*.
* :func:`bytearray` was removed from the list of supported serialization types.
It was never portable between Python versions, unused, and never made much
sense to support as a wire type.
* `#170 <https://github.com/dw/mitogen/issues/170>`_: to improve subprocess
management and asynchronous connect, a :class:`mitogen.parent.TimerList`
interface is available, accessible as :attr:`Broker.timers` in an
asynchronous context.
* `#419 <https://github.com/dw/mitogen/issues/419>`_: the internal
:class:`mitogen.core.Stream` has been refactored into 7 new classes,
modularizing protocol behaviour, output buffering, line-oriented input
parsing, option handling and connection management. Connection setup is
internally asynchronous, laying almost all the groundwork needed for fully
asynchronous connect, proxied Ansible become plug-ins, and integrating
`libssh <https://www.libssh.org/>`_.
* `#169 <https://github.com/dw/mitogen/issues/169>`_,
`#419 <https://github.com/dw/mitogen/issues/419>`_: zombie child reaping has
been vastly improved, using timers to efficiently poll for a slow child to
finish exiting. Polling avoids relying on process-global configuration such
as a `SIGCHLD` handler, or :func:`signal.set_wakeup_fd` available in modern
Python.
* `#256 <https://github.com/dw/mitogen/issues/256>`_,
`#419 <https://github.com/dw/mitogen/issues/419>`_: most :func:`os.dup` usage
was eliminated, along with almost all manual file descriptor management.
Descriptors are trapped in :func:`os.fdopen` objects when they are created,
ensuring a leaked object will close itself, and ensuring every descriptor is
fused to a `closed` flag, preventing historical bugs where a double close
could destroy descriptors belonging to unrelated streams.
* `a5536c35 <https://github.com/dw/mitogen/commit/a5536c35>`_: avoid quadratic
buffer management when logging lines received from a child's redirected
standard IO.
Thanks!
~~~~~~~
@ -64,8 +146,10 @@ bug reports, testing, features and fixes in this release contributed by
`Orion Poplawski <https://github.com/opoplawski>`_,
`Szabó Dániel Ernő <https://github.com/r3ap3rpy>`_,
`Ulrich Schreiner <https://github.com/ulrichSchreiner>`_,
`Yuki Nishida <https://github.com/yuki-nishida-exa>`_, and
`@ghp-rr <https://github.com/ghp-rr>`_.
`Yuki Nishida <https://github.com/yuki-nishida-exa>`_,
`@ghp-rr <https://github.com/ghp-rr>`_,
`Pieter Voet <https://github.com/pietervoet/>`_, and
`@rizzly <https://github.com/rizzly>`_.
v0.2.7 (2019-05-19)
@ -102,14 +186,6 @@ Fixes
potential influx of 2.8-related bug reports.
Core Library
~~~~~~~~~~~~
* `#170 <https://github.com/dw/mitogen/issues/170>`_: to better support child
process management and a future asynchronous connect implementation, a
:class:`mitogen.parent.TimerList` API is available.
Thanks!
~~~~~~~

@ -901,7 +901,11 @@ class Message(object):
unpickler.find_global = self._find_global
try:
# Must occur off the broker thread.
try:
obj = unpickler.load()
except:
LOG.error('raw pickle was: %r', self.data)
raise
self._unpickled = obj
except (TypeError, ValueError):
e = sys.exc_info()[1]
@ -1785,7 +1789,10 @@ class Side(object):
set_nonblock(self.fd)
def __repr__(self):
return '<Side of %r fd %s>' % (self.stream, self.fd)
return '<Side of %s fd %s>' % (
self.stream.name or repr(self.stream),
self.fd
)
@classmethod
def _on_fork(cls):
@ -2039,9 +2046,6 @@ class Context(object):
:class:`Receiver` configured to receive any replies sent to the
message's `reply_to` handle.
"""
if self.router.broker._thread == threading.currentThread(): # TODO
raise SystemError('Cannot make blocking call on broker thread')
receiver = Receiver(self.router, persist=persist, respondent=self)
msg.dst_id = self.context_id
msg.reply_to = receiver.handle
@ -2485,17 +2489,20 @@ class Latch(object):
raise LatchError()
self._queue.append(obj)
wsock = None
if self._waking < len(self._sleeping):
wsock, cookie = self._sleeping[self._waking]
self._waking += 1
_vv and IOLOG.debug('%r.put() -> waking wfd=%r',
self, wsock.fileno())
self._wake(wsock, cookie)
elif self.notify:
self.notify(self)
finally:
self._lock.release()
if wsock:
self._wake(wsock, cookie)
def _wake(self, wsock, cookie):
written, disconnected = io_op(os.write, wsock.fileno(), cookie)
assert written == len(cookie) and not disconnected
@ -2606,12 +2613,14 @@ class Waker(Protocol):
self.stream.transmit_side.fd)
self._lock.acquire()
try:
if not self._deferred:
self._wake()
should_wake = not self._deferred
self._deferred.append((func, args, kwargs))
finally:
self._lock.release()
if should_wake:
self._wake()
class IoLoggerProtocol(DelimitedProtocol):
"""
@ -3263,7 +3272,10 @@ class Broker(object):
self._broker_exit()
def _broker_main(self):
try:
_profile_hook('mitogen.broker', self._do_broker_main)
finally:
# 'finally' to ensure _on_broker_exit() can always SIGTERM.
fire(self, 'exit')
def shutdown(self):

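The Latch and Waker hunks above apply the same fix: decide whom to wake while holding the lock, but perform the wake only after releasing it, so the woken thread does not immediately contend for the lock it was woken from. The pattern in isolation (generic names, with threading.Event standing in for the wake socket):

    import threading

    class WakeOutsideLock(object):
        def __init__(self):
            self._lock = threading.Lock()
            self._items = []
            self._sleepers = []   # Events of threads blocked waiting

        def put(self, obj):
            wake = None
            with self._lock:
                self._items.append(obj)
                if self._sleepers:
                    wake = self._sleepers.pop()
            # Signal outside the critical section.
            if wake is not None:
                wake.set()
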
@ -121,6 +121,19 @@ def handle_child_crash():
os._exit(1)
def _convert_exit_status(status):
"""
Convert a :func:`os.waitpid`-style exit status to a :mod:`subprocess` style
exit status.
"""
if os.WIFEXITED(status):
return os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
return -os.WTERMSIG(status)
elif os.WIFSTOPPED(status):
return -os.WSTOPSIG(status)
class Process(mitogen.parent.Process):
def poll(self):
try:
@ -134,12 +147,7 @@ class Process(mitogen.parent.Process):
if not pid:
return
if os.WIFEXITED(status):
return os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
return -os.WTERMSIG(status)
elif os.WIFSTOPPED(status):
return -os.WSTOPSIG(status)
return _convert_exit_status(status)
class Options(mitogen.parent.Options):

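With _convert_exit_status() factored out above, any os.waitpid() status maps to the subprocess convention: a non-negative exit code, or a negative signal number. Assumed usage:

    import os

    pid = os.fork()
    if pid == 0:
        os._exit(3)                 # child: plain exit status 3
    _, status = os.waitpid(pid, 0)
    # With _convert_exit_status() from the hunk above:
    print(_convert_exit_status(status))   # -> 3; SIGTERM death gives -15
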
@ -260,13 +260,13 @@ def create_socketpair(size=None):
``stdout``. As they are sockets their buffers are tunable, allowing large
buffers to improve file transfer throughput and reduce IO loop iterations.
"""
if size is None:
size = mitogen.core.CHUNK_SIZE
parentfp, childfp = socket.socketpair()
parentfp.setsockopt(socket.SOL_SOCKET,
socket.SO_SNDBUF,
size or mitogen.core.CHUNK_SIZE)
childfp.setsockopt(socket.SOL_SOCKET,
socket.SO_RCVBUF,
size or mitogen.core.CHUNK_SIZE)
for fp in parentfp, childfp:
fp.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
return parentfp, childfp
@ -638,10 +638,10 @@ def _upgrade_broker(broker):
# This function is deadly! The act of calling start_receive() generates log
# messages which must be silenced as the upgrade progresses, otherwise the
# poller state will change as it is copied, resulting in write fds that are
# lost. (Due to LogHandler->Router->Stream->Broker->Poller, where Stream
# only calls start_transmit() when transitioning from empty to non-empty
# buffer. If the start_transmit() is lost, writes from the child hang
# permanently).
# lost. (Due to LogHandler->Router->Stream->Protocol->Broker->Poller, where
# Stream only calls start_transmit() when transitioning from empty to
# non-empty buffer. If the start_transmit() is lost, writes from the child
# hang permanently).
root = logging.getLogger()
old_level = root.level
root.setLevel(logging.CRITICAL)
@ -810,7 +810,8 @@ class CallSpec(object):
class PollPoller(mitogen.core.Poller):
"""
Poller based on the POSIX poll(2) interface. Not available on some versions
of OS X, otherwise it is the preferred poller for small FD counts.
of OS X, otherwise it is the preferred poller for small FD counts, as there
is no setup/teardown/configuration system call overhead.
"""
SUPPORTED = hasattr(select, 'poll')
_repr = 'PollPoller()'
@ -1106,8 +1107,8 @@ class BootstrapProtocol(RegexProtocol):
"""
Respond to stdout of a child during bootstrap. Wait for EC0_MARKER to be
written by the first stage to indicate it can receive the bootstrap, then
await EC1_MARKER to indicate success, and
:class:`mitogen.core.MitogenProtocol` can be enabled.
await EC1_MARKER to indicate success, and :class:`MitogenProtocol` can be
enabled.
"""
#: Sentinel value emitted by the first stage to indicate it is ready to
#: receive the compressed bootstrap. For :mod:`mitogen.ssh` this must have
@ -1161,6 +1162,26 @@ class LogProtocol(LineLoggingProtocolMixin, mitogen.core.DelimitedProtocol):
LOG.info(u'%s: %s', self.stream.name, line.decode('utf-8', 'replace'))
class MitogenProtocol(mitogen.core.MitogenProtocol):
"""
Extend core.MitogenProtocol to cause SHUTDOWN to be sent to the child
during graceful shutdown.
"""
def on_shutdown(self, broker):
"""
Respond to the broker's request for the stream to shut down by sending
SHUTDOWN to the child.
"""
LOG.debug('%r: requesting child shutdown', self)
self._send(
mitogen.core.Message(
src_id=mitogen.context_id,
dst_id=self.remote_id,
handle=mitogen.core.SHUTDOWN,
)
)
class Options(object):
name = None
@ -1407,7 +1428,7 @@ class Connection(object):
if not self.exception:
self._router.register(self.context, self.stdio_stream)
self.stdio_stream.set_protocol(
mitogen.core.MitogenProtocol(
MitogenProtocol(
router=self._router,
remote_id=self.context.context_id,
)
@ -1428,19 +1449,6 @@ class Connection(object):
stream.on_disconnect(self._router.broker)
self._complete_connection()
def on_stream_shutdown(self):
"""
Request the slave gracefully shut itself down.
"""
LOG.debug('%r: requesting child shutdown', self)
self.stdio_stream.protocol._send(
mitogen.core.Message(
src_id=mitogen.context_id,
dst_id=self.stdio_stream.protocol.remote_id,
handle=mitogen.core.SHUTDOWN,
)
)
eof_error_msg = 'EOF on stream; last 100 lines received:\n'
def on_stdio_disconnect(self):
@ -1511,7 +1519,6 @@ class Connection(object):
stream.name = self.options.name or self._get_name()
stream.accept(self.proc.stdout, self.proc.stdin)
mitogen.core.listen(stream, 'shutdown', self.on_stream_shutdown)
mitogen.core.listen(stream, 'disconnect', self.on_stdio_disconnect)
self._router.broker.start_receive(stream)
return stream
@ -2152,6 +2159,23 @@ class Router(mitogen.core.Router):
finally:
self._write_lock.release()
def disconnect(self, context):
"""
Disconnect a context and forget its stream, assuming the context is
directly connected.
"""
stream = self.stream_by_id(context)
if stream.remote_id != context.context_id:
return
l = mitogen.core.Latch()
mitogen.core.listen(stream, 'disconnect', l.put)
def disconnect():
LOG.debug('Starting disconnect of %r', stream)
stream.on_disconnect(self.broker)
self.broker.defer(disconnect)
l.get()
def add_route(self, target_id, stream):
"""
Arrange for messages whose `dst_id` is `target_id` to be forwarded on

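Router.disconnect() above follows the usual recipe for touching broker-owned state from another thread: defer the mutation to the broker thread and park the caller on a Latch until the work completes. Generalized into a hypothetical helper, assuming only Broker.defer() and Latch from mitogen.core:

    import mitogen.core

    def defer_and_wait(broker, func):
        # Run func() on the broker thread; block the caller until done.
        latch = mitogen.core.Latch()
        def wrapper():
            try:
                func()
            finally:
                latch.put(None)
        broker.defer(wrapper)
        latch.get()
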
@ -72,13 +72,13 @@ def try_merge(stats, path):
stats.add(path)
return True
except Exception as e:
print('Failed. Race? Will retry. %s' % (e,))
print('%s failed. Will retry. %s' % (path, e))
return False
def merge_stats(outpath, inpaths):
first, rest = inpaths[0], inpaths[1:]
for x in range(5):
for x in range(1):
try:
stats = pstats.Stats(first)
except EOFError:

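For reference, the aggregation behind try_merge()/merge_stats() is the stdlib profiler API; merging is just pstats.Stats.add(). A standalone equivalent (file names illustrative):

    import pstats

    stats = pstats.Stats('mux-0.pstat')    # first profile
    stats.add('mux-1.pstat')               # merge a second in place
    stats.sort_stats('cumulative').print_stats(20)
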
@ -77,20 +77,43 @@ else:
def get_or_create_pool(size=None, router=None):
global _pool
global _pool_pid
my_pid = os.getpid()
if _pool is None or my_pid != _pool_pid:
# Avoid acquiring heavily contended lock if possible.
_pool_lock.acquire()
try:
if _pool_pid != os.getpid():
if _pool_pid != my_pid:
_pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE,
overwrite=True)
# In case of Broker shutdown crash, Pool can cause 'zombie'
# processes.
mitogen.core.listen(router.broker, 'shutdown',
lambda: _pool.stop(join=False))
lambda: _pool.stop(join=True))
_pool_pid = os.getpid()
return _pool
finally:
_pool_lock.release()
return _pool
def call(service_name, method_name, call_context=None, **kwargs):
"""
Call a service registered with this pool, using the calling thread as a
host.
"""
if isinstance(service_name, mitogen.core.BytesType):
service_name = service_name.decode('utf-8')
elif not isinstance(service_name, mitogen.core.UnicodeType):
service_name = service_name.name() # Service.name()
if call_context:
return call_context.call_service(service_name, method_name, **kwargs)
else:
pool = get_or_create_pool()
invoker = pool.get_invoker(service_name, msg=None)
return getattr(invoker.service, method_name)(**kwargs)
def validate_arg_spec(spec, args):
for name in spec:
@ -239,6 +262,7 @@ class Invoker(object):
if not policies:
raise mitogen.core.CallError('Method has no policies set.')
if msg is not None:
if not all(p.is_authorized(self.service, msg) for p in policies):
raise mitogen.core.CallError(
self.unauthorized_msg,
@ -264,7 +288,7 @@ class Invoker(object):
except Exception:
if no_reply:
LOG.exception('While calling no-reply method %s.%s',
type(self.service).__name__,
self.service.name(),
func_name(method))
else:
raise
@ -523,6 +547,9 @@ class Pool(object):
invoker.service.on_shutdown()
def get_invoker(self, name, msg):
invoker = self._invoker_by_name.get(name)
if invoker is None:
# Avoid acquiring lock if possible.
self._lock.acquire()
try:
invoker = self._invoker_by_name.get(name)
@ -690,7 +717,7 @@ class PushFileService(Service):
"""
for path in paths:
self.propagate_to(context, mitogen.core.to_text(path))
self.router.responder.forward_modules(context, modules)
#self.router.responder.forward_modules(context, modules) TODO
@expose(policy=AllowParents())
@arg_spec({

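The module-level call() added above gives callers one entry point whether the target service runs in another process or in the local pool; the connection hunks earlier in this commit use the remote form. Sketched usage, with call_context and stack as established there:

    import mitogen.service

    # Remote: forwarded over the given context's connection.
    mitogen.service.call(
        call_context=call_context,
        service_name='ansible_mitogen.services.ContextService',
        method_name='get',
        stack=stack,
    )

    # Local: with call_context omitted, get_or_create_pool() supplies
    # an invoker and the method executes in this process.
    mitogen.service.call(
        service_name='ansible_mitogen.services.ContextService',
        method_name='get',
        stack=stack,
    )
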
@ -24,7 +24,7 @@
- mitogen_action_script:
script: |
self._connection._connect()
result['dump'] = self._connection.parent.call_service(
result['dump'] = self._connection.get_binding().get_service_context().call_service(
service_name='ansible_mitogen.services.ContextService',
method_name='dump'
)
@ -39,7 +39,7 @@
- mitogen_action_script:
script: |
self._connection._connect()
result['dump'] = self._connection.parent.call_service(
result['dump'] = self._connection.get_binding().get_service_context().call_service(
service_name='ansible_mitogen.services.ContextService',
method_name='dump'
)

@ -23,9 +23,10 @@ class ActionModule(ActionBase):
}
self._connection._connect()
binding = self._connection.get_binding()
return {
'changed': True,
'result': self._connection.parent.call_service(
'result': binding.get_service_context().call_service(
service_name='ansible_mitogen.services.ContextService',
method_name='shutdown_all',
)

@ -51,7 +51,11 @@ else:
os.path.join(GIT_BASEDIR, 'tests/ansible/hosts')
)
args = ['ansible-playbook']
if 'ANSIBLE_ARGV' in os.environ:
args = eval(os.environ['ANSIBLE_ARGV'])
else:
args = ['ansible-playbook']
args += ['-e', json.dumps(extra)]
args += sys.argv[1:]
os.execvp(args[0], args)

@ -26,13 +26,17 @@ class MuxProcessMixin(object):
@classmethod
def setUpClass(cls):
#mitogen.utils.log_to_file()
ansible_mitogen.process.MuxProcess.start(_init_logging=False)
cls.model = ansible_mitogen.process.get_classic_worker_model(
_init_logging=False
)
ansible_mitogen.process.set_worker_model(cls.model)
cls.model.on_strategy_start()
super(MuxProcessMixin, cls).setUpClass()
@classmethod
def tearDownClass(cls):
cls.model._test_reset()
super(MuxProcessMixin, cls).tearDownClass()
ansible_mitogen.process.MuxProcess._reset()
class ConnectionMixin(MuxProcessMixin):
