Merge remote-tracking branch 'origin/dmw'

* origin/dmw:
  docs: merge signals.rst into internals.rst
  os_fork: do not attempt to cork the active thread.
  parent: fix get_log_level() for split out loggers.
  issue #547: fix service_test failures.
  issue #547: update Changelog.
  issue #547: core/service: race/deadlock-free service pool init
  docs: update Changelog.
  select: make Select.add() handle multiple buffered items.
  core/select: add {Select,Latch,Receiver}.size(), deprecate empty()
  parent: docstring fixes
  core: remove dead Router.on_shutdown() and Router "shutdown" signal
  testlib: use lsof +E for much clearer leaked FD output
  [stream-refactor] stop leaking FD 100 for the life of the child
  core: split preserve_tty_fp() out into a function
  parent: zombie reaping v3
  issue #410: fix test failure due to obsolete parentfp/childfp
  issue #170: replace Timer.cancelled with Timer.active
  core: more descriptive graceful shutdown timeout error
  docs: update changelog
  core: fix Python2.4 crash due to missing Logger.getChild().
  issue #410: automatically work around SELinux braindamage.
  core: cache stream reference in DelimitedProtocol
  parent: docstring formatting
  docs: remove fakessh from home page, it's been broken forever
  docs: add changelog thanks
  Disable Azure pipelines build for docs-master too.
  docs: udpate Changelog.
  docs: tweak Changelog wording
  [linear2] merge fallout: re-enable _send_module_forwards().
  docs: another round of docstring cleanups.
  master: allow filtering forwarded logs using logging package functions.
  docs: many more internals.rst tidyups
  tests: fix error in affinity_test
  service: centralize fetching thread name, and tidy up logs
  [stream-refactor] get caught up on internals.rst updates
  Stop using mitogen root logger in more modules, remove unused loggers
  tests: stop dumping Docker help output in the log.
  parent: move subprocess creation to mux thread too
  Split out and make readable more log messages across both packages
  ansible: log affinity assignments
  ci: log failed command line, and try enabling stdout line buffering
  ansible: improve docstring
  [linear2] simplify _listener_for_name()
  ansible: stop relying on SIGTERM to shut down service pool
  tests: move tty_create_child tests together
  ansible: cleanup various docstrings
  parent: define Connection behaviour during Broker.shutdown()
  issue #549: ansible: reduce risk by capping RLIM_INFINITY
pull/612/head
David Wilson
commit 8eeff66bd7

@ -3,6 +3,11 @@
# Add steps that analyze code, save the dist with the build record, publish to a PyPI-compatible index, and more:
# https://docs.microsoft.com/azure/devops/pipelines/languages/python
trigger:
branches:
exclude:
- docs-master
jobs:
- job: Mac

@ -57,8 +57,10 @@ def have_docker():
# -----------------
# Force stdout FD 1 to be a pipe, so tools like pip don't spam progress bars.
# Force line buffering on stdout.
sys.stdout = os.fdopen(1, 'w', 1)
# Force stdout FD 1 to be a pipe, so tools like pip don't spam progress bars.
if 'TRAVIS_HOME' in os.environ:
proc = subprocess.Popen(
args=['stdbuf', '-oL', 'cat'],
@ -86,8 +88,13 @@ def _argv(s, *args):
def run(s, *args, **kwargs):
argv = ['/usr/bin/time', '--'] + _argv(s, *args)
print('Running: %s' % (argv,))
ret = subprocess.check_call(argv, **kwargs)
print('Finished running: %s' % (argv,))
try:
ret = subprocess.check_call(argv, **kwargs)
print('Finished running: %s' % (argv,))
except Exception:
print('Exception occurred while running: %s' % (argv,))
raise
return ret
@ -217,6 +224,7 @@ def start_containers(containers):
"docker rm -f %(name)s || true" % container,
"docker run "
"--rm "
"--cpuset-cpus 0,1 "
"--detach "
"--privileged "
"--cap-add=SYS_PTRACE "

@ -73,7 +73,9 @@ necessarily involves preventing the scheduler from making load balancing
decisions.
"""
from __future__ import absolute_import
import ctypes
import logging
import mmap
import multiprocessing
import os
@ -83,6 +85,9 @@ import mitogen.core
import mitogen.parent
LOG = logging.getLogger(__name__)
try:
_libc = ctypes.CDLL(None, use_errno=True)
_strerror = _libc.strerror
@ -207,11 +212,13 @@ class FixedPolicy(Policy):
self._reserve_mask = 3
self._reserve_shift = 2
def _set_affinity(self, mask):
def _set_affinity(self, descr, mask):
if descr:
LOG.debug('CPU mask for %s: %#08x', descr, mask)
mitogen.parent._preexec_hook = self._clear
self._set_cpu_mask(mask)
def _balance(self):
def _balance(self, descr):
self.state.lock.acquire()
try:
n = self.state.counter
@ -219,28 +226,28 @@ class FixedPolicy(Policy):
finally:
self.state.lock.release()
self._set_cpu(self._reserve_shift + (
self._set_cpu(descr, self._reserve_shift + (
(n % (self.cpu_count - self._reserve_shift))
))
def _set_cpu(self, cpu):
self._set_affinity(1 << (cpu % self.cpu_count))
def _set_cpu(self, descr, cpu):
self._set_affinity(descr, 1 << (cpu % self.cpu_count))
def _clear(self):
all_cpus = (1 << self.cpu_count) - 1
self._set_affinity(all_cpus & ~self._reserve_mask)
self._set_affinity(None, all_cpus & ~self._reserve_mask)
def assign_controller(self):
if self._reserve_controller:
self._set_cpu(1)
self._set_cpu('Ansible top-level process', 1)
else:
self._balance()
self._balance('Ansible top-level process')
def assign_muxprocess(self, index):
self._set_cpu(index)
self._set_cpu('MuxProcess %d' % (index,), index)
def assign_worker(self):
self._balance()
self._balance('WorkerProcess')
def assign_subprocess(self):
self._clear()
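
The mask arithmetic above can be illustrated in isolation; the helper names below are invented for this sketch and are not part of ansible_mitogen.affinity:

.. code-block:: python

    # Sketch of the bitmask arithmetic used by FixedPolicy above.
    def cpu_mask(cpu, cpu_count):
        """Mask selecting a single CPU, wrapping when cpu >= cpu_count."""
        return 1 << (cpu % cpu_count)

    def clear_mask(cpu_count, reserve_mask=3):
        """Mask selecting every CPU except the reserved ones (CPUs 0 and 1)."""
        all_cpus = (1 << cpu_count) - 1
        return all_cpus & ~reserve_mask

    assert cpu_mask(2, cpu_count=4) == 0b0100
    assert clear_mask(cpu_count=4) == 0b1100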

@ -246,7 +246,14 @@ def increase_open_file_limit():
limit is much higher.
"""
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
LOG.debug('inherited open file limits: soft=%d hard=%d', soft, hard)
if hard == resource.RLIM_INFINITY:
hard_s = '(infinity)'
# cap in case of O(RLIMIT_NOFILE) algorithm in some subprocess.
hard = 524288
else:
hard_s = str(hard)
LOG.debug('inherited open file limits: soft=%d hard=%s', soft, hard_s)
if soft >= hard:
LOG.debug('max open files already set to hard limit: %d', hard)
return
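
For reference, a self-contained sketch of the capping behaviour described in this hunk, assuming the same 524288 ceiling; it is not the increase_open_file_limit() implementation itself:

.. code-block:: python

    import resource

    def capped_nofile_limits(cap=524288):
        """Return (soft, hard), capping an infinite hard limit to a sane
        ceiling to avoid O(RLIMIT_NOFILE) loops in badly behaved children."""
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        if hard == resource.RLIM_INFINITY:
            hard = cap
        return soft, hard

    soft, hard = capped_nofile_limits()
    if soft < hard:
        # Raise the soft limit up to the (possibly capped) hard limit.
        resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))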
@ -268,13 +275,13 @@ def common_setup(enable_affinity=True, _init_logging=True):
save_pid('controller')
ansible_mitogen.logging.set_process_name('top')
if _init_logging:
ansible_mitogen.logging.setup()
if enable_affinity:
ansible_mitogen.affinity.policy.assign_controller()
mitogen.utils.setup_gil()
if _init_logging:
ansible_mitogen.logging.setup()
if faulthandler is not None:
faulthandler.enable()
@ -352,6 +359,11 @@ class Binding(object):
class WorkerModel(object):
"""
Interface used by StrategyMixin to manage various Mitogen services, by
default running in one or more connection multiplexer subprocesses spawned
off the top-level Ansible process.
"""
def on_strategy_start(self):
"""
Called prior to strategy start in the top-level process. Responsible
@ -368,6 +380,11 @@ class WorkerModel(object):
raise NotImplementedError()
def get_binding(self, inventory_name):
"""
Return a :class:`Binding` to access Mitogen services for
`inventory_name`. Usually called from worker processes, but may also be
called from top-level process to handle "meta: reset_connection".
"""
raise NotImplementedError()
@ -427,13 +444,10 @@ class ClassicWorkerModel(WorkerModel):
def __init__(self, _init_logging=True):
"""
Arrange for classic model multiplexers to be started, if they are not
already running.
The parent process picks a UNIX socket path each child will use prior
to fork, creates a socketpair used essentially as a semaphore, then
blocks waiting for the child to indicate the UNIX socket is ready for
use.
Arrange for classic model multiplexers to be started. The parent chooses
UNIX socket paths each child will use prior to fork, creates a
socketpair used essentially as a semaphore, then blocks waiting for the
child to indicate the UNIX socket is ready for use.
:param bool _init_logging:
For testing, if :data:`False`, don't initialize logging.
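
The "socketpair used essentially as a semaphore" idea from the docstring above can be sketched in isolation; the helper below is hypothetical and not part of ClassicWorkerModel:

.. code-block:: python

    import os
    import socket

    def fork_child_and_wait_until_ready():
        """Fork a child and block until it signals readiness with one byte."""
        parent_sock, child_sock = socket.socketpair()
        pid = os.fork()
        if pid == 0:
            parent_sock.close()
            # ... the child would create its UNIX listener here ...
            child_sock.send(b'1')    # tell the parent we are ready
            os._exit(0)
        child_sock.close()
        parent_sock.recv(1)          # blocks until the child is ready
        return pid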
@ -466,12 +480,10 @@ class ClassicWorkerModel(WorkerModel):
Given an inventory hostname, return the UNIX listener that should
communicate with it. This is a simple hash of the inventory name.
"""
if len(self._muxes) == 1:
return self._muxes[0].path
idx = abs(hash(name)) % len(self._muxes)
LOG.debug('Picked worker %d: %s', idx, self._muxes[idx].path)
return self._muxes[idx].path
mux = self._muxes[abs(hash(name)) % len(self._muxes)]
LOG.debug('will use multiplexer %d (%s) to connect to "%s"',
mux.index, mux.path, name)
return mux.path
def _reconnect(self, path):
if self.router is not None:
@ -498,9 +510,9 @@ class ClassicWorkerModel(WorkerModel):
This is an :mod:`atexit` handler installed in the top-level process.
Shut the write end of `sock`, causing the receive side of the socket in
every worker process to return 0-byte reads, and causing their main
threads to wake and initiate shutdown. After shutting the socket down,
wait on each child to finish exiting.
every :class:`MuxProcess` to return 0-byte reads, and causing their
main threads to wake and initiate shutdown. After shutting the socket
down, wait on each child to finish exiting.
This is done using :mod:`atexit` since Ansible lacks any better hook to
run code during exit, and unless some synchronization exists with
@ -523,30 +535,19 @@ class ClassicWorkerModel(WorkerModel):
for mux in self._muxes:
_, status = os.waitpid(mux.pid, 0)
status = mitogen.fork._convert_exit_status(status)
LOG.debug('mux %d PID %d %s', mux.index, mux.pid,
LOG.debug('multiplexer %d PID %d %s', mux.index, mux.pid,
mitogen.parent.returncode_to_str(status))
def _test_reset(self):
"""
Used to clean up in unit tests.
"""
# TODO: split this up a bit.
global _classic_worker_model
assert self.parent_sock is not None
self.parent_sock.close()
self.parent_sock = None
self.listener_path = None
self.router = None
self.parent = None
for mux in self._muxes:
pid, status = os.waitpid(mux.pid, 0)
status = mitogen.fork._convert_exit_status(status)
LOG.debug('mux PID %d %s', pid,
mitogen.parent.returncode_to_str(status))
self.on_binding_close()
self._on_process_exit()
set_worker_model(None)
global _classic_worker_model
_classic_worker_model = None
set_worker_model(None)
def on_strategy_start(self):
"""
@ -579,6 +580,7 @@ class ClassicWorkerModel(WorkerModel):
self.broker.join()
self.router = None
self.broker = None
self.parent = None
self.listener_path = None
# #420: Ansible executes "meta" actions in the top-level process,
@ -694,8 +696,8 @@ class MuxProcess(object):
max_message_size=4096 * 1048576,
)
_setup_responder(self.router.responder)
mitogen.core.listen(self.broker, 'shutdown', self.on_broker_shutdown)
mitogen.core.listen(self.broker, 'exit', self.on_broker_exit)
mitogen.core.listen(self.broker, 'shutdown', self._on_broker_shutdown)
mitogen.core.listen(self.broker, 'exit', self._on_broker_exit)
self.listener = mitogen.unix.Listener.build_stream(
router=self.router,
path=self.path,
@ -715,26 +717,20 @@ class MuxProcess(object):
)
setup_pool(self.pool)
def on_broker_shutdown(self):
def _on_broker_shutdown(self):
"""
Respond to broker shutdown by beginning service pool shutdown. Do not
join on the pool yet, since that would block the broker thread which
then cannot clean up pending handlers, which is required for the
threads to exit gracefully.
Respond to broker shutdown by shutting down the pool. Do not join on it
yet, since that would block the broker thread which then cannot clean
up pending handlers and connections, which is required for the threads
to exit gracefully.
"""
# In normal operation we presently kill the process because there is
# not yet any way to cancel connect().
self.pool.stop(join=self.profiling)
self.pool.stop(join=False)
def on_broker_exit(self):
def _on_broker_exit(self):
"""
Respond to the broker thread about to exit by sending SIGTERM to
ourself. In future this should gracefully join the pool, but TERM is
fine for now.
Respond to the broker thread about to exit by finally joining on the
pool. This is safe since pools only block in connection attempts, and
connection attempts fail with CancelledError when broker shutdown
begins.
"""
if not os.environ.get('MITOGEN_PROFILING'):
# In normal operation we presently kill the process because there is
# not yet any way to cancel connect(). When profiling, threads
# including the broker must shut down gracefully, otherwise pstats
# won't be written.
os.kill(os.getpid(), signal.SIGTERM)
self.pool.join()
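
The shutdown ordering described in the two docstrings above can be summarized with a small hedged sketch; `PoolOwner` is illustrative only and assumes a pool object with the same stop()/join() interface:

.. code-block:: python

    import mitogen.core

    class PoolOwner(object):
        """Stop the pool as soon as broker shutdown begins, but only join it
        once the broker thread is about to exit and handlers are cleaned up."""
        def __init__(self, broker, pool):
            self.pool = pool
            mitogen.core.listen(broker, 'shutdown', self._on_broker_shutdown)
            mitogen.core.listen(broker, 'exit', self._on_broker_exit)

        def _on_broker_shutdown(self):
            # Runs on the broker thread, so it must not block.
            self.pool.stop(join=False)

        def _on_broker_exit(self):
            # Connection attempts have been cancelled by now, so joining the
            # pool threads cannot block indefinitely.
            self.pool.join()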

@ -180,7 +180,7 @@ class ContextService(mitogen.service.Service):
Return a reference, making it eligable for recycling once its reference
count reaches zero.
"""
LOG.debug('%r.put(%r)', self, context)
LOG.debug('decrementing reference count for %r', context)
self._lock.acquire()
try:
if self._refs_by_context.get(context, 0) == 0:
@ -326,7 +326,6 @@ class ContextService(mitogen.service.Service):
)
def _send_module_forwards(self, context):
return
self.router.responder.forward_modules(context, self.ALWAYS_PRELOAD)
_candidate_temp_dirs = None
@ -383,7 +382,6 @@ class ContextService(mitogen.service.Service):
mitogen.core.listen(context, 'disconnect',
lambda: self._on_context_disconnect(context))
#self._send_module_forwards(context) TODO
self._send_module_forwards(context)
init_child_result = context.call(
ansible_mitogen.target.init_child,

@ -124,7 +124,7 @@ def wrap_action_loader__get(name, *args, **kwargs):
the use of shell fragments wherever possible.
This is used instead of static subclassing as it generalizes to third party
action modules outside the Ansible tree.
action plugins outside the Ansible tree.
"""
get_kwargs = {'class_only': True}
if ansible.__version__ >= '2.8':
@ -141,8 +141,8 @@ def wrap_action_loader__get(name, *args, **kwargs):
def wrap_connection_loader__get(name, *args, **kwargs):
"""
While the strategy is active, rewrite connection_loader.get() calls for
some transports into requests for a compatible Mitogen transport.
While a Mitogen strategy is active, rewrite connection_loader.get() calls
for some transports into requests for a compatible Mitogen transport.
"""
if name in ('buildah', 'docker', 'kubectl', 'jail', 'local',
'lxc', 'lxd', 'machinectl', 'setns', 'ssh'):
@ -152,8 +152,10 @@ def wrap_connection_loader__get(name, *args, **kwargs):
def wrap_worker__run(self):
"""
While the strategy is active, rewrite connection_loader.get() calls for
some transports into requests for a compatible Mitogen transport.
While a Mitogen strategy is active, trap WorkerProcess.run() calls and use
the opportunity to set the worker's name in the process list and log
output, activate profiling if requested, and bind the worker to a specific
CPU.
"""
if setproctitle:
setproctitle.setproctitle('worker:%s task:%s' % (
@ -225,10 +227,14 @@ class AnsibleWrappers(object):
class StrategyMixin(object):
"""
This mix-in enhances any built-in strategy by arranging for various Mitogen
services to be initialized in the Ansible top-level process, and for worker
processes to grow support for using those top-level services to communicate
with and execute modules on remote hosts.
This mix-in enhances any built-in strategy by arranging for an appropriate
WorkerModel instance to be constructed as necessary, or for the existing
one to be reused.
The WorkerModel in turn arranges for a connection multiplexer to be started
somewhere (by default in an external process), and for WorkerProcesses to
grow support for using those top-level services to communicate with remote
hosts.
Mitogen:
@ -246,18 +252,19 @@ class StrategyMixin(object):
services, review the Standard Handles section of the How It Works guide
in the documentation.
A ContextService is installed as a message handler in the master
process and run on a private thread. It is responsible for accepting
requests to establish new SSH connections from worker processes, and
ensuring precisely one connection exists and is reused for subsequent
playbook steps. The service presently runs in a single thread, so to
begin with, new SSH connections are serialized.
A ContextService is installed as a message handler in the connection
multiplexer subprocess and run on a private thread. It is responsible
for accepting requests to establish new SSH connections from worker
processes, and ensuring precisely one connection exists and is reused
for subsequent playbook steps. The service presently runs in a single
thread, so to begin with, new SSH connections are serialized.
Finally a mitogen.unix listener is created through which WorkerProcess
can establish a connection back into the master process, in order to
avail of ContextService. A UNIX listener socket is necessary as there
is no more sane mechanism to arrange for IPC between the Router in the
master process, and the corresponding Router in the worker process.
can establish a connection back into the connection multiplexer, in
order to avail of ContextService. A UNIX listener socket is necessary
as there is no more sane mechanism to arrange for IPC between the
Router in the connection multiplexer, and the corresponding Router in
the worker process.
Ansible:
@ -265,10 +272,10 @@ class StrategyMixin(object):
connection and action plug-ins.
For connection plug-ins, if the desired method is "local" or "ssh", it
is redirected to the "mitogen" connection plug-in. That plug-in
implements communication via a UNIX socket connection to the top-level
Ansible process, and uses ContextService running in the top-level
process to actually establish and manage the connection.
is redirected to one of the "mitogen_*" connection plug-ins. That
plug-in implements communication via a UNIX socket connection to the
connection multiplexer process, and uses ContextService running there
to establish a persistent connection to the target.
For action plug-ins, the original class is looked up as usual, but a
new subclass is created dynamically in order to mix-in

@ -96,8 +96,12 @@ Router Class
:members:
.. currentmodule:: mitogen.master
.. currentmodule:: mitogen.parent
.. autoclass:: Router
:members:
.. currentmodule:: mitogen.master
.. autoclass:: Router (broker=None)
:members:
@ -553,11 +557,11 @@ Context Class
.. currentmodule:: mitogen.parent
.. autoclass:: CallChain
.. autoclass:: Context
:members:
.. autoclass:: Context
.. currentmodule:: mitogen.parent
.. autoclass:: CallChain
:members:
@ -662,3 +666,7 @@ Exceptions
.. autoclass:: LatchError
.. autoclass:: StreamError
.. autoclass:: TimeoutError
.. currentmodule:: mitogen.parent
.. autoclass:: EofError
.. autoclass:: CancelledError

@ -24,7 +24,8 @@ To avail of fixes in an unreleased version, please download a ZIP file
Enhancements
^^^^^^^^^^^^
* `#587 <https://github.com/dw/mitogen/issues/587>`_: Ansible 2.8 is partially
* `#556 <https://github.com/dw/mitogen/issues/556>`_,
`#587 <https://github.com/dw/mitogen/issues/587>`_: Ansible 2.8 is partially
supported. `Become plugins
<https://docs.ansible.com/ansible/latest/plugins/become.html>`_ and
`interpreter discovery
@ -46,8 +47,10 @@ Enhancements
setup happens on one thread, reducing GIL contention and context switching
early in a run.
* `#419 <https://github.com/dw/mitogen/issues/419>`_: 2 network round-trips
were removed from early connection setup.
* `#419 <https://github.com/dw/mitogen/issues/419>`_: Connection setup is
pipelined, eliminating several network round-trips. Most infrastructure is in
place to support future removal of the final round-trip between a target
fully booting and receiving its first function call.
* `d6faff06 <https://github.com/dw/mitogen/commit/d6faff06>`_,
`807cbef9 <https://github.com/dw/mitogen/commit/807cbef9>`_,
@ -64,6 +67,20 @@ Mitogen for Ansible
matching *Permission denied* errors from some versions of ``su`` running on
heavily loaded machines.
* `#410 <https://github.com/dw/mitogen/issues/410>`_: Use of ``AF_UNIX``
sockets automatically replaced with plain UNIX pipes when SELinux is
detected, to work around a broken heuristic in popular SELinux policies that
prevents inheriting ``AF_UNIX`` sockets across privilege domains.
* `#467 <https://github.com/dw/mitogen/issues/467>`_: an incompatibility
running Mitogen under Molecule was resolved.
* `#547 <https://github.com/dw/mitogen/issues/547>`_: fix a serious deadlock
possible during initialization of any task executed by forking, such as
``async`` tasks, tasks using custom :mod:`ansible.module_utils`,
``mitogen_task_isolation: fork`` modules, and those present on an internal
blacklist of misbehaving modules.
* `#549 <https://github.com/dw/mitogen/issues/549>`_: the open file descriptor
limit for the Ansible process is increased to the available hard limit. It is
common for distributions to ship with a much higher hard limit than their
@ -154,6 +171,18 @@ Core Library
buffer management when logging lines received from a child's redirected
standard IO.
* `49a6446a <https://github.com/dw/mitogen/commit/49a6446a>`_: the
:meth:`empty` method of :class:`mitogen.core.Latch`,
:class:`mitogen.core.Receiver` and :class:`mitogen.select.Select` has been
replaced by a more general :meth:`size` method. :meth:`empty` will be removed
in Mitogen 0.3
* `ecc570cb <https://github.com/dw/mitogen/commit/ecc570cb>`_: previously
:meth:`mitogen.select.Select.add` would enqueue a single wake event when
adding an existing receiver, latch or subselect that contained multiple
buffered items, causing future :meth:`get` calls to block or fail even though
data existed that could be returned.
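
To illustrate the :meth:`size`/:meth:`empty` change described above (a sketch, not a test from the repository):

.. code-block:: python

    import mitogen.core

    latch = mitogen.core.Latch()
    latch.put('a')
    latch.put('b')

    print(latch.size())    # preferred from 0.2.8 onwards  -> 2
    print(latch.empty())   # deprecated, removed in 0.3    -> False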
Thanks!
~~~~~~~
@ -162,7 +191,10 @@ Mitogen would not be possible without the support of users. A huge thanks for
bug reports, testing, features and fixes in this release contributed by
`Andreas Hubert <https://github.com/peshay>`_,
`Anton Markelov <https://github.com/strangeman>`_,
`Dan <https://github.com/dsgnr>`_,
`Dave Cottlehuber <https://github.com/dch>`_,
`El Mehdi CHAOUKI <https://github.com/elmchaouki>`_,
`James Hogarth <https://github.com/hogarthj>`_,
`Nigel Metheringham <https://github.com/nigelm>`_,
`Orion Poplawski <https://github.com/opoplawski>`_,
`Pieter Voet <https://github.com/pietervoet/>`_,
@ -170,6 +202,7 @@ bug reports, testing, features and fixes in this release contributed by
`Szabó Dániel Ernő <https://github.com/r3ap3rpy>`_,
`Ulrich Schreiner <https://github.com/ulrichSchreiner>`_,
`Yuki Nishida <https://github.com/yuki-nishida-exa>`_,
`@DavidVentura <https://github.com/DavidVentura>`_,
`@ghp-rr <https://github.com/ghp-rr>`_,
`@rizzly <https://github.com/rizzly>`_, and
`@tho86 <https://github.com/tho86>`_.

@ -346,11 +346,15 @@ Masters listen on the following handles:
.. currentmodule:: mitogen.core
.. data:: ALLOCATE_ID
Replies to any message sent to it with a newly allocated range of context
IDs, to allow children to safely start their own contexts. Presently IDs
are allocated in batches of 1000 from a 32 bit range, allowing up to 4.2
million parent contexts to be created and destroyed before the associated
Router must be recreated.
Replies to any message sent to it with a newly allocated range of context
IDs, to allow children to safely start their own contexts. Presently IDs are
allocated in batches of 1000 from a 32 bit range, allowing up to 4.2 million
parent contexts to be created and destroyed before the associated Router
must be recreated.
This is handled by :class:`mitogen.master.IdAllocator` in the master
process, and messages are sent to it from
:class:`mitogen.parent.ChildIdAllocator` in children.
Children listen on the following handles:
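
The "4.2 million" figure for ``ALLOCATE_ID`` above follows directly from the batch size; the toy allocator below is purely illustrative and not the IdAllocator implementation:

.. code-block:: python

    BATCH = 1000
    ID_SPACE = 2 ** 32

    print(ID_SPACE // BATCH)    # -> 4294967, i.e. ~4.2 million batches

    class ToyIdAllocator(object):
        """Hand out consecutive [start, end) ID ranges, one batch at a time."""
        def __init__(self):
            self.next_id = 1

        def allocate_block(self, span=BATCH):
            start = self.next_id
            self.next_id += span
            return start, self.next_id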

@ -155,40 +155,6 @@ Common sources of import latency and bandwidth consumption are mitigated:
representing 1.7MiB of uncompressed source split across 148 modules.
SSH Client Emulation
####################
.. image:: images/fakessh.svg
:class: mitogen-right-300
Support is included for starting subprocesses with a modified environment, that
cause their attempt to use SSH to be redirected back into the host program. In
this way tools like `rsync`, `git`, `sftp`, and `scp` can efficiently reuse the
host program's existing connection to the remote machine, including any
firewall/user account hopping in use, with no additional configuration.
Scenarios that were not previously possible with these tools are enabled, such
as running `sftp` and `rsync` over a `sudo` session, to an account the user
cannot otherwise directly log into, including in restrictive environments that
for example enforce an interactive TTY and account password.
.. raw:: html
<div style="clear: both;"></div>
.. code-block:: python
bastion = router.ssh(hostname='bastion.mycorp.com')
webserver = router.ssh(via=bastion, hostname='webserver')
webapp = router.sudo(via=webserver, username='webapp')
fileserver = router.ssh(via=bastion, hostname='fileserver')
# Transparently tunnelled over fileserver -> .. -> sudo.webapp link
fileserver.call(mitogen.fakessh.run, webapp, [
'rsync', 'appdata', 'appserver:appdata'
])
Message Routing
###############

@ -2,6 +2,11 @@
Internal API Reference
**********************
.. note::
Internal APIs are subject to rapid change even across minor releases. This
page exists to help users modify and extend the library.
.. toctree::
:hidden:
@ -40,139 +45,202 @@ Latch
:members:
PidfulStreamHandler
===================
Logging
=======
See also :class:`mitogen.core.IoLoggerProtocol`.
.. currentmodule:: mitogen.master
.. autoclass:: LogForwarder
:members:
.. currentmodule:: mitogen.core
.. autoclass:: PidfulStreamHandler
:members:
Side
====
Stream, Side & Protocol
=======================
.. currentmodule:: mitogen.core
.. autoclass:: Stream
:members:
.. currentmodule:: mitogen.core
.. autoclass:: Side
:members:
.. currentmodule:: mitogen.core
.. autoclass:: Protocol
:members:
.. currentmodule:: mitogen.parent
.. autoclass:: BootstrapProtocol
:members:
Stream
======
.. currentmodule:: mitogen.core
.. autoclass:: DelimitedProtocol
:members:
.. currentmodule:: mitogen.core
.. autoclass:: BasicStream
.. autoclass:: IoLoggerProtocol
:members:
.. autoclass:: Stream
.. currentmodule:: mitogen.core
.. autoclass:: MitogenProtocol
:members:
.. currentmodule:: mitogen.parent
.. autoclass:: MitogenProtocol
:members:
.. currentmodule:: mitogen.core
.. autoclass:: Waker
:members:
Connection & Options
====================
.. currentmodule:: mitogen.fork
.. autoclass:: Stream
.. autoclass:: Options
:members:
.. autoclass:: Connection
:members:
.. currentmodule:: mitogen.parent
.. autoclass:: Stream
.. autoclass:: Options
:members:
.. autoclass:: Connection
:members:
.. currentmodule:: mitogen.ssh
.. autoclass:: Stream
.. autoclass:: Options
:members:
.. autoclass:: Connection
:members:
.. currentmodule:: mitogen.sudo
.. autoclass:: Stream
.. autoclass:: Options
:members:
.. autoclass:: Connection
:members:
Import Mechanism
================
.. currentmodule:: mitogen.core
.. autoclass:: IoLogger
.. autoclass:: Importer
:members:
.. currentmodule:: mitogen.core
.. autoclass:: Waker
.. currentmodule:: mitogen.master
.. autoclass:: ModuleResponder
:members:
.. currentmodule:: mitogen.parent
.. autoclass:: ModuleForwarder
:members:
Importer
========
.. currentmodule:: mitogen.core
.. autoclass:: Importer
Module Finders
==============
.. currentmodule:: mitogen.master
.. autoclass:: ModuleFinder
:members:
.. currentmodule:: mitogen.master
.. autoclass:: FinderMethod
:members:
.. currentmodule:: mitogen.master
.. autoclass:: DefectivePython3xMainMethod
:members:
.. currentmodule:: mitogen.master
.. autoclass:: PkgutilMethod
:members:
ModuleResponder
===============
.. currentmodule:: mitogen.master
.. autoclass:: SysModulesMethod
:members:
.. currentmodule:: mitogen.master
.. autoclass:: ModuleResponder
.. autoclass:: ParentEnumerationMethod
:members:
RouteMonitor
============
Routing Management
==================
.. currentmodule:: mitogen.parent
.. autoclass:: RouteMonitor
:members:
TimerList
=========
Timer Management
================
.. currentmodule:: mitogen.parent
.. autoclass:: TimerList
:members:
Timer
=====
.. currentmodule:: mitogen.parent
.. autoclass:: Timer
:members:
Forwarder
=========
Context ID Allocation
=====================
.. currentmodule:: mitogen.master
.. autoclass:: IdAllocator
:members:
.. currentmodule:: mitogen.parent
.. autoclass:: ModuleForwarder
.. autoclass:: ChildIdAllocator
:members:
ExternalContext
===============
Child Implementation
====================
.. currentmodule:: mitogen.core
.. autoclass:: ExternalContext
:members:
.. currentmodule:: mitogen.core
.. autoclass:: Dispatcher
:members:
Process
=======
Process Management
==================
.. currentmodule:: mitogen.parent
.. autoclass:: Process
.. autoclass:: Reaper
:members:
.. currentmodule:: mitogen.parent
.. autoclass:: Process
:members:
Helpers
=======
Blocking I/O
------------
.. currentmodule:: mitogen.parent
.. autoclass:: PopenProcess
:members:
These functions exist to support the blocking phase of setting up a new
context. They will eventually be replaced with asynchronous equivalents.
.. currentmodule:: mitogen.fork
.. autoclass:: Process
:members:
.. currentmodule:: mitogen.parent
.. autofunction:: discard_until
.. autofunction:: iter_read
.. autofunction:: write_all
Helper Functions
================
Subprocess Functions
------------
---------------------
.. currentmodule:: mitogen.parent
.. autofunction:: create_child
@ -184,15 +252,15 @@ Helpers
-------
.. currentmodule:: mitogen.core
.. autofunction:: to_text
.. autofunction:: has_parent_authority
.. autofunction:: io_op
.. autofunction:: pipe
.. autofunction:: set_block
.. autofunction:: set_cloexec
.. autofunction:: set_nonblock
.. autofunction:: set_block
.. autofunction:: io_op
.. autofunction:: to_text
.. currentmodule:: mitogen.parent
.. autofunction:: close_nonstandard_fds
.. autofunction:: create_socketpair
.. currentmodule:: mitogen.master
@ -205,4 +273,53 @@ Helpers
Signals
=======
:ref:`Please refer to Signals <signals>`.
Mitogen contains a simplistic signal mechanism to decouple its components. When
a signal is fired by an instance of a class, functions registered to receive it
are called back.
.. warning::
As signals execute on the Broker thread, and without exception handling,
they are generally unsafe for consumption by user code, as any bugs could
trigger crashes and hangs for which the broker is unable to forward logs,
or ensure the buggy context always shuts down on disconnect.
Functions
---------
.. currentmodule:: mitogen.core
.. autofunction:: listen
.. autofunction:: unlisten
.. autofunction:: fire
List
----
These signals are used internally by Mitogen.
.. list-table::
:header-rows: 1
:widths: auto
* - Class
- Name
- Description
* - :py:class:`mitogen.core.Stream`
- ``disconnect``
- Fired on the Broker thread when disconnection is detected.
* - :py:class:`mitogen.core.Context`
- ``disconnect``
- Fired on the Broker thread during shutdown (???)
* - :py:class:`mitogen.core.Broker`
- ``shutdown``
- Fired after Broker.shutdown() is called.
* - :py:class:`mitogen.core.Broker`
- ``exit``
- Fired immediately prior to the broker thread exit.
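
The listen/fire/unlisten functions documented above can be used like this; `Thing` and the ``frobbed`` signal are invented for the sketch and are not Mitogen signals:

.. code-block:: python

    import mitogen.core

    class Thing(object):
        pass

    def on_frobbed(arg):
        print('frobbed with %r' % (arg,))

    thing = Thing()
    mitogen.core.listen(thing, 'frobbed', on_frobbed)
    mitogen.core.fire(thing, 'frobbed', 123)     # prints "frobbed with 123"
    mitogen.core.unlisten(thing, 'frobbed', on_frobbed)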

@ -1,60 +0,0 @@
.. _signals:
Signals
=======
Mitogen contains a simplistic signal mechanism to help decouple its internal
components. When a signal is fired by a particular instance of a class, any
functions registered to receive it will be called back.
.. warning::
As signals execute on the Broker thread, and without exception handling,
they are generally unsafe for consumption by user code, as any bugs could
trigger crashes and hangs for which the broker is unable to forward logs,
or ensure the buggy context always shuts down on disconnect.
Functions
---------
.. currentmodule:: mitogen.core
.. autofunction:: listen
.. autofunction:: fire
List
----
These signals are used internally by Mitogen.
.. list-table::
:header-rows: 1
:widths: auto
* - Class
- Name
- Description
* - :py:class:`mitogen.core.Stream`
- ``disconnect``
- Fired on the Broker thread when disconnection is detected.
* - :py:class:`mitogen.core.Context`
- ``disconnect``
- Fired on the Broker thread during shutdown (???)
* - :py:class:`mitogen.core.Router`
- ``shutdown``
- Fired on the Broker thread after Broker.shutdown() is called.
* - :py:class:`mitogen.core.Broker`
- ``shutdown``
- Fired after Broker.shutdown() is called.
* - :py:class:`mitogen.core.Broker`
- ``exit``
- Fired immediately prior to the broker thread exit.

@ -122,6 +122,7 @@ LOAD_MODULE = 107
FORWARD_MODULE = 108
DETACHING = 109
CALL_SERVICE = 110
STUB_CALL_SERVICE = 111
#: Special value used to signal disconnection or the inability to route a
#: message, when it appears in the `reply_to` field. Usually causes
@ -406,22 +407,38 @@ def has_parent_authority(msg, _stream=None):
msg.auth_id in mitogen.parent_ids)
def _signals(obj, signal):
return (
obj.__dict__
.setdefault('_signals', {})
.setdefault(signal, [])
)
def listen(obj, name, func):
"""
Arrange for `func(*args, **kwargs)` to be invoked when the named signal is
Arrange for `func()` to be invoked when signal `name` is fired on `obj`.
"""
_signals(obj, name).append(func)
def unlisten(obj, name, func):
"""
Remove `func()` from the list of functions invoked when signal `name` is
fired by `obj`.
:raises ValueError:
`func()` was not on the list.
"""
signals = vars(obj).setdefault('_signals', {})
signals.setdefault(name, []).append(func)
_signals(obj, name).remove(func)
def fire(obj, name, *args, **kwargs):
"""
Arrange for `func(*args, **kwargs)` to be invoked for every function
registered for the named signal on `obj`.
registered for signal `name` on `obj`.
"""
signals = vars(obj).get('_signals', {})
for func in signals.get(name, ()):
for func in _signals(obj, name):
func(*args, **kwargs)
@ -862,7 +879,8 @@ class Message(object):
if msg.handle:
(self.router or router).route(msg)
else:
LOG.debug('Message.reply(): discarding due to zero handle: %r', msg)
LOG.debug('dropping reply to message with no return address: %r',
msg)
if PY3:
UNPICKLER_KWARGS = {'encoding': 'bytes'}
@ -932,7 +950,7 @@ class Sender(object):
Senders may be serialized, making them convenient to wire up data flows.
See :meth:`mitogen.core.Receiver.to_sender` for more information.
:param Context context:
:param mitogen.core.Context context:
Context to send messages to.
:param int dst_handle:
Destination handle to send messages to.
@ -1078,13 +1096,32 @@ class Receiver(object):
self.handle = None
self._latch.close()
def size(self):
"""
Return the number of items currently buffered.
As with :class:`Queue.Queue`, `0` may be returned even though a
subsequent call to :meth:`get` will succeed, since a message may be
posted at any moment between :meth:`size` and :meth:`get`.
As with :class:`Queue.Queue`, `>0` may be returned even though a
subsequent call to :meth:`get` will block, since another waiting thread
may be woken at any moment between :meth:`size` and :meth:`get`.
:raises LatchError:
The underlying latch has already been marked closed.
"""
return self._latch.size()
def empty(self):
"""
Return :data:`True` if calling :meth:`get` would block.
Return `size() == 0`.
As with :class:`Queue.Queue`, :data:`True` may be returned even though
a subsequent call to :meth:`get` will succeed, since a message may be
posted at any moment between :meth:`empty` and :meth:`get`.
.. deprecated:: 0.2.8
Use :meth:`size` instead.
:raises LatchError:
The latch has already been marked closed.
"""
return self._latch.empty()
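
One way the new :meth:`size` method might be used, with the race caveats from the docstring above in mind (a sketch, not library code):

.. code-block:: python

    def drain(recv, handle):
        """Handle everything currently buffered on a mitogen.core.Receiver.

        size() is only a snapshot: with a single consuming thread this drains
        the existing backlog without blocking, but new messages may arrive
        immediately afterwards.
        """
        while recv.size():
            handle(recv.get())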
@ -1133,7 +1170,10 @@ class Channel(Sender, Receiver):
A channel inherits from :class:`mitogen.core.Sender` and
`mitogen.core.Receiver` to provide bidirectional functionality.
This class is incomplete and obsolete, it will be removed in Mitogen 0.3.
.. deprecated:: 0.2.0
This class is incomplete and obsolete, it will be removed in Mitogen
0.3.
Channels were an early attempt at syntax sugar. It is always easier to pass
around unidirectional pairs of senders/receivers, even though the syntax is
baroque:
@ -1211,6 +1251,7 @@ class Importer(object):
ALWAYS_BLACKLIST += ['cStringIO']
def __init__(self, router, context, core_src, whitelist=(), blacklist=()):
self._log = logging.getLogger('mitogen.importer')
self._context = context
self._present = {'mitogen': self.MITOGEN_PKG_CONTENT}
self._lock = threading.Lock()
@ -1259,7 +1300,7 @@ class Importer(object):
)
def __repr__(self):
return 'Importer()'
return 'Importer'
def builtin_find_module(self, fullname):
# imp.find_module() will always succeed for __main__, because it is a
@ -1284,18 +1325,18 @@ class Importer(object):
_tls.running = True
try:
_v and LOG.debug('%r.find_module(%r)', self, fullname)
#_v and self._log.debug('Python requested %r', fullname)
fullname = to_text(fullname)
pkgname, dot, _ = str_rpartition(fullname, '.')
pkg = sys.modules.get(pkgname)
if pkgname and getattr(pkg, '__loader__', None) is not self:
LOG.debug('%r: %r is submodule of a package we did not load',
self, fullname)
self._log.debug('%s is submodule of a locally loaded package',
fullname)
return None
suffix = fullname[len(pkgname+dot):]
if pkgname and suffix not in self._present.get(pkgname, ()):
LOG.debug('%r: master doesn\'t know %r', self, fullname)
self._log.debug('%s has no submodule %s', pkgname, suffix)
return None
# #114: explicitly whitelisted prefixes override any
@ -1306,10 +1347,9 @@ class Importer(object):
try:
self.builtin_find_module(fullname)
_vv and IOLOG.debug('%r: %r is available locally',
self, fullname)
_vv and self._log.debug('%r is available locally', fullname)
except ImportError:
_vv and IOLOG.debug('find_module(%r) returning self', fullname)
_vv and self._log.debug('we will try to load %r', fullname)
return self
finally:
del _tls.running
@ -1360,7 +1400,7 @@ class Importer(object):
tup = msg.unpickle()
fullname = tup[0]
_v and LOG.debug('importer: received %s', fullname)
_v and self._log.debug('received %s', fullname)
self._lock.acquire()
try:
@ -1384,12 +1424,12 @@ class Importer(object):
if not present:
funcs = self._callbacks.get(fullname)
if funcs is not None:
_v and LOG.debug('%s: existing request for %s in flight',
self, fullname)
_v and self._log.debug('existing request for %s in flight',
fullname)
funcs.append(callback)
else:
_v and LOG.debug('%s: requesting %s from parent',
self, fullname)
_v and self._log.debug('sending new %s request to parent',
fullname)
self._callbacks[fullname] = [callback]
self._context.send(
Message(data=b(fullname), handle=GET_MODULE)
@ -1402,7 +1442,7 @@ class Importer(object):
def load_module(self, fullname):
fullname = to_text(fullname)
_v and LOG.debug('importer: requesting %s', fullname)
_v and self._log.debug('requesting %s', fullname)
self._refuse_imports(fullname)
event = threading.Event()
@ -1536,6 +1576,14 @@ class Stream(object):
name = u'default'
def set_protocol(self, protocol):
"""
Bind a protocol to this stream, by updating :attr:`Protocol.stream` to
refer to this stream, and updating this stream's
:attr:`Stream.protocol` to the refer to the protocol. Any prior
protocol's :attr:`Protocol.stream` is set to :data:`None`.
"""
if self.protocol:
self.protocol.stream = None
self.protocol = protocol
self.protocol.stream = self
@ -1608,7 +1656,11 @@ class Protocol(object):
implementation to be replaced without modifying behavioural logic.
"""
stream_class = Stream
#: The :class:`Stream` this protocol is currently bound to, or
#: :data:`None`.
stream = None
read_size = CHUNK_SIZE
@classmethod
@ -1666,7 +1718,8 @@ class DelimitedProtocol(Protocol):
_trailer = b('')
def on_receive(self, broker, buf):
IOLOG.debug('%r.on_receive()', self)
_vv and IOLOG.debug('%r.on_receive()', self)
stream = self.stream
self._trailer, cont = mitogen.core.iter_split(
buf=self._trailer + buf,
delim=self.delimiter,
@ -1677,13 +1730,31 @@ class DelimitedProtocol(Protocol):
if cont:
self.on_partial_line_received(self._trailer)
else:
assert self.stream.protocol is not self
self.stream.protocol.on_receive(broker, self._trailer)
assert stream.protocol is not self
stream.protocol.on_receive(broker, self._trailer)
def on_line_received(self, line):
"""
Receive a line from the stream.
:param bytes line:
The encoded line, excluding the delimiter.
:returns:
:data:`False` to indicate this invocation modified the stream's
active protocol, and any remaining buffered data should be passed
to the new protocol's :meth:`on_receive` method.
Any other return value is ignored.
"""
pass
def on_partial_line_received(self, line):
"""
Receive a trailing unterminated partial line from the stream.
:param bytes line:
The encoded partial line.
"""
pass
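
A minimal hedged sketch of a DelimitedProtocol subclass using the hooks documented above; the class is illustrative and not part of Mitogen:

.. code-block:: python

    import mitogen.core

    class LineLoggingProtocol(mitogen.core.DelimitedProtocol):
        """Log complete lines; note any unterminated trailing fragment."""
        delimiter = b'\n'

        def on_line_received(self, line):
            print('line: %r' % (line,))
            # Returning None keeps this protocol bound to the stream;
            # returning False would signal that the stream's protocol was
            # swapped and remaining buffered data belongs to the new one.

        def on_partial_line_received(self, line):
            print('partial line: %r' % (line,))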
@ -1730,13 +1801,13 @@ class BufferedWriter(object):
buf = self._buf.popleft()
written = self._protocol.stream.transmit_side.write(buf)
if not written:
_v and LOG.debug('%r.on_transmit(): disconnection detected', self)
_v and LOG.debug('disconnected during write to %r', self)
self._protocol.stream.on_disconnect(broker)
return
elif written != len(buf):
self._buf.appendleft(BufferType(buf, written))
_vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written)
_vv and IOLOG.debug('transmitted %d bytes to %r', written, self)
self._len -= written
if not self._buf:
@ -1752,7 +1823,7 @@ class Side(object):
underlying FD, preventing erroneous duplicate calls to :func:`os.close` due
to duplicate :meth:`Stream.on_disconnect` calls, which would otherwise risk
silently succeeding by closing an unrelated descriptor. For this reason, it
is crucial only one :class:`Side` exists per unique descriptor.
is crucial only one file object exists per unique descriptor.
:param mitogen.core.Stream stream:
The stream this side is associated with.
@ -1780,8 +1851,8 @@ class Side(object):
self.fp = fp
#: Integer file descriptor to perform IO on, or :data:`None` if
#: :meth:`close` has been called. This is saved separately from the
#: file object, since fileno() cannot be called on it after it has been
#: closed.
#: file object, since :meth:`file.fileno` cannot be called on it after
#: it has been closed.
self.fd = fp.fileno()
#: If :data:`True`, causes presence of this side in
#: :class:`Broker`'s active reader set to defer shutdown until the
@ -1808,7 +1879,7 @@ class Side(object):
def close(self):
"""
Call :func:`os.close` on :attr:`fd` if it is not :data:`None`,
Call :meth:`file.close` on :attr:`fp` if it is not :data:`None`,
then set it to :data:`None`.
"""
_vv and IOLOG.debug('%r.close()', self)
@ -1827,7 +1898,7 @@ class Side(object):
in a 0-sized read like a regular file.
:returns:
Bytes read, or the empty to string to indicate disconnection was
Bytes read, or the empty string to indicate disconnection was
detected.
"""
if self.closed:
@ -2010,7 +2081,7 @@ class Context(object):
explicitly, as that method is deduplicating, and returns the only context
instance :ref:`signals` will be raised on.
:param Router router:
:param mitogen.core.Router router:
Router to emit messages through.
:param int context_id:
Context ID.
@ -2055,13 +2126,13 @@ class Context(object):
msg.dst_id = self.context_id
msg.reply_to = receiver.handle
_v and LOG.debug('%r.send_async(%r)', self, msg)
_v and LOG.debug('sending message to %r: %r', self, msg)
self.send(msg)
return receiver
def call_service_async(self, service_name, method_name, **kwargs):
_v and LOG.debug('%r.call_service_async(%r, %r, %r)',
self, service_name, method_name, kwargs)
_v and LOG.debug('calling service %s.%s of %r, args: %r',
service_name, method_name, self, kwargs)
if isinstance(service_name, BytesType):
service_name = service_name.encode('utf-8')
elif not isinstance(service_name, UnicodeType):
@ -2337,19 +2408,17 @@ class Latch(object):
finally:
self._lock.release()
def empty(self):
def size(self):
"""
Return :data:`True` if calling :meth:`get` would block.
Return the number of items currently buffered.
As with :class:`Queue.Queue`, :data:`True` may be returned even
though a subsequent call to :meth:`get` will succeed, since a
message may be posted at any moment between :meth:`empty` and
:meth:`get`.
As with :class:`Queue.Queue`, `0` may be returned even though a
subsequent call to :meth:`get` will succeed, since a message may be
posted at any moment between :meth:`size` and :meth:`get`.
As with :class:`Queue.Queue`, :data:`False` may be returned even
though a subsequent call to :meth:`get` will block, since another
waiting thread may be woken at any moment between :meth:`empty` and
:meth:`get`.
As with :class:`Queue.Queue`, `>0` may be returned even though a
subsequent call to :meth:`get` will block, since another waiting thread
may be woken at any moment between :meth:`size` and :meth:`get`.
:raises LatchError:
The latch has already been marked closed.
@ -2358,10 +2427,22 @@ class Latch(object):
try:
if self.closed:
raise LatchError()
return len(self._queue) == 0
return len(self._queue)
finally:
self._lock.release()
def empty(self):
"""
Return `size() == 0`.
.. deprecated:: 0.2.8
Use :meth:`size` instead.
:raises LatchError:
The latch has already been marked closed.
"""
return self.size() == 0
def _get_socketpair(self):
"""
Return an unused socketpair, creating one if none exist.
@ -2655,7 +2736,11 @@ class IoLoggerProtocol(DelimitedProtocol):
def on_shutdown(self, broker):
"""
Shut down the write end of the logging socket.
Shut down the write end of the socket, preventing any further writes to
it by this process, or any subprocess that inherited it. This allows any
remaining kernel-buffered data to be drained during graceful shutdown
without the buffer continuously refilling due to some out of control
child process.
"""
_v and LOG.debug('%r: shutting down', self)
if not IS_WSL:
@ -2667,6 +2752,9 @@ class IoLoggerProtocol(DelimitedProtocol):
self.stream.transmit_side.close()
def on_line_received(self, line):
"""
Decode the received line as UTF-8 and pass it to the logging framework.
"""
self._log.info('%s', line.decode('utf-8', 'replace'))
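
The on_shutdown behaviour above (stop writing, keep draining) can be demonstrated with a plain socketpair, independent of IoLoggerProtocol:

.. code-block:: python

    import socket

    writer, reader = socket.socketpair()
    writer.send(b'buffered data')

    # Shut down only the write direction: no further data can be written,
    # but whatever is already queued in the kernel can still be drained,
    # after which the reader sees a clean 0-byte read.
    writer.shutdown(socket.SHUT_WR)
    print(reader.recv(1024))    # -> b'buffered data'
    print(reader.recv(1024))    # -> b'' (EOF)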
@ -2679,7 +2767,12 @@ class Router(object):
**Note:** This is the somewhat limited core version of the Router class
used by child contexts. The master subclass is documented below this one.
"""
#: The :class:`mitogen.core.Context` subclass to use when constructing new
#: :class:`Context` objects in :meth:`myself` and :meth:`context_by_id`.
#: Permits :class:`Router` subclasses to extend the :class:`Context`
#: interface, as done in :class:`mitogen.parent.Router`.
context_class = Context
max_message_size = 128 * 1048576
#: When :data:`True`, permit children to only communicate with the current
@ -2699,6 +2792,18 @@ class Router(object):
#: parameter.
unidirectional = False
duplicate_handle_msg = 'cannot register a handle that already exists'
refused_msg = 'refused by policy'
invalid_handle_msg = 'invalid handle'
too_large_msg = 'message too large (max %d bytes)'
respondent_disconnect_msg = 'the respondent Context has disconnected'
broker_exit_msg = 'Broker has exitted'
no_route_msg = 'no route to %r, my ID is %r'
unidirectional_msg = (
'routing mode prevents forward of message from context %d via '
'context %d'
)
def __init__(self, broker):
self.broker = broker
listen(broker, 'exit', self._on_broker_exit)
@ -2766,16 +2871,21 @@ class Router(object):
for context in notify:
context.on_disconnect()
broker_exit_msg = 'Broker has exitted'
def _on_broker_exit(self):
"""
Called prior to broker exit, informs callbacks registered with
:meth:`add_handler` the connection is dead.
"""
_v and LOG.debug('%r: broker has exitted', self)
while self._handle_map:
_, (_, func, _, _) = self._handle_map.popitem()
func(Message.dead(self.broker_exit_msg))
def myself(self):
"""
Return a :class:`Context` referring to the current process.
Return a :class:`Context` referring to the current process. Since
:class:`Context` is serializable, this is convenient to use in remote
function call parameter lists.
"""
return self.context_class(
router=self,
@ -2785,8 +2895,25 @@ class Router(object):
def context_by_id(self, context_id, via_id=None, create=True, name=None):
"""
Messy factory/lookup function to find a context by its ID, or construct
it. This will eventually be replaced by a more sensible interface.
Return or construct a :class:`Context` given its ID. An internal
mapping of ID to the canonical :class:`Context` representing that ID is
maintained, so that :ref:`signals` can be raised on it.
This may be called from any thread, lookup and construction are atomic.
:param int context_id:
The context ID to look up.
:param int via_id:
If the :class:`Context` does not already exist, set its
:attr:`Context.via` to the :class:`Context` matching this ID.
:param bool create:
If the :class:`Context` does not already exist, create it.
:param str name:
If the :class:`Context` does not already exist, set its name.
:returns:
The :class:`Context`, or :data:`None` if `create` is
:data:`False` and no :class:`Context` previously existed.
"""
context = self._context_by_id.get(context_id)
if context:
@ -2831,7 +2958,13 @@ class Router(object):
"""
Return the :class:`Stream` that should be used to communicate with
`dst_id`. If a specific route for `dst_id` is not known, a reference to
the parent context's stream is returned.
the parent context's stream is returned. If the parent is disconnected,
or when running in the master context, return :data:`None` instead.
This can be used from any thread, but its output is only meaningful
from the context of the :class:`Broker` thread, as disconnection or
replacement could happen in parallel on the broker thread at any
moment.
"""
return (
self._stream_by_id.get(dst_id) or
@ -2867,7 +3000,7 @@ class Router(object):
If :data:`False`, the handler will be unregistered after a single
message has been received.
:param Context respondent:
:param mitogen.core.Context respondent:
Context that messages to this handle are expected to be sent from.
If specified, arranges for a dead message to be delivered to `fn`
when disconnection of the context is detected.
@ -2921,35 +3054,12 @@ class Router(object):
return handle
duplicate_handle_msg = 'cannot register a handle that already exists'
refused_msg = 'refused by policy'
invalid_handle_msg = 'invalid handle'
too_large_msg = 'message too large (max %d bytes)'
respondent_disconnect_msg = 'the respondent Context has disconnected'
broker_shutdown_msg = 'Broker is shutting down'
no_route_msg = 'no route to %r, my ID is %r'
unidirectional_msg = (
'routing mode prevents forward of message from context %d via '
'context %d'
)
def _on_respondent_disconnect(self, context):
for handle in self._handles_by_respondent.pop(context, ()):
_, fn, _, _ = self._handle_map[handle]
fn(Message.dead(self.respondent_disconnect_msg))
del self._handle_map[handle]
def on_shutdown(self, broker):
"""
Called during :meth:`Broker.shutdown`, informs callbacks registered
with :meth:`add_handle_cb` the connection is dead.
"""
_v and LOG.debug('%r: shutting down', self, broker)
fire(self, 'shutdown')
for handle, (persist, fn) in self._handle_map.iteritems():
_v and LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn)
fn(Message.dead(self.broker_shutdown_msg))
def _maybe_send_dead(self, msg, reason, *args):
if args:
reason %= args
@ -3248,10 +3358,10 @@ class Broker(object):
self._loop_once(max(0, deadline - time.time()))
if self.keep_alive():
LOG.error('%r: some streams did not close gracefully. '
'The most likely cause for this is one or '
'more child processes still connected to '
'our stdout/stderr pipes.', self)
LOG.error('%r: pending work still existed %d seconds after '
'shutdown began. This may be due to a timer that is yet '
'to expire, or a child connection that did not fully '
'shut down.', self, self.shutdown_timeout)
def _do_broker_main(self):
"""
@ -3323,9 +3433,22 @@ class Dispatcher(object):
self.econtext = econtext
#: Chain ID -> CallError if prior call failed.
self._error_by_chain_id = {}
self.recv = Receiver(router=econtext.router,
handle=CALL_FUNCTION,
policy=has_parent_authority)
self.recv = Receiver(
router=econtext.router,
handle=CALL_FUNCTION,
policy=has_parent_authority,
)
#: The :data:`CALL_SERVICE` :class:`Receiver` that will eventually be
#: reused by :class:`mitogen.service.Pool`, should it ever be loaded.
#: This is necessary for race-free reception of all service requests
#: delivered regardless of whether the stub or real service pool are
#: loaded. See #547 for related sorrows.
Dispatcher._service_recv = Receiver(
router=econtext.router,
handle=CALL_SERVICE,
policy=has_parent_authority,
)
self._service_recv.notify = self._on_call_service
listen(econtext.broker, 'shutdown', self.recv.close)
@classmethod
@ -3366,8 +3489,45 @@ class Dispatcher(object):
self._error_by_chain_id[chain_id] = e
return chain_id, e
def _on_call_service(self, recv):
"""
Notifier for the :data:`CALL_SERVICE` receiver. This is called on the
:class:`Broker` thread for any service messages arriving at this
context, for as long as no real service pool implementation is loaded.
In order to safely bootstrap the service pool implementation a sentinel
message is enqueued on the :data:`CALL_FUNCTION` receiver in order to
wake the main thread, where the importer can run without any
possibility of suffering deadlock due to concurrent uses of the
importer.
If the main thread is blocked indefinitely waiting on a service call,
preventing the import from ever running, then :mod:`mitogen.service`
has already been imported and
:func:`mitogen.service.get_or_create_pool` has already run, meaning the
service pool is already active and the duplicate initialization was not
needed anyway.
#547: This trickery is needed to avoid the alternative of spinning up a
temporary thread to import the service pool, which could deadlock if a
custom import hook executing on the main thread (under the importer
lock) blocked waiting for data that was in turn received by a service:
the main thread's import lock cannot be released until the service is
running, and the service cannot satisfy the request until the import
lock is released.
"""
self.recv._on_receive(Message(handle=STUB_CALL_SERVICE))
def _init_service_pool(self):
import mitogen.service
mitogen.service.get_or_create_pool(router=self.econtext.router)
def _dispatch_calls(self):
for msg in self.recv:
if msg.handle == STUB_CALL_SERVICE:
if msg.src_id == mitogen.context_id:
self._init_service_pool()
continue
chain_id, ret = self._dispatch_one(msg)
_v and LOG.debug('%r: %r -> %r', self, msg, ret)
if msg.reply_to:
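
Generically, the sentinel-wakeup pattern used by _on_call_service above looks like the following sketch; the queue, STUB object and helper functions are stand-ins, not the Dispatcher implementation:

.. code-block:: python

    try:
        import queue            # Python 3
    except ImportError:
        import Queue as queue   # Python 2

    STUB = object()             # stands in for the STUB_CALL_SERVICE sentinel
    main_queue = queue.Queue()

    def on_service_msg():
        # Runs on the broker thread: never import or block here, just wake
        # the main thread by enqueueing a marker.
        main_queue.put(STUB)

    def initialize_service_pool():
        # Main thread: the heavy import can happen here without risking a
        # deadlock against code already holding the import lock.
        print('service pool initialized on the main thread')

    def dispatch_one():
        item = main_queue.get()
        if item is STUB:
            initialize_service_pool()
        else:
            print('dispatching %r' % (item,))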
@ -3386,30 +3546,36 @@ class ExternalContext(object):
"""
External context implementation.
This class contains the main program implementation for new children. It is
responsible for setting up everything about the process environment, import
hooks, standard IO redirection, logging, configuring a :class:`Router` and
:class:`Broker`, and finally arranging for :class:`Dispatcher` to take over
the main thread after initialization is complete.
.. attribute:: broker
The :class:`mitogen.core.Broker` instance.
.. attribute:: context
The :class:`mitogen.core.Context` instance.
.. attribute:: channel
The :class:`mitogen.core.Channel` over which :data:`CALL_FUNCTION`
requests are received.
.. attribute:: stdout_log
The :class:`mitogen.core.IoLogger` connected to ``stdout``.
.. attribute:: importer
The :class:`mitogen.core.Importer` instance.
.. attribute:: stdout_log
The :class:`IoLogger` connected to ``stdout``.
The :class:`IoLogger` connected to :data:`sys.stdout`.
.. attribute:: stderr_log
The :class:`IoLogger` connected to ``stderr``.
.. method:: _dispatch_calls
Implementation for the main thread in every child context.
The :class:`IoLogger` connected to :data:`sys.stderr`.
"""
detached = False
@ -3420,34 +3586,6 @@ class ExternalContext(object):
if not self.config['profiling']:
os.kill(os.getpid(), signal.SIGTERM)
#: On Python >3.4, the global importer lock has been sharded into a
#: per-module lock, meaning there is no guarantee the import statement in
#: service_stub_main will be truly complete before a second thread
#: attempting the same import will see a partially initialized module.
#: Sigh. Therefore serialize execution of the stub itself.
service_stub_lock = threading.Lock()
def _service_stub_main(self, msg):
self.service_stub_lock.acquire()
try:
import mitogen.service
pool = mitogen.service.get_or_create_pool(router=self.router)
pool._receiver._on_receive(msg)
finally:
self.service_stub_lock.release()
def _on_call_service_msg(self, msg):
"""
Stub service handler. Start a thread to import the mitogen.service
implementation from, and deliver the message to the newly constructed
pool. This must be done as CALL_SERVICE for e.g. PushFileService may
race with a CALL_FUNCTION blocking the main thread waiting for a result
from that service.
"""
if not msg.is_dead:
th = threading.Thread(target=self._service_stub_main, args=(msg,))
th.start()
def _on_shutdown_msg(self, msg):
if not msg.is_dead:
_v and LOG.debug('shutdown request from context %d', msg.src_id)
@ -3491,11 +3629,6 @@ class ExternalContext(object):
handle=SHUTDOWN,
policy=has_parent_authority,
)
self.router.add_handler(
fn=self._on_call_service_msg,
handle=CALL_SERVICE,
policy=has_parent_authority,
)
self.master = Context(self.router, 0, 'master')
parent_id = self.config['parent_ids'][0]
if parent_id == 0:
@ -3503,7 +3636,10 @@ class ExternalContext(object):
else:
self.parent = Context(self.router, parent_id, 'parent')
in_fp = os.fdopen(os.dup(self.config.get('in_fd', 100)), 'rb', 0)
in_fd = self.config.get('in_fd', 100)
in_fp = os.fdopen(os.dup(in_fd), 'rb', 0)
os.close(in_fd)
out_fp = os.fdopen(os.dup(self.config.get('out_fd', 1)), 'wb', 0)
self.stream = MitogenProtocol.build_stream(self.router, parent_id)
self.stream.accept(in_fp, out_fp)
@ -3589,13 +3725,15 @@ class ExternalContext(object):
os.dup2(fd, stdfd)
os.close(fd)
def _setup_stdio(self):
# #481: when stderr is a TTY due to being started via
# tty_create_child()/hybrid_tty_create_child(), and some privilege
# escalation tool like prehistoric versions of sudo exec this process
# over the top of itself, there is nothing left to keep the slave PTY
# open after we replace our stdio. Therefore if stderr is a TTY, keep
# around a permanent dup() to avoid receiving SIGHUP.
def _preserve_tty_fp(self):
"""
#481: when stderr is a TTY due to being started via tty_create_child()
or hybrid_tty_create_child(), and some privilege escalation tool like
prehistoric versions of sudo exec this process over the top of itself,
there is nothing left to keep the slave PTY open after we replace our
stdio. Therefore if stderr is a TTY, keep around a permanent dup() to
avoid receiving SIGHUP.
"""
try:
if os.isatty(2):
self.reserve_tty_fp = os.fdopen(os.dup(2), 'r+b', 0)
@ -3603,6 +3741,8 @@ class ExternalContext(object):
except OSError:
pass
def _setup_stdio(self):
self._preserve_tty_fp()
# When sys.stdout was opened by the runtime, overwriting it will not
# close FD 1. However when forking from a child that previously used
# fdopen(), overwriting it /will/ close FD 1. So we must swallow the

@ -41,7 +41,7 @@ import mitogen.parent
from mitogen.core import b
LOG = logging.getLogger('mitogen')
LOG = logging.getLogger(__name__)
# Python 2.4/2.5 cannot support fork+threads whatsoever, it doesn't even fix up
# interpreter state. So 2.4/2.5 interpreters start .local() contexts for

@ -28,15 +28,10 @@
# !mitogen: minify_safe
import logging
import mitogen.core
import mitogen.parent
LOG = logging.getLogger(__name__)
class Options(mitogen.parent.Options):
container = None
username = None

@ -28,15 +28,10 @@
# !mitogen: minify_safe
import logging
import mitogen.core
import mitogen.parent
LOG = logging.getLogger(__name__)
class Options(mitogen.parent.Options):
pod = None
kubectl_path = 'kubectl'

@ -28,15 +28,10 @@
# !mitogen: minify_safe
import logging
import mitogen.core
import mitogen.parent
LOG = logging.getLogger(__name__)
class Options(mitogen.parent.Options):
container = None
lxc_attach_path = 'lxc-attach'

@ -28,15 +28,10 @@
# !mitogen: minify_safe
import logging
import mitogen.core
import mitogen.parent
LOG = logging.getLogger(__name__)
class Options(mitogen.parent.Options):
container = None
lxc_path = 'lxc'

@ -387,26 +387,26 @@ class LogForwarder(object):
if msg.is_dead:
return
logger = self._cache.get(msg.src_id)
if logger is None:
context = self._router.context_by_id(msg.src_id)
if context is None:
LOG.error('%s: dropping log from unknown context ID %d',
self, msg.src_id)
return
name = '%s.%s' % (RLOG.name, context.name)
self._cache[msg.src_id] = logger = logging.getLogger(name)
context = self._router.context_by_id(msg.src_id)
if context is None:
LOG.error('%s: dropping log from unknown context %d',
self, msg.src_id)
return
name, level_s, s = msg.data.decode('utf-8', 'replace').split('\x00', 2)
logger_name = '%s.[%s]' % (name, context.name)
logger = self._cache.get(logger_name)
if logger is None:
self._cache[logger_name] = logger = logging.getLogger(logger_name)
# See logging.Handler.makeRecord()
record = logging.LogRecord(
name=logger.name,
level=int(level_s),
pathname='(unknown file)',
lineno=0,
msg=('%s: %s' % (name, s)),
msg=s,
args=(),
exc_info=None,
)
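Because each forwarded record is now dispatched through a logger named after the child's own logger plus the originating context (``'<name>.[<context name>]'``), the standard :mod:`logging` machinery can filter them. A hedged sketch; the context name ``ssh.example.com`` is invented and a handler installed by ``logging.basicConfig()`` is assumed::

    import logging

    class DropOneContext(logging.Filter):
        """Drop records forwarded from a single child context."""
        def filter(self, record):
            return not record.name.endswith('.[ssh.example.com]')

    # A Filter attached to the handler (or to the exact
    # '<name>.[<context name>]' logger) applies to these forwarded records.
    logging.getLogger().handlers[0].addFilter(DropOneContext())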
@ -430,8 +430,8 @@ class FinderMethod(object):
def find(self, fullname):
"""
Accept a canonical module name and return `(path, source, is_pkg)`
tuples, where:
Accept a canonical module name as would be found in :data:`sys.modules`
and return a `(path, source, is_pkg)` tuple, where:
* `path`: Unicode string containing path to source file.
* `source`: Bytestring containing source file's content.
@ -447,10 +447,13 @@ class DefectivePython3xMainMethod(FinderMethod):
"""
Recent versions of Python 3.x introduced an incomplete notion of
importer specs, and in doing so created permanent asymmetry in the
:mod:`pkgutil` interface handling for the `__main__` module. Therefore
we must handle `__main__` specially.
:mod:`pkgutil` interface handling for the :mod:`__main__` module. Therefore
we must handle :mod:`__main__` specially.
"""
def find(self, fullname):
"""
Find :mod:`__main__` using its :data:`__file__` attribute.
"""
if fullname != '__main__':
return None
@ -477,6 +480,9 @@ class PkgutilMethod(FinderMethod):
be the only required implementation of get_module().
"""
def find(self, fullname):
"""
Find `fullname` using :func:`pkgutil.find_loader`.
"""
try:
# Pre-'import spec' this returned None, in Python3.6 it raises
# ImportError.
@ -522,10 +528,13 @@ class PkgutilMethod(FinderMethod):
class SysModulesMethod(FinderMethod):
"""
Attempt to fetch source code via sys.modules. This is specifically to
support __main__, but it may catch a few more cases.
Attempt to fetch source code via :data:`sys.modules`. This was originally
specifically to support :mod:`__main__`, but it may catch a few more cases.
"""
def find(self, fullname):
"""
Find `fullname` using its :data:`__file__` attribute.
"""
module = sys.modules.get(fullname)
LOG.debug('_get_module_via_sys_modules(%r) -> %r', fullname, module)
if getattr(module, '__name__', None) != fullname:
@ -566,14 +575,17 @@ class ParentEnumerationMethod(FinderMethod):
"""
Attempt to fetch source code by examining the module's (hopefully less
insane) parent package. Required for older versions of
ansible.compat.six and plumbum.colors, and Ansible 2.8
ansible.module_utils.distro.
:mod:`ansible.compat.six`, :mod:`plumbum.colors`, and Ansible 2.8
:mod:`ansible.module_utils.distro`.
For cases like module_utils.distro, this must handle cases where a package
transmuted itself into a totally unrelated module during import and vice
versa.
For cases like :mod:`ansible.module_utils.distro`, this must handle cases
where a package transmuted itself into a totally unrelated module during
import and vice versa.
"""
def find(self, fullname):
"""
See implementation for a description of how this works.
"""
if fullname not in sys.modules:
# Don't attempt this unless a module really exists in sys.modules,
# else we could return junk.
@ -796,6 +808,7 @@ class ModuleFinder(object):
class ModuleResponder(object):
def __init__(self, router):
self._log = logging.getLogger('mitogen.responder')
self._router = router
self._finder = ModuleFinder()
self._cache = {} # fullname -> pickled
@ -863,7 +876,7 @@ class ModuleResponder(object):
if b('mitogen.main(') in src:
return src
LOG.error(self.main_guard_msg, path)
self._log.error(self.main_guard_msg, path)
raise ImportError('refused')
def _make_negative_response(self, fullname):
@ -882,8 +895,7 @@ class ModuleResponder(object):
if path and is_stdlib_path(path):
# Prevent loading of 2.x<->3.x stdlib modules! This costs one
# RTT per hit, so a client-side solution is also required.
LOG.debug('%r: refusing to serve stdlib module %r',
self, fullname)
self._log.debug('refusing to serve stdlib module %r', fullname)
tup = self._make_negative_response(fullname)
self._cache[fullname] = tup
return tup
@ -891,7 +903,7 @@ class ModuleResponder(object):
if source is None:
# TODO: make this .warning() or similar again once importer has its
# own logging category.
LOG.debug('_build_tuple(%r): could not locate source', fullname)
self._log.debug('could not find source for %r', fullname)
tup = self._make_negative_response(fullname)
self._cache[fullname] = tup
return tup
@ -904,8 +916,8 @@ class ModuleResponder(object):
if is_pkg:
pkg_present = get_child_modules(path)
LOG.debug('_build_tuple(%r, %r) -> %r',
path, fullname, pkg_present)
self._log.debug('%s is a package at %s with submodules %r',
fullname, path, pkg_present)
else:
pkg_present = None
@ -936,8 +948,8 @@ class ModuleResponder(object):
dst_id=stream.protocol.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
LOG.debug('%s: sending %s (%.2f KiB) to %s',
self, fullname, len(msg.data) / 1024.0, stream.name)
self._log.debug('sending %s (%.2f KiB) to %s',
fullname, len(msg.data) / 1024.0, stream.name)
self._router._async_route(msg)
stream.protocol.sent_modules.add(fullname)
if tup[2] is not None:
@ -983,7 +995,7 @@ class ModuleResponder(object):
return
fullname = msg.data.decode()
LOG.debug('%s requested module %s', stream.name, fullname)
self._log.debug('%s requested module %s', stream.name, fullname)
self.get_module_count += 1
if fullname in stream.protocol.sent_modules:
LOG.warning('_on_get_module(): dup request for %r from %r',
@ -1216,6 +1228,21 @@ class Router(mitogen.parent.Router):
class IdAllocator(object):
"""
Allocate IDs for new contexts constructed locally, and blocks of IDs for
children to allocate their own IDs using
:class:`mitogen.parent.ChildIdAllocator` without risk of conflict, and
without necessitating network round-trips for each new context.
This class responds to :data:`mitogen.core.ALLOCATE_ID` messages received
from children by replying with fresh block ID allocations.
The master's :class:`IdAllocator` instance can be accessed via
:attr:`mitogen.master.Router.id_allocator`.
"""
#: Block allocations are made in groups of 1000 by default.
BLOCK_SIZE = 1000
def __init__(self, router):
self.router = router
self.next_id = 1
@ -1228,14 +1255,12 @@ class IdAllocator(object):
def __repr__(self):
return 'IdAllocator(%r)' % (self.router,)
BLOCK_SIZE = 1000
def allocate(self):
"""
Arrange for a unique context ID to be allocated and associated with a
route leading to the active context. In masters, the ID is generated
directly, in children it is forwarded to the master via a
:data:`mitogen.core.ALLOCATE_ID` message.
Allocate a context ID by directly incrementing an internal counter.
:returns:
The new context ID.
"""
self.lock.acquire()
try:
@ -1246,6 +1271,15 @@ class IdAllocator(object):
self.lock.release()
def allocate_block(self):
"""
Allocate a block of IDs for use in a child context.
This function is safe to call from any thread.
:returns:
Tuple of the form `(id, end_id)` where `id` is the first usable ID
and `end_id` is the last usable ID.
"""
self.lock.acquire()
try:
id_ = self.next_id
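The arithmetic the two docstrings above describe is simple enough to sketch in isolation; this is an illustration of the idea only, not Mitogen's wire protocol::

    import threading

    class BlockAllocator(object):
        """Hand out exclusive ID ranges so children can mint IDs locally."""
        BLOCK_SIZE = 1000

        def __init__(self):
            self.lock = threading.Lock()
            self.next_id = 1

        def allocate_block(self):
            # Returns (first usable ID, last usable ID), both inclusive.
            with self.lock:
                first = self.next_id
                self.next_id += self.BLOCK_SIZE
                return first, first + self.BLOCK_SIZE - 1

    master = BlockAllocator()
    print(master.allocate_block())   # (1, 1000)
    print(master.allocate_block())   # (1001, 2000)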

@ -35,6 +35,7 @@ Support for operating in a mixed threading/forking environment.
import os
import socket
import sys
import threading
import weakref
import mitogen.core
@ -157,6 +158,7 @@ class Corker(object):
held. This will not return until each thread acknowledges it has ceased
execution.
"""
current = threading.currentThread()
s = mitogen.core.b('CORK') * ((128 // 4) * 1024)
self._rsocks = []
@ -164,12 +166,14 @@ class Corker(object):
# participation of a broker in order to complete.
for pool in self.pools:
if not pool.closed:
for x in range(pool.size):
self._cork_one(s, pool)
for th in pool._threads:
if th != current:
self._cork_one(s, pool)
for broker in self.brokers:
if broker._alive:
self._cork_one(s, broker)
if broker._thread != current:
self._cork_one(s, broker)
# Pause until we can detect every thread has entered write().
for rsock in self._rsocks:

@ -41,6 +41,7 @@ import getpass
import heapq
import inspect
import logging
import logging
import os
import re
import signal
@ -65,9 +66,22 @@ except ImportError:
import mitogen.core
from mitogen.core import b
from mitogen.core import bytes_partition
from mitogen.core import LOG
from mitogen.core import IOLOG
LOG = logging.getLogger(__name__)
# #410: we must avoid the use of socketpairs if SELinux is enabled.
try:
fp = open('/sys/fs/selinux/enforce', 'rb')
try:
SELINUX_ENABLED = bool(int(fp.read()))
finally:
fp.close()
except IOError:
SELINUX_ENABLED = False
try:
next
except NameError:
@ -91,6 +105,10 @@ try:
except ValueError:
SC_OPEN_MAX = 1024
BROKER_SHUTDOWN_MSG = (
'Connection cancelled because the associated Broker began to shut down.'
)
OPENPTY_MSG = (
"Failed to create a PTY: %s. It is likely the maximum number of PTYs has "
"been reached. Consider increasing the 'kern.tty.ptmx_max' sysctl on OS "
@ -143,7 +161,7 @@ _core_source_partial = None
def get_log_level():
return (LOG.level or logging.getLogger().level or logging.INFO)
return (LOG.getEffectiveLevel() or logging.INFO)
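With ``LOG`` now being the module's own ``mitogen.parent`` logger, the effective level honours whatever the application configured for that subsystem. Purely as an illustration of the per-module tuning this enables::

    import logging

    logging.basicConfig(level=logging.INFO)
    # Keep mitogen quiet overall, but trace connection setup in detail.
    logging.getLogger('mitogen').setLevel(logging.INFO)
    logging.getLogger('mitogen.parent').setLevel(logging.DEBUG)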
def get_sys_executable():
@ -270,6 +288,41 @@ def create_socketpair(size=None):
return parentfp, childfp
def create_best_pipe(escalates_privilege=False):
"""
By default we prefer to communicate with children over a UNIX socket, as a
single file descriptor can represent bidirectional communication, and a
cross-platform API exists to align buffer sizes with the needs of the
library.
SELinux prevents us setting up a privileged process to inherit an AF_UNIX
socket, a facility explicitly designed as a better replacement for pipes,
because at some point in the mid 90s it might have been commonly possible
for AF_INET sockets to end up undesirably connected to a privileged
process, so let's make up arbitrary rules breaking all sockets instead.
If SELinux is detected, fall back to using pipes.
:param bool escalates_privilege:
If :data:`True`, the target program may escalate privileges, causing
SELinux to disconnect AF_UNIX sockets, so avoid those.
:returns:
`(parent_rfp, child_wfp, child_rfp, parent_wfp)`
"""
if (not escalates_privilege) or (not SELINUX_ENABLED):
parentfp, childfp = create_socketpair()
return parentfp, childfp, childfp, parentfp
parent_rfp, child_wfp = mitogen.core.pipe()
try:
child_rfp, parent_wfp = mitogen.core.pipe()
return parent_rfp, child_wfp, child_rfp, parent_wfp
except:
parent_rfp.close()
child_wfp.close()
raise
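A hedged sketch of how a caller wires up the four returned file objects; this mirrors what :func:`create_child` below does, and when SELinux is absent the two objects on each side are the same socketpair end::

    import os
    import subprocess
    import mitogen.parent

    fps = mitogen.parent.create_best_pipe(escalates_privilege=False)
    parent_rfp, child_wfp, child_rfp, parent_wfp = fps

    # The child reads child_rfp and writes child_wfp; the parent holds the
    # opposite ends.
    proc = subprocess.Popen(
        args=['cat'],
        stdin=child_rfp,
        stdout=child_wfp,
        close_fds=True,
    )

    os.write(parent_wfp.fileno(), b'hello\n')
    print(os.read(parent_rfp.fileno(), 6))   # b'hello\n'
    proc.terminate()
    proc.wait()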
def popen(**kwargs):
"""
Wrap :class:`subprocess.Popen` to ensure any global :data:`_preexec_hook`
@ -284,7 +337,8 @@ def popen(**kwargs):
return subprocess.Popen(preexec_fn=preexec_fn, **kwargs)
def create_child(args, merge_stdio=False, stderr_pipe=False, preexec_fn=None):
def create_child(args, merge_stdio=False, stderr_pipe=False,
escalates_privilege=False, preexec_fn=None):
"""
Create a child process whose stdin/stdout is connected to a socket.
@ -293,27 +347,30 @@ def create_child(args, merge_stdio=False, stderr_pipe=False, preexec_fn=None):
:param bool merge_stdio:
If :data:`True`, arrange for `stderr` to be connected to the `stdout`
socketpair, rather than inherited from the parent process. This may be
necessary to ensure that not TTY is connected to any stdio handle, for
necessary to ensure that no TTY is connected to any stdio handle, for
instance when using LXC.
:param bool stderr_pipe:
If :data:`True` and `merge_stdio` is :data:`False`, arrange for
`stderr` to be connected to a separate pipe, to allow any ongoing debug
logs generated by e.g. SSH to be outpu as the session progresses,
logs generated by e.g. SSH to be output as the session progresses,
without interfering with `stdout`.
:param bool escalates_privilege:
If :data:`True`, the target program may escalate privileges, causing
SELinux to disconnect AF_UNIX sockets, so avoid those.
:param function preexec_fn:
If not :data:`None`, a function to run within the post-fork child
before executing the target program.
:returns:
:class:`Process` instance.
"""
parentfp, childfp = create_socketpair()
# When running under a monkey patches-enabled gevent, the socket module
# yields descriptors who already have O_NONBLOCK, which is persisted across
# fork, totally breaking Python. Therefore, drop O_NONBLOCK from Python's
# future stdin fd.
mitogen.core.set_block(childfp.fileno())
parent_rfp, child_wfp, child_rfp, parent_wfp = create_best_pipe(
escalates_privilege=escalates_privilege
)
stderr = None
stderr_r = None
if merge_stdio:
stderr = childfp
stderr = child_wfp
elif stderr_pipe:
stderr_r, stderr = mitogen.core.pipe()
mitogen.core.set_cloexec(stderr_r.fileno())
@ -321,27 +378,33 @@ def create_child(args, merge_stdio=False, stderr_pipe=False, preexec_fn=None):
try:
proc = popen(
args=args,
stdin=childfp,
stdout=childfp,
stdin=child_rfp,
stdout=child_wfp,
stderr=stderr,
close_fds=True,
preexec_fn=preexec_fn,
)
except:
childfp.close()
parentfp.close()
child_rfp.close()
child_wfp.close()
parent_rfp.close()
parent_wfp.close()
if stderr_pipe:
stderr.close()
stderr_r.close()
raise
childfp.close()
child_rfp.close()
child_wfp.close()
if stderr_pipe:
stderr.close()
LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s',
proc.pid, parentfp.fileno(), os.getpid(), Argv(args))
return PopenProcess(proc, stdin=parentfp, stdout=parentfp, stderr=stderr_r)
return PopenProcess(
proc=proc,
stdin=parent_wfp,
stdout=parent_rfp,
stderr=stderr_r,
)
def _acquire_controlling_tty():
@ -453,17 +516,28 @@ def tty_create_child(args):
raise
slave_fp.close()
LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s',
proc.pid, master_fp.fileno(), os.getpid(), Argv(args))
return PopenProcess(proc, stdin=master_fp, stdout=master_fp)
return PopenProcess(
proc=proc,
stdin=master_fp,
stdout=master_fp,
)
def hybrid_tty_create_child(args):
def hybrid_tty_create_child(args, escalates_privilege=False):
"""
Like :func:`tty_create_child`, except attach stdin/stdout to a socketpair
like :func:`create_child`, but leave stderr and the controlling TTY
attached to a TTY.
This permits high throughput communication with programs that are reached
via some program that requires a TTY for password input, like many
configurations of sudo. The UNIX TTY layer tends to have tiny (no more than
14KiB) buffers, forcing many IO loop iterations when transferring bulk
data, causing significant performance loss.
:param bool escalates_privilege:
If :data:`True`, the target program may escalate privileges, causing
SELinux to disconnect AF_UNIX sockets, so avoid those.
:param list args:
Program argument vector.
:returns:
@ -471,20 +545,25 @@ def hybrid_tty_create_child(args):
"""
master_fp, slave_fp = openpty()
try:
parentfp, childfp = create_socketpair()
parent_rfp, child_wfp, child_rfp, parent_wfp = create_best_pipe(
escalates_privilege=escalates_privilege,
)
try:
mitogen.core.set_block(childfp)
mitogen.core.set_block(child_rfp)
mitogen.core.set_block(child_wfp)
proc = popen(
args=args,
stdin=childfp,
stdout=childfp,
stdin=child_rfp,
stdout=child_wfp,
stderr=slave_fp,
preexec_fn=_acquire_controlling_tty,
close_fds=True,
)
except:
parentfp.close()
childfp.close()
parent_rfp.close()
child_wfp.close()
parent_wfp.close()
child_rfp.close()
raise
except:
master_fp.close()
@ -492,17 +571,23 @@ def hybrid_tty_create_child(args):
raise
slave_fp.close()
childfp.close()
LOG.debug('hybrid_tty_create_child() pid=%d stdio=%d, tty=%d, cmd: %s',
proc.pid, parentfp.fileno(), master_fp.fileno(), Argv(args))
return PopenProcess(proc, stdin=parentfp, stdout=parentfp, stderr=master_fp)
child_rfp.close()
child_wfp.close()
return PopenProcess(
proc=proc,
stdin=parent_wfp,
stdout=parent_rfp,
stderr=master_fp,
)
class Timer(object):
"""
Represents a future event.
"""
cancelled = False
#: Set to :data:`False` if :meth:`cancel` has been called, or immediately
#: prior to being executed by :meth:`TimerList.expire`.
active = True
def __init__(self, when, func):
self.when = when
@ -525,7 +610,7 @@ class Timer(object):
Cancel this event. If it has not yet executed, it will not execute
during any subsequent :meth:`TimerList.expire` call.
"""
self.cancelled = True
self.active = False
class TimerList(object):
@ -561,7 +646,7 @@ class TimerList(object):
Floating point delay, or 0.0, or :data:`None` if no events are
scheduled.
"""
while self._lst and self._lst[0].cancelled:
while self._lst and not self._lst[0].active:
heapq.heappop(self._lst)
if self._lst:
return max(0, self._lst[0].when - self._now())
@ -589,7 +674,8 @@ class TimerList(object):
now = self._now()
while self._lst and self._lst[0].when <= now:
timer = heapq.heappop(self._lst)
if not timer.cancelled:
if timer.active:
timer.active = False
timer.func()
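A small illustration of the new semantics, using the API as shown in this hunk; the fake clock follows the same trick the test suite uses::

    import mitogen.parent

    calls = []
    tl = mitogen.parent.TimerList()
    tl._now = lambda: 100.0                  # fake clock, illustration only

    t1 = tl.schedule(100.5, lambda: calls.append('t1'))
    t2 = tl.schedule(101.0, lambda: calls.append('t2'))
    t2.cancel()                              # t2.active -> False

    tl._now = lambda: 102.0
    tl.expire()                              # runs t1 only, marks it inactive
    assert calls == ['t1']
    assert not t1.active and not t2.active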
@ -659,7 +745,7 @@ def _upgrade_broker(broker):
root.setLevel(old_level)
broker.timers = TimerList()
LOG.debug('replaced %r with %r (new: %d readers, %d writers; '
LOG.debug('upgraded %r with %r (new: %d readers, %d writers; '
'old: %d readers, %d writers)', old, new,
len(new.readers), len(new.writers),
len(old.readers), len(old.writers))
@ -691,7 +777,7 @@ def get_connection_class(name):
def _proxy_connect(name, method_name, kwargs, econtext):
"""
Implements the target portion of Router._proxy_connect() by upgrading the
local context to a parent if it was not already, then calling back into
local process to a parent if it was not already, then calling back into
Router._connect() using the arguments passed to the parent's
Router.connect().
@ -737,13 +823,21 @@ def returncode_to_str(n):
class EofError(mitogen.core.StreamError):
"""
Raised by :func:`iter_read` and :func:`write_all` when EOF is detected by
the child process.
Raised by :class:`Connection` when an empty read is detected from the
remote process before bootstrap completes.
"""
# inherits from StreamError to maintain compatibility.
pass
class CancelledError(mitogen.core.StreamError):
"""
Raised by :class:`Connection` when :meth:`mitogen.core.Broker.shutdown` is
called before bootstrap completes.
"""
pass
class Argv(object):
"""
Wrapper to defer argv formatting when debug logging is disabled.
@ -1064,7 +1158,6 @@ class RegexProtocol(LineLoggingProtocolMixin, mitogen.core.DelimitedProtocol):
falling back to :meth:`on_unrecognized_line_received` and
:meth:`on_unrecognized_partial_line_received`.
"""
#: A sequence of 2-tuples of the form `(compiled pattern, method)` for
#: patterns that should be matched against complete (delimited) messages,
#: i.e. full lines.
@ -1105,10 +1198,10 @@ class RegexProtocol(LineLoggingProtocolMixin, mitogen.core.DelimitedProtocol):
class BootstrapProtocol(RegexProtocol):
"""
Respond to stdout of a child during bootstrap. Wait for EC0_MARKER to be
written by the first stage to indicate it can receive the bootstrap, then
await EC1_MARKER to indicate success, and :class:`MitogenProtocol` can be
enabled.
Respond to stdout of a child during bootstrap. Wait for :attr:`EC0_MARKER`
to be written by the first stage to indicate it can receive the bootstrap,
then await :attr:`EC1_MARKER` to indicate success, and
:class:`MitogenProtocol` can be enabled.
"""
#: Sentinel value emitted by the first stage to indicate it is ready to
#: receive the compressed bootstrap. For :mod:`mitogen.ssh` this must have
@ -1129,7 +1222,7 @@ class BootstrapProtocol(RegexProtocol):
self._writer.write(self.stream.conn.get_preamble())
def _on_ec1_received(self, line, match):
LOG.debug('%r: first stage received bootstrap', self)
LOG.debug('%r: first stage received mitogen.core source', self)
def _on_ec2_received(self, line, match):
LOG.debug('%r: new child booted successfully', self)
@ -1236,6 +1329,10 @@ class Options(object):
class Connection(object):
"""
Manage the lifetime of a set of :class:`Streams <Stream>` connecting to a
remote Python interpreter, including bootstrap, disconnection, and external
tool integration.
Base for streams capable of starting children.
"""
options_class = Options
@ -1278,7 +1375,22 @@ class Connection(object):
#: Prefix given to default names generated by :meth:`connect`.
name_prefix = u'local'
timer = None
#: :class:`Timer` that runs :meth:`_on_timer_expired` when connection
#: timeout occurs.
_timer = None
#: When disconnection completes, instance of :class:`Reaper` used to wait
#: on the exit status of the subprocess.
_reaper = None
#: On failure, the exception object that should be propagated back to the
#: user.
exception = None
#: Extra text appended to :class:`EofError` if that exception is raised on
#: a failed connection attempt. May be used in subclasses to hint at common
#: problems with a particular connection method.
eof_error_hint = None
def __init__(self, options, router):
#: :class:`Options`
@ -1405,6 +1517,7 @@ class Connection(object):
def start_child(self):
args = self.get_boot_command()
LOG.debug('command line for %r: %s', self, Argv(args))
try:
return self.create_child(args=args, **self.create_child_args)
except OSError:
@ -1412,8 +1525,6 @@ class Connection(object):
msg = 'Child start failed: %s. Command was: %s' % (e, Argv(args))
raise mitogen.core.StreamError(msg)
eof_error_hint = None
def _adorn_eof_error(self, e):
"""
Subclasses may provide additional information in the case of a failed
@ -1422,11 +1533,11 @@ class Connection(object):
if self.eof_error_hint:
e.args = ('%s\n\n%s' % (e.args[0], self.eof_error_hint),)
exception = None
def _complete_connection(self):
self.timer.cancel()
self._timer.cancel()
if not self.exception:
mitogen.core.unlisten(self._router.broker, 'shutdown',
self._on_broker_shutdown)
self._router.register(self.context, self.stdio_stream)
self.stdio_stream.set_protocol(
MitogenProtocol(
@ -1440,11 +1551,13 @@ class Connection(object):
"""
Fail the connection attempt.
"""
LOG.debug('%s: failing connection due to %r',
self.stdio_stream.name, exc)
LOG.debug('failing connection %s due to %r',
self.stdio_stream and self.stdio_stream.name, exc)
if self.exception is None:
self._adorn_eof_error(exc)
self.exception = exc
mitogen.core.unlisten(self._router.broker, 'shutdown',
self._on_broker_shutdown)
for stream in self.stdio_stream, self.stderr_stream:
if stream and not stream.receive_side.closed:
stream.on_disconnect(self._router.broker)
@ -1478,33 +1591,43 @@ class Connection(object):
def _on_streams_disconnected(self):
"""
When disconnection has been detected for both our streams, cancel the
When disconnection has been detected for both streams, cancel the
connection timer, mark the connection failed, and reap the child
process. Do nothing if the timer has already been cancelled, indicating
some existing failure has already been noticed.
"""
if not self.timer.cancelled:
self.timer.cancel()
if self._timer.active:
self._timer.cancel()
self._fail_connection(EofError(
self.eof_error_msg + get_history(
[self.stdio_stream, self.stderr_stream]
)
))
self.proc._async_reap(self, self._router)
def _start_timer(self):
self.timer = self._router.broker.timers.schedule(
when=self.options.connect_deadline,
func=self._on_timer_expired,
)
if self._reaper:
return
def _on_timer_expired(self):
self._fail_connection(
mitogen.core.TimeoutError(
'Failed to setup connection after %.2f seconds',
self.options.connect_timeout,
)
self._reaper = Reaper(
broker=self._router.broker,
proc=self.proc,
kill=not (
(self.detached and self.child_is_immediate_subprocess) or
# Avoid killing so child has chance to write cProfile data
self._router.profiling
),
# Don't delay shutdown waiting for a detached child, since the
# detached child may expect to live indefinitely after its parent
# exited.
wait_on_shutdown=(not self.detached),
)
self._reaper.reap()
def _on_broker_shutdown(self):
"""
Respond to broker.shutdown() being called by failing the connection
attempt.
"""
self._fail_connection(CancelledError(BROKER_SHUTDOWN_MSG))
def stream_factory(self):
return self.stream_protocol_class.build_stream(
@ -1534,8 +1657,36 @@ class Connection(object):
self._router.broker.start_receive(stream)
return stream
def _on_timer_expired(self):
self._fail_connection(
mitogen.core.TimeoutError(
'Failed to setup connection after %.2f seconds',
self.options.connect_timeout,
)
)
def _async_connect(self):
self._start_timer()
LOG.debug('creating connection to context %d using %s',
self.context.context_id, self.__class__.__module__)
mitogen.core.listen(self._router.broker, 'shutdown',
self._on_broker_shutdown)
self._timer = self._router.broker.timers.schedule(
when=self.options.connect_deadline,
func=self._on_timer_expired,
)
try:
self.proc = self.start_child()
except Exception:
self._fail_connection(sys.exc_info()[1])
return
LOG.debug('child for %r started: pid:%r stdin:%r stdout:%r stderr:%r',
self, self.proc.pid,
self.proc.stdin.fileno(),
self.proc.stdout.fileno(),
self.proc.stderr and self.proc.stderr.fileno())
self.stdio_stream = self._setup_stdio_stream()
if self.context.name is None:
self.context.name = self.stdio_stream.name
@ -1544,15 +1695,7 @@ class Connection(object):
self.stderr_stream = self._setup_stderr_stream()
def connect(self, context):
LOG.debug('%r.connect()', self)
self.context = context
self.proc = self.start_child()
LOG.debug('%r.connect(): pid:%r stdin:%r stdout:%r stderr:%r',
self, self.proc.pid,
self.proc.stdin.fileno(),
self.proc.stdout.fileno(),
self.proc.stderr and self.proc.stderr.fileno())
self.latch = mitogen.core.Latch()
self._router.broker.defer(self._async_connect)
self.latch.get()
@ -1561,12 +1704,28 @@ class Connection(object):
class ChildIdAllocator(object):
"""
Allocate new context IDs from a block of unique context IDs allocated by
the master process.
"""
def __init__(self, router):
self.router = router
self.lock = threading.Lock()
self.it = iter(xrange(0))
def allocate(self):
"""
Allocate an ID, requesting a fresh block from the master if the
existing block is exhausted.
:returns:
The new context ID.
.. warning::
This method is not safe to call from the :class:`Broker` thread, as
it may block on IO of its own.
"""
self.lock.acquire()
try:
for id_ in self.it:
@ -1733,7 +1892,9 @@ class CallChain(object):
pipelining is disabled, the exception will be logged to the target
context's logging framework.
"""
LOG.debug('%r.call_no_reply(): %r', self, CallSpec(fn, args, kwargs))
LOG.debug('starting no-reply function call to %r: %r',
self.context.name or self.context.context_id,
CallSpec(fn, args, kwargs))
self.context.send(self.make_msg(fn, *args, **kwargs))
def call_async(self, fn, *args, **kwargs):
@ -1789,7 +1950,9 @@ class CallChain(object):
contexts and consumed as they complete using
:class:`mitogen.select.Select`.
"""
LOG.debug('%r.call_async(): %r', self, CallSpec(fn, args, kwargs))
LOG.debug('starting function call to %s: %r',
self.context.name or self.context.context_id,
CallSpec(fn, args, kwargs))
return self.context.send_async(self.make_msg(fn, *args, **kwargs))
def call(self, fn, *args, **kwargs):
@ -1911,15 +2074,16 @@ class RouteMonitor(object):
RouteMonitor lives entirely on the broker thread, so its data requires no
locking.
:param Router router:
:param mitogen.master.Router router:
Router to install handlers on.
:param Context parent:
:param mitogen.core.Context parent:
:data:`None` in the master process, or reference to the parent context
we should propagate route updates towards.
"""
def __init__(self, router, parent=None):
self.router = router
self.parent = parent
self._log = logging.getLogger('mitogen.route_monitor')
#: Mapping of Stream instance to integer context IDs reachable via the
#: stream; used to cleanup routes during disconnection.
self._routes_by_stream = {}
@ -2008,8 +2172,8 @@ class RouteMonitor(object):
def notice_stream(self, stream):
"""
When this parent is responsible for a new directly connected child
stream, we're also responsible for broadcasting DEL_ROUTE upstream
if/when that child disconnects.
stream, we're also responsible for broadcasting
:data:`mitogen.core.DEL_ROUTE` upstream when that child disconnects.
"""
self._routes_by_stream[stream] = set([stream.protocol.remote_id])
self._propagate_up(mitogen.core.ADD_ROUTE, stream.protocol.remote_id,
@ -2040,8 +2204,8 @@ class RouteMonitor(object):
if routes is None:
return
LOG.debug('%r: %r is gone; propagating DEL_ROUTE for %r',
self, stream, routes)
self._log.debug('stream %s is gone; propagating DEL_ROUTE for %r',
stream.name, routes)
for target_id in routes:
self.router.del_route(target_id)
self._propagate_up(mitogen.core.DEL_ROUTE, target_id)
@ -2067,12 +2231,12 @@ class RouteMonitor(object):
stream = self.router.stream_by_id(msg.auth_id)
current = self.router.stream_by_id(target_id)
if current and current.protocol.remote_id != mitogen.parent_id:
LOG.error('Cannot add duplicate route to %r via %r, '
'already have existing route via %r',
target_id, stream, current)
self._log.error('Cannot add duplicate route to %r via %r, '
'already have existing route via %r',
target_id, stream, current)
return
LOG.debug('Adding route to %d via %r', target_id, stream)
self._log.debug('Adding route to %d via %r', target_id, stream)
self._routes_by_stream[stream].add(target_id)
self.router.add_route(target_id, stream)
self._propagate_up(mitogen.core.ADD_ROUTE, target_id, target_name)
@ -2094,16 +2258,16 @@ class RouteMonitor(object):
stream = self.router.stream_by_id(msg.auth_id)
if registered_stream != stream:
LOG.error('%r: received DEL_ROUTE for %d from %r, expected %r',
self, target_id, stream, registered_stream)
self._log.error('received DEL_ROUTE for %d from %r, expected %r',
target_id, stream, registered_stream)
return
context = self.router.context_by_id(target_id, create=False)
if context:
LOG.debug('%r: firing local disconnect for %r', self, context)
self._log.debug('firing local disconnect signal for %r', context)
mitogen.core.fire(context, 'disconnect')
LOG.debug('%r: deleting route to %d via %r', self, target_id, stream)
self._log.debug('deleting route to %d via %r', target_id, stream)
routes = self._routes_by_stream.get(stream)
if routes:
routes.discard(target_id)
@ -2125,7 +2289,7 @@ class Router(mitogen.core.Router):
route_monitor = None
def upgrade(self, importer, parent):
LOG.debug('%r.upgrade()', self)
LOG.debug('upgrading %r with capabilities to start new children', self)
self.id_allocator = ChildIdAllocator(router=self)
self.responder = ModuleForwarder(
router=self,
@ -2152,7 +2316,8 @@ class Router(mitogen.core.Router):
def get_streams(self):
"""
Return a snapshot of all streams in existence at time of call.
Return an atomic snapshot of all streams in existence at time of call.
This is safe to call from any thread.
"""
self._write_lock.acquire()
try:
@ -2179,13 +2344,21 @@ class Router(mitogen.core.Router):
def add_route(self, target_id, stream):
"""
Arrange for messages whose `dst_id` is `target_id` to be forwarded on
the directly connected stream for `via_id`. This method is called
automatically in response to :data:`mitogen.core.ADD_ROUTE` messages,
but remains public while the design has not yet settled, and situations
may arise where routing is not fully automatic.
Arrange for messages whose `dst_id` is `target_id` to be forwarded on a
directly connected :class:`Stream`. Safe to call from any thread.
This is called automatically by :class:`RouteMonitor` in response to
:data:`mitogen.core.ADD_ROUTE` messages, but remains public while the
design has not yet settled, and situations may arise where routing is
not fully automatic.
:param int target_id:
Target context ID to add a route for.
:param mitogen.core.Stream stream:
Stream over which messages to the target should be routed.
"""
LOG.debug('%r.add_route(%r, %r)', self, target_id, stream)
LOG.debug('%r: adding route to context %r via %r',
self, target_id, stream)
assert isinstance(target_id, int)
assert isinstance(stream, mitogen.core.Stream)
@ -2196,6 +2369,19 @@ class Router(mitogen.core.Router):
self._write_lock.release()
def del_route(self, target_id):
"""
Delete any route that exists for `target_id`. It is not an error to
delete a route that does not currently exist. Safe to call from any
thread.
This is called automatically by :class:`RouteMonitor` in response to
:data:`mitogen.core.DEL_ROUTE` messages, but remains public while the
design has not yet settled, and situations may arise where routing is
not fully automatic.
:param int target_id:
Target context ID to delete route for.
"""
LOG.debug('%r: deleting route to %r', self, target_id)
# DEL_ROUTE may be sent by a parent if it knows this context sent
# messages to a peer that has now disconnected, to let us raise
@ -2314,82 +2500,175 @@ class Router(mitogen.core.Router):
return self.connect(u'ssh', **kwargs)
class Process(object):
_delays = [0.05, 0.15, 0.3, 1.0, 5.0, 10.0]
name = None
class Reaper(object):
"""
Asynchronous logic for reaping :class:`Process` objects. This is necessary
to prevent uncontrolled buildup of zombie processes in long-lived parents
that will eventually reach an OS limit, preventing creation of new threads
and processes, and to log the exit status of the child in the case of an
error.
def __init__(self, pid, stdin, stdout, stderr=None):
self.pid = pid
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self._returncode = None
self._reap_count = 0
To avoid modifying process-global state such as with
:func:`signal.set_wakeup_fd` or installing a :data:`signal.SIGCHLD` handler
that might interfere with the user's ability to use those facilities,
Reaper polls for exit with backoff using timers installed on an associated
:class:`Broker`.
def __repr__(self):
return '%s %s pid %d' % (
type(self).__name__,
self.name,
self.pid,
)
:param mitogen.core.Broker broker:
The :class:`Broker` on which to install timers
:param Process proc:
The process to reap.
:param bool kill:
If :data:`True`, send ``SIGTERM`` and ``SIGKILL`` to the process.
:param bool wait_on_shutdown:
If :data:`True`, delay :class:`Broker` shutdown if child has not yet
exited. If :data:`False`, simply forget the child.
"""
#: :class:`Timer` that invokes :meth:`reap` after some polling delay.
_timer = None
def poll(self):
raise NotImplementedError()
def __init__(self, broker, proc, kill, wait_on_shutdown):
self.broker = broker
self.proc = proc
self.kill = kill
self.wait_on_shutdown = wait_on_shutdown
self._tries = 0
def _signal_child(self, signum):
# For processes like sudo we cannot actually send sudo a signal,
# because it is setuid, so this is best-effort only.
LOG.debug('%r: child process still alive, sending %s',
self, SIGNAL_BY_NUM[signum])
LOG.debug('%r: sending %s', self.proc, SIGNAL_BY_NUM[signum])
try:
os.kill(self.pid, signum)
os.kill(self.proc.pid, signum)
except OSError:
e = sys.exc_info()[1]
if e.args[0] != errno.EPERM:
raise
def _async_reap(self, conn, router):
def _calc_delay(self, count):
"""
Reap the child process during disconnection.
Calculate a poll delay given `count` attempts have already been made.
These constants have no principle, they just produce rapid but still
relatively conservative retries.
"""
if self._returncode is not None:
# on_disconnect() may be invoked more than once, for example, if
# there is still a pending message to be sent after the first
# on_disconnect() call.
return
delay = 0.05
for _ in xrange(count):
delay *= 1.72
return delay
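Evaluating the formula gives a feel for the poll schedule; the numbers below are just ``0.05 * 1.72 ** n`` rounded to milliseconds::

    for n in range(6):
        print(n, round(0.05 * 1.72 ** n, 3))
    # 0 0.05   1 0.086   2 0.148   3 0.254   4 0.438   5 0.753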
if conn.detached and conn.child_is_immediate_subprocess:
LOG.debug('%r: immediate child is detached, won\'t reap it', self)
return
def _on_broker_shutdown(self):
"""
Respond to :class:`Broker` shutdown by cancelling the reap timer if
:attr:`Router.await_children_at_shutdown` is disabled. Otherwise
shutdown is delayed for up to :attr:`Broker.shutdown_timeout`, as
subprocesses may have no intention of exiting any time soon.
"""
if not self.wait_on_shutdown:
self._timer.cancel()
if router.profiling:
LOG.info('%r: wont kill child because profiling=True', self)
return
def _install_timer(self, delay):
new = self._timer is None
self._timer = self.broker.timers.schedule(
when=time.time() + delay,
func=self.reap,
)
if new:
mitogen.core.listen(self.broker, 'shutdown',
self._on_broker_shutdown)
def _remove_timer(self):
if self._timer and self._timer.active:
self._timer.cancel()
mitogen.core.unlisten(self.broker, 'shutdown',
self._on_broker_shutdown)
self._reap_count += 1
status = self.poll()
def reap(self):
"""
Reap the child process during disconnection.
"""
status = self.proc.poll()
if status is not None:
LOG.debug('%r: %s', self, returncode_to_str(status))
LOG.debug('%r: %s', self.proc, returncode_to_str(status))
self._remove_timer()
return
i = self._reap_count - 1
if i >= len(self._delays):
LOG.warning('%r: child will not die, abandoning it', self)
self._tries += 1
if self._tries > 20:
LOG.warning('%r: child will not exit, giving up', self)
self._remove_timer()
return
elif i == 0:
delay = self._calc_delay(self._tries - 1)
LOG.debug('%r still running after IO disconnect, recheck in %.03fs',
self.proc, delay)
self._install_timer(delay)
if not self.kill:
pass
elif self._tries == 1:
self._signal_child(signal.SIGTERM)
elif i == 1:
elif self._tries == 5: # roughly 4 seconds
self._signal_child(signal.SIGKILL)
router.broker.timers.schedule(
when=time.time() + self._delays[i],
func=lambda: self._async_reap(conn, router),
class Process(object):
"""
Process objects provide a uniform interface to the :mod:`subprocess` and
:mod:`mitogen.fork`. This class is extended by :class:`PopenProcess` and
:class:`mitogen.fork.Process`.
:param int pid:
The process ID.
:param file stdin:
File object attached to standard input.
:param file stdout:
File object attached to standard output.
:param file stderr:
File object attached to standard error, or :data:`None`.
"""
#: Name of the process used in logs. Set to the stream/context name by
#: :class:`Connection`.
name = None
def __init__(self, pid, stdin, stdout, stderr=None):
#: The process ID.
self.pid = pid
#: File object attached to standard input.
self.stdin = stdin
#: File object attached to standard output.
self.stdout = stdout
#: File object attached to standard error.
self.stderr = stderr
def __repr__(self):
return '%s %s pid %d' % (
type(self).__name__,
self.name,
self.pid,
)
def poll(self):
"""
Fetch the child process exit status, or :data:`None` if it is still
running. This should be overridden by subclasses.
:returns:
Exit status in the style of the :attr:`subprocess.Popen.returncode`
attribute, i.e. with signals represented by a negative integer.
"""
raise NotImplementedError()
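A hedged sketch of what an implementation of :meth:`poll` typically looks like for a directly forked child (roughly the shape of what :mod:`mitogen.fork` needs; the class name here is invented)::

    import os
    import mitogen.parent

    class WaitpidProcess(mitogen.parent.Process):
        """Poll a forked child without blocking, subprocess-style."""
        def poll(self):
            pid, status = os.waitpid(self.pid, os.WNOHANG)
            if not pid:
                return None                    # still running
            if os.WIFSIGNALED(status):
                return -os.WTERMSIG(status)    # negative signal number
            return os.WEXITSTATUS(status)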
class PopenProcess(Process):
"""
:class:`Process` subclass wrapping a :class:`subprocess.Popen` object.
:param subprocess.Popen proc:
The subprocess.
"""
def __init__(self, proc, stdin, stdout, stderr=None):
super(PopenProcess, self).__init__(proc.pid, stdin, stdout, stderr)
#: The subprocess.
self.proc = proc
def poll(self):
@ -2398,8 +2677,9 @@ class PopenProcess(Process):
class ModuleForwarder(object):
"""
Respond to GET_MODULE requests in a slave by forwarding the request to our
parent context, or satisfying the request from our local Importer cache.
Respond to :data:`mitogen.core.GET_MODULE` requests in a child by
forwarding the request to our parent context, or satisfying the request
from our local Importer cache.
"""
def __init__(self, router, parent_context, importer):
self.router = router
@ -2454,7 +2734,7 @@ class ModuleForwarder(object):
return
fullname = msg.data.decode('utf-8')
LOG.debug('%r: %s requested by %d', self, fullname, msg.src_id)
LOG.debug('%r: %s requested by context %d', self, fullname, msg.src_id)
callback = lambda: self._on_cache_callback(msg, fullname)
self.importer._request_module(fullname, callback)

@ -57,9 +57,7 @@ class Select(object):
If `oneshot` is :data:`True`, then remove each receiver as it yields a
result; since :meth:`__iter__` terminates once the final receiver is
removed, this makes it convenient to respond to calls made in parallel:
.. code-block:: python
removed, this makes it convenient to respond to calls made in parallel::
total = 0
recvs = [c.call_async(long_running_operation) for c in contexts]
@ -98,7 +96,7 @@ class Select(object):
for msg in mitogen.select.Select(selects):
print(msg.unpickle())
:class:`Select` may be used to mix inter-thread and inter-process IO:
:class:`Select` may be used to mix inter-thread and inter-process IO::
latch = mitogen.core.Latch()
start_thread(latch)
@ -226,8 +224,15 @@ class Select(object):
raise Error(self.owned_msg)
recv.notify = self._put
# Avoid race by polling once after installation.
if not recv.empty():
# After installing the notify function, _put() will potentially begin
# receiving calls from other threads immediately, but not for items
# they already had buffered. For those we call _put(), possibly
# duplicating the effect of other _put() being made concurrently, such
# that the Select ends up with more items in its buffer than exist in
# the underlying receivers. We handle the possibility of receivers
# marked notified yet empty inside Select.get(), so this should be
# robust.
for _ in range(recv.size()):
self._put(recv)
not_present_msg = 'Instance is not a member of this Select'
@ -261,18 +266,26 @@ class Select(object):
self.remove(recv)
self._latch.close()
def empty(self):
def size(self):
"""
Return the number of items currently buffered.
As with :class:`Queue.Queue`, `0` may be returned even though a
subsequent call to :meth:`get` will succeed, since a message may be
posted at any moment between :meth:`size` and :meth:`get`.
As with :class:`Queue.Queue`, `>0` may be returned even though a
subsequent call to :meth:`get` will block, since another waiting thread
may be woken at any moment between :meth:`size` and :meth:`get`.
"""
Return :data:`True` if calling :meth:`get` would block.
return sum(recv.size() for recv in self._receivers)
As with :class:`Queue.Queue`, :data:`True` may be returned even though
a subsequent call to :meth:`get` will succeed, since a message may be
posted at any moment between :meth:`empty` and :meth:`get`.
def empty(self):
"""
Return `size() == 0`.
:meth:`empty` may return :data:`False` even when :meth:`get` would
block if another thread has drained a receiver added to this select.
This can be avoided by only consuming each receiver from a single
thread.
.. deprecated:: 0.2.8
Use :meth:`size` instead.
"""
return self._latch.empty()
@ -329,5 +342,6 @@ class Select(object):
# A receiver may have been queued with no result if another
# thread drained it before we woke up, or because another
# thread drained it between add() calling recv.empty() and
# self._put(). In this case just sleep again.
# self._put(), or because Select.add() caused duplicate _put()
# calls. In this case simply retry.
continue
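A brief illustration of the behaviour the new :meth:`size` and the :meth:`add` fix provide, using a :class:`mitogen.core.Latch` as the member (values invented)::

    import mitogen.core
    import mitogen.select

    latch = mitogen.core.Latch()
    latch.put('a')
    latch.put('b')                      # two items buffered before add()

    sel = mitogen.select.Select([latch], oneshot=False)
    print(sel.size())                   # 2: both buffered items were noticed
    print(sel.get())                    # 'a'
    print(sel.get())                    # 'b'
    print(sel.empty())                  # True; deprecated, prefer size() == 0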

@ -86,8 +86,13 @@ def get_or_create_pool(size=None, router=None):
_pool_lock.acquire()
try:
if _pool_pid != my_pid:
_pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE,
overwrite=True)
_pool = Pool(
router,
services=[],
size=size or DEFAULT_POOL_SIZE,
overwrite=True,
recv=mitogen.core.Dispatcher._service_recv,
)
# In case of Broker shutdown crash, Pool can cause 'zombie'
# processes.
mitogen.core.listen(router.broker, 'shutdown',
@ -99,6 +104,10 @@ def get_or_create_pool(size=None, router=None):
return _pool
def get_thread_name():
return threading.currentThread().getName()
def call(service_name, method_name, call_context=None, **kwargs):
"""
Call a service registered with this pool, using the calling thread as a
@ -471,13 +480,19 @@ class Pool(object):
program's configuration or its input data.
:param mitogen.core.Router router:
Router to listen for ``CALL_SERVICE`` messages on.
:class:`mitogen.core.Router` to listen for
:data:`mitogen.core.CALL_SERVICE` messages.
:param list services:
Initial list of services to register.
:param mitogen.core.Receiver recv:
:data:`mitogen.core.CALL_SERVICE` receiver to reuse. This is used by
:func:`get_or_create_pool` to hand off a queue of messages from the
Dispatcher stub handler while avoiding a race.
"""
activator_class = Activator
def __init__(self, router, services=(), size=1, overwrite=False):
def __init__(self, router, services=(), size=1, overwrite=False,
recv=None):
self.router = router
self._activator = self.activator_class()
self._ipc_latch = mitogen.core.Latch()
@ -498,6 +513,16 @@ class Pool(object):
}
self._invoker_by_name = {}
if recv is not None:
# When inheriting from mitogen.core.Dispatcher, we must remove its
# stub notification function before adding it to our Select. We
# always overwrite this receiver since the standard service.Pool
# handler policy differs from the one inherited from
# core.Dispatcher.
recv.notify = None
self._select.add(recv)
self._func_by_source[recv] = self._on_service_call
for service in services:
self.add(service)
self._py_24_25_compat()
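The ``recv`` hand-off above is internal to :func:`get_or_create_pool`; typical code simply fetches the pool and registers services with it. A hedged sketch, where ``FileService`` is an invented example and ``router`` is assumed to be in scope (for instance the child's ``econtext.router``)::

    import mitogen.core
    import mitogen.service

    class FileService(mitogen.service.Service):
        """Invented example: expose read-only file access to parents."""
        @mitogen.service.expose(policy=mitogen.service.AllowParents())
        @mitogen.service.arg_spec({
            'path': mitogen.core.FsPathTypes,
        })
        def read(self, path):
            with open(path, 'rb') as fp:
                return fp.read()

    pool = mitogen.service.get_or_create_pool(router=router)
    pool.add(FileService(router))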
@ -611,10 +636,11 @@ class Pool(object):
try:
event = self._select.get_event()
except mitogen.core.LatchError:
LOG.debug('%r: graceful exit', self)
LOG.debug('thread %s exiting gracefully', get_thread_name())
return
except mitogen.core.ChannelError:
LOG.debug('%r: exitting: %s', self, sys.exc_info()[1])
LOG.debug('thread %s exiting with error: %s',
get_thread_name(), sys.exc_info()[1])
return
func = self._func_by_source[event.source]
@ -627,16 +653,14 @@ class Pool(object):
try:
self._worker_run()
except Exception:
th = threading.currentThread()
LOG.exception('%r: worker %r crashed', self, th.getName())
LOG.exception('%r: worker %r crashed', self, get_thread_name())
raise
def __repr__(self):
th = threading.currentThread()
return 'Pool(%04x, size=%d, th=%r)' % (
id(self) & 0xffff,
len(self._threads),
th.getName(),
get_thread_name(),
)
@ -688,10 +712,12 @@ class PushFileService(Service):
def _forward(self, context, path):
stream = self.router.stream_by_id(context.context_id)
child = mitogen.core.Context(self.router, stream.protocol.remote_id)
child = self.router.context_by_id(stream.protocol.remote_id)
sent = self._sent_by_stream.setdefault(stream, set())
if path in sent:
if child.context_id != context.context_id:
LOG.debug('requesting %s forward small file to %s: %s',
child, context, path)
child.call_service_async(
service_name=self.name(),
method_name='forward',
@ -699,6 +725,8 @@ class PushFileService(Service):
context=context
).close()
else:
LOG.debug('requesting %s cache and forward small file to %s: %s',
child, context, path)
child.call_service_async(
service_name=self.name(),
method_name='store_and_forward',
@ -729,8 +757,8 @@ class PushFileService(Service):
'path': mitogen.core.FsPathTypes,
})
def propagate_to(self, context, path):
LOG.debug('%r.propagate_to(%r, %r)', self, context, path)
if path not in self._cache:
LOG.debug('caching small file %s', path)
fp = open(path, 'rb')
try:
self._cache[path] = mitogen.core.Blob(fp.read())
@ -748,7 +776,7 @@ class PushFileService(Service):
def store_and_forward(self, path, data, context):
LOG.debug('%r.store_and_forward(%r, %r, %r) %r',
self, path, data, context,
threading.currentThread().getName())
get_thread_name())
self._lock.acquire()
try:
self._cache[path] = data

@ -49,7 +49,7 @@ except NameError:
from mitogen.core import any
LOG = logging.getLogger('mitogen')
LOG = logging.getLogger(__name__)
auth_incorrect_msg = 'SSH authentication is incorrect'
password_incorrect_msg = 'SSH password is incorrect'

@ -81,7 +81,7 @@ class SetupBootstrapProtocol(mitogen.parent.BootstrapProtocol):
def _on_password_prompt(self, line, match):
LOG.debug('%r: (password prompt): %r',
self.stream.name, line.decode('utf-8', 'replace'))
self.stream.name, line.decode('utf-8', 'replace'))
if self.stream.conn.options.password is None:
self.stream.conn._fail_connection(

@ -244,6 +244,9 @@ class Connection(mitogen.parent.Connection):
diag_protocol_class = SetupProtocol
options_class = Options
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
create_child_args = {
'escalates_privilege': True,
}
child_is_immediate_subprocess = False
def _get_name(self):

@ -36,6 +36,7 @@ have the same privilege (auth_id) as the current process.
"""
import errno
import logging
import os
import socket
import struct
@ -45,7 +46,8 @@ import tempfile
import mitogen.core
import mitogen.master
from mitogen.core import LOG
LOG = logging.getLogger(__name__)
class Error(mitogen.core.Error):
@ -143,8 +145,8 @@ class Listener(mitogen.core.Protocol):
try:
pid, = struct.unpack('>L', sock.recv(4))
except (struct.error, socket.error):
LOG.error('%r: failed to read remote identity: %s',
self, sys.exc_info()[1])
LOG.error('listener: failed to read remote identity: %s',
sys.exc_info()[1])
return
context_id = self._router.id_allocator.allocate()
@ -152,8 +154,8 @@ class Listener(mitogen.core.Protocol):
sock.send(struct.pack('>LLL', context_id, mitogen.context_id,
os.getpid()))
except socket.error:
LOG.error('%r: failed to assign identity to PID %d: %s',
self, pid, sys.exc_info()[1])
LOG.error('listener: failed to assign identity to PID %d: %s',
pid, sys.exc_info()[1])
return
context = mitogen.parent.Context(self._router, context_id)
@ -165,7 +167,8 @@ class Listener(mitogen.core.Protocol):
stream.protocol.auth_id = mitogen.context_id
stream.protocol.is_privileged = True
stream.accept(sock, sock)
LOG.debug('%r: accepted %r', self, stream)
LOG.debug('listener: accepted connection from PID %d: %s',
pid, stream.name)
self._router.register(context, stream)
@ -186,7 +189,7 @@ def _connect(path, broker, sock):
mitogen.parent_id = remote_id
mitogen.parent_ids = [remote_id]
LOG.debug('unix.connect(): local ID is %r, remote is %r',
LOG.debug('client: local ID is %r, remote is %r',
mitogen.context_id, remote_id)
router = mitogen.master.Router(broker=broker)
@ -204,7 +207,7 @@ def _connect(path, broker, sock):
def connect(path, broker=None):
LOG.debug('unix.connect(path=%r)', path)
LOG.debug('client: connecting to %s', path)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
return _connect(path, broker, sock)

@ -39,7 +39,6 @@ import mitogen.master
import mitogen.parent
LOG = logging.getLogger('mitogen')
iteritems = getattr(dict, 'iteritems', dict.items)
if mitogen.core.PY3:

@ -197,7 +197,7 @@ class LinuxPolicyTest(testlib.TestCase):
tf = tempfile.NamedTemporaryFile()
try:
before = self._get_cpus()
self.policy._set_cpu(3)
self.policy._set_cpu(None, 3)
my_cpu = self._get_cpus()
proc = mitogen.parent.popen(

@ -0,0 +1,77 @@
import logging
import os
import signal
import sys
import tempfile
import threading
import time
import unittest2
import testlib
import mitogen.core
import mitogen.parent
class ConnectionTest(testlib.RouterMixin, testlib.TestCase):
def test_broker_shutdown_while_connect_in_progress(self):
# if Broker.shutdown() is called while a connection attempt is in
# progress, the connection should be torn down.
path = tempfile.mktemp(prefix='broker_shutdown_sem_')
open(path, 'wb').close()
os.environ['BROKER_SHUTDOWN_SEMAPHORE'] = path
result = []
def thread():
python_path = testlib.data_path('broker_shutdown_test_python.py')
try:
result.append(self.router.local(python_path=python_path))
except Exception:
result.append(sys.exc_info()[1])
th = threading.Thread(target=thread)
th.start()
while os.path.exists(path):
time.sleep(0.05)
self.broker.shutdown()
th.join()
exc, = result
self.assertTrue(isinstance(exc, mitogen.parent.CancelledError))
self.assertEquals(mitogen.parent.BROKER_SHUTDOWN_MSG, exc.args[0])
@mitogen.core.takes_econtext
def do_detach(econtext):
econtext.detach()
while 1:
time.sleep(1)
logging.getLogger('mitogen').error('hi')
class DetachReapTest(testlib.RouterMixin, testlib.TestCase):
def test_subprocess_preserved_on_shutdown(self):
c1 = self.router.local()
pid = c1.call(os.getpid)
l = mitogen.core.Latch()
mitogen.core.listen(c1, 'disconnect', l.put)
c1.call_no_reply(do_detach)
l.get()
self.broker.shutdown()
self.broker.join()
os.kill(pid, 0) # succeeds if process still alive
# now clean up
os.kill(pid, signal.SIGTERM)
os.waitpid(pid, 0)
if __name__ == '__main__':
unittest2.main()

@ -173,6 +173,33 @@ class CreateChildStderrPipeTest(StdinSockMixin, StdoutSockMixin,
class TtyCreateChildTest(testlib.TestCase):
func = staticmethod(mitogen.parent.tty_create_child)
def test_dev_tty_open_succeeds(self):
# In the early days of UNIX, a process that lacked a controlling TTY
# would acquire one simply by opening an existing TTY. Linux and OS X
# continue to follow this behaviour, however at least FreeBSD moved to
# requiring an explicit ioctl(). Linux supports it, but we don't yet
# use it there and anyway the behaviour will never change, so no point
# in fixing things that aren't broken. Below we test that
# getpass-loving apps like sudo and ssh get our slave PTY when they
# attempt to open /dev/tty, which is what they both do on attempting to
# read a password.
tf = tempfile.NamedTemporaryFile()
try:
proc = self.func([
'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
])
deadline = time.time() + 5.0
mitogen.core.set_block(proc.stdin.fileno())
# read(3) below due to https://bugs.python.org/issue37696
self.assertEquals(mitogen.core.b('hi\n'), proc.stdin.read(3))
waited_pid, status = os.waitpid(proc.pid, 0)
self.assertEquals(proc.pid, waited_pid)
self.assertEquals(0, status)
self.assertEquals(mitogen.core.b(''), tf.read())
proc.stdout.close()
finally:
tf.close()
def test_stdin(self):
proc, info, _ = run_fd_check(self.func, 0, 'read',
lambda proc: proc.stdin.write(b('TEST')))

@ -0,0 +1,9 @@
#!/usr/bin/env python
# Delete a semaphore file to allow the main thread to wake up, then sleep for
# 30 seconds before starting the real Python.
import os
import time
import sys
os.unlink(os.environ['BROKER_SHUTDOWN_SEMAPHORE'])
time.sleep(30)
os.execl(sys.executable, sys.executable, *sys.argv[1:])

@ -211,37 +211,6 @@ class OpenPtyTest(testlib.TestCase):
        slave_fp.close()


class TtyCreateChildTest(testlib.TestCase):
    func = staticmethod(mitogen.parent.tty_create_child)

    def test_dev_tty_open_succeeds(self):
        # In the early days of UNIX, a process that lacked a controlling TTY
        # would acquire one simply by opening an existing TTY. Linux and OS X
        # continue to follow this behaviour, however at least FreeBSD moved to
        # requiring an explicit ioctl(). Linux supports it, but we don't yet
        # use it there and anyway the behaviour will never change, so no point
        # in fixing things that aren't broken. Below we test that
        # getpass-loving apps like sudo and ssh get our slave PTY when they
        # attempt to open /dev/tty, which is what they both do on attempting to
        # read a password.
        tf = tempfile.NamedTemporaryFile()
        try:
            proc = self.func([
                'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
            ])
            deadline = time.time() + 5.0
            mitogen.core.set_block(proc.stdin.fileno())
            # read(3) below due to https://bugs.python.org/issue37696
            self.assertEquals(mitogen.core.b('hi\n'), proc.stdin.read(3))
            waited_pid, status = os.waitpid(proc.pid, 0)
            self.assertEquals(proc.pid, waited_pid)
            self.assertEquals(0, status)
            self.assertEquals(mitogen.core.b(''), tf.read())
            proc.stdout.close()
        finally:
            tf.close()


class DisconnectTest(testlib.RouterMixin, testlib.TestCase):
    def test_child_disconnected(self):
        # Easy mode: process notices its own directly connected child is

@ -358,6 +358,18 @@ class GetReceiverTest(testlib.RouterMixin, testlib.TestCase):
        msg = select.get()
        self.assertEquals('123', msg.unpickle())

    def test_nonempty_multiple_items_before_add(self):
        recv = mitogen.core.Receiver(self.router)
        recv._on_receive(mitogen.core.Message.pickled('123'))
        recv._on_receive(mitogen.core.Message.pickled('234'))
        select = self.klass([recv], oneshot=False)
        msg = select.get()
        self.assertEquals('123', msg.unpickle())
        msg = select.get()
        self.assertEquals('234', msg.unpickle())
        self.assertRaises(mitogen.core.TimeoutError,
            lambda: select.get(block=False))

    def test_nonempty_after_add(self):
        recv = mitogen.core.Receiver(self.router)
        select = self.klass([recv])
@ -415,6 +427,16 @@ class GetLatchTest(testlib.RouterMixin, testlib.TestCase):
        select = self.klass([latch])
        self.assertEquals(123, select.get())

    def test_nonempty_multiple_items_before_add(self):
        latch = mitogen.core.Latch()
        latch.put(123)
        latch.put(234)
        select = self.klass([latch], oneshot=False)
        self.assertEquals(123, select.get())
        self.assertEquals(234, select.get())
        self.assertRaises(mitogen.core.TimeoutError,
            lambda: select.get(block=False))

    def test_nonempty_after_add(self):
        latch = mitogen.core.Latch()
        select = self.klass([latch])
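
The two new tests above check the same property for both source types: a Receiver or Latch that already holds several buffered items when it is added to a Select must yield all of them, with a non-blocking get() raising TimeoutError only once the buffer is drained. A rough usage sketch of the Latch variant, assuming mitogen.select.Select is the class behind self.klass:

    # Illustrative only; mirrors test_nonempty_multiple_items_before_add.
    import mitogen.core
    import mitogen.select

    latch = mitogen.core.Latch()
    latch.put('first')
    latch.put('second')

    sel = mitogen.select.Select([latch], oneshot=False)
    assert sel.get() == 'first'
    assert sel.get() == 'second'
    try:
        sel.get(block=False)             # buffer drained, nothing else arrives
    except mitogen.core.TimeoutError:
        pass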

@ -192,7 +192,7 @@ def sync_with_broker(broker, timeout=10.0):
"""
sem = mitogen.core.Latch()
broker.defer(sem.put, None)
sem.get(timeout=10.0)
sem.get(timeout=timeout)
def log_fd_calls():
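
The hunk above makes sync_with_broker() honour its timeout argument instead of a hardcoded 10.0 seconds. A hypothetical call site, for illustration only:

    # Hypothetical usage, not taken from the diff: block until the broker thread
    # has drained everything queued before this point, or fail after 2 seconds.
    testlib.sync_with_broker(router.broker, timeout=2.0)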
@ -338,7 +338,7 @@ class TestCase(unittest2.TestCase):
    def _teardown_check_fds(self):
        mitogen.core.Latch._on_fork()
        if get_fd_count() != self._fd_count_before:
            import os; os.system('lsof -w -p %s' % (os.getpid(),))
            import os; os.system('lsof +E -w -p %s' % (os.getpid(),))
            assert 0, "%s leaked FDs. Count before: %s, after: %s" % (
                self, self._fd_count_before, get_fd_count(),
            )
@ -428,7 +428,7 @@ class DockerizedSshDaemon(object):
    def start_container(self):
        try:
            subprocess__check_output(['docker'])
            subprocess__check_output(['docker', '--version'])
        except Exception:
            raise unittest2.SkipTest('Docker binary is unavailable')

@ -98,23 +98,31 @@ class ScheduleTest(TimerListMixin, testlib.TestCase):
class ExpireTest(TimerListMixin, testlib.TestCase):
    def test_in_past(self):
        timer = self.list.schedule(29, mock.Mock())
        self.assertTrue(timer.active)
        self.list._now = lambda: 30
        self.list.expire()
        self.assertEquals(1, len(timer.func.mock_calls))
        self.assertFalse(timer.active)

    def test_in_future(self):
        timer = self.list.schedule(29, mock.Mock())
        self.assertTrue(timer.active)
        self.list._now = lambda: 28
        self.list.expire()
        self.assertEquals(0, len(timer.func.mock_calls))
        self.assertTrue(timer.active)

    def test_same_moment(self):
        timer = self.list.schedule(29, mock.Mock())
        timer2 = self.list.schedule(29, mock.Mock())
        self.assertTrue(timer.active)
        self.assertTrue(timer2.active)
        self.list._now = lambda: 29
        self.list.expire()
        self.assertEquals(1, len(timer.func.mock_calls))
        self.assertEquals(1, len(timer2.func.mock_calls))
        self.assertFalse(timer.active)
        self.assertFalse(timer2.active)

    def test_cancelled(self):
        self.list._now = lambda: 29

@ -131,7 +139,9 @@ class CancelTest(TimerListMixin, testlib.TestCase):
    def test_single_cancel(self):
        self.list._now = lambda: 29
        timer = self.list.schedule(29, mock.Mock())
        self.assertTrue(timer.active)
        timer.cancel()
        self.assertFalse(timer.active)
        self.list.expire()
        self.assertEquals(0, len(timer.func.mock_calls))

@ -139,7 +149,9 @@ class CancelTest(TimerListMixin, testlib.TestCase):
        self.list._now = lambda: 29
        timer = self.list.schedule(29, mock.Mock())
        timer.cancel()
        self.assertFalse(timer.active)
        timer.cancel()
        self.assertFalse(timer.active)
        self.list.expire()
        self.assertEquals(0, len(timer.func.mock_calls))
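
The assertions added throughout these timer tests document the Timer.active flag that replaces the old Timer.cancelled attribute: a timer stays active while pending and becomes inactive once it has either fired via expire() or been cancelled. A hedged lifecycle sketch, assuming mitogen.parent.TimerList is the class behind self.list in the tests:

    # Illustrative only -- mirrors the semantics the tests above assert.
    import time
    import mitogen.parent

    def on_fire():
        pass

    timers = mitogen.parent.TimerList()
    timer = timers.schedule(time.time() + 30, on_fire)
    assert timer.active               # pending: neither fired nor cancelled
    timer.cancel()
    timer.cancel()                    # second cancel is a harmless no-op
    assert not timer.active
    timers.expire()                   # cancelled timers are skipped entirely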
