Merge remote-tracking branch 'origin/dmw'

* origin/dmw:
  issue #605: update Changelog.
  issue #605: ansible: share a sem_t instead of a pthread_mutex_t
  issue #613: add tests for all the weird shutdown methods
  Add mitogen.core.now() and use it everywhere; closes #614.
  docs: move decorator docs into core.py and use autodecorator
  preamble_size: make it work on Python 3.
  docs: upgrade Sphinx to 2.1.2, require Python 3 to build docs.
  docs: fix Sphinx warnings, add LogHandler, more docstrings
  docs: tidy up some Changelog text
pull/618/head
David Wilson 5 years ago
commit 74834c845f

@ -92,37 +92,37 @@ try:
_libc = ctypes.CDLL(None, use_errno=True) _libc = ctypes.CDLL(None, use_errno=True)
_strerror = _libc.strerror _strerror = _libc.strerror
_strerror.restype = ctypes.c_char_p _strerror.restype = ctypes.c_char_p
_pthread_mutex_init = _libc.pthread_mutex_init _sem_init = _libc.sem_init
_pthread_mutex_lock = _libc.pthread_mutex_lock _sem_wait = _libc.sem_wait
_pthread_mutex_unlock = _libc.pthread_mutex_unlock _sem_post = _libc.sem_post
_sched_setaffinity = _libc.sched_setaffinity _sched_setaffinity = _libc.sched_setaffinity
except (OSError, AttributeError): except (OSError, AttributeError):
_libc = None _libc = None
_strerror = None _strerror = None
_pthread_mutex_init = None _sem_init = None
_pthread_mutex_lock = None _sem_wait = None
_pthread_mutex_unlock = None _sem_post = None
_sched_setaffinity = None _sched_setaffinity = None
class pthread_mutex_t(ctypes.Structure): class sem_t(ctypes.Structure):
""" """
Wrap pthread_mutex_t to allow storing a lock in shared memory. Wrap sem_t to allow storing a lock in shared memory.
""" """
_fields_ = [ _fields_ = [
('data', ctypes.c_uint8 * 512), ('data', ctypes.c_uint8 * 128),
] ]
def init(self): def init(self):
if _pthread_mutex_init(self.data, 0): if _sem_init(self.data, 1, 1):
raise Exception(_strerror(ctypes.get_errno())) raise Exception(_strerror(ctypes.get_errno()))
def acquire(self): def acquire(self):
if _pthread_mutex_lock(self.data): if _sem_wait(self.data):
raise Exception(_strerror(ctypes.get_errno())) raise Exception(_strerror(ctypes.get_errno()))
def release(self): def release(self):
if _pthread_mutex_unlock(self.data): if _sem_post(self.data):
raise Exception(_strerror(ctypes.get_errno())) raise Exception(_strerror(ctypes.get_errno()))
@ -133,7 +133,7 @@ class State(ctypes.Structure):
the context of the new child process. the context of the new child process.
""" """
_fields_ = [ _fields_ = [
('lock', pthread_mutex_t), ('lock', sem_t),
('counter', ctypes.c_uint8), ('counter', ctypes.c_uint8),
] ]

@ -2,11 +2,6 @@
API Reference API Reference
************* *************
.. toctree::
:hidden:
signals
Package Layout Package Layout
============== ==============
@ -31,29 +26,10 @@ mitogen.core
.. automodule:: mitogen.core .. automodule:: mitogen.core
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. decorator:: takes_econtext .. autodecorator:: takes_econtext
Decorator that marks a function or class method to automatically receive a
kwarg named `econtext`, referencing the
:class:`mitogen.core.ExternalContext` active in the context in which the
function is being invoked in. The decorator is only meaningful when the
function is invoked via :data:`CALL_FUNCTION
<mitogen.core.CALL_FUNCTION>`.
When the function is invoked directly, `econtext` must still be passed to
it explicitly.
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. decorator:: takes_router .. autodecorator:: takes_router
Decorator that marks a function or class method to automatically receive a
kwarg named `router`, referencing the :class:`mitogen.core.Router`
active in the context in which the function is being invoked in. The
decorator is only meaningful when the function is invoked via
:data:`CALL_FUNCTION <mitogen.core.CALL_FUNCTION>`.
When the function is invoked directly, `router` must still be passed to it
explicitly.
mitogen.master mitogen.master
@ -627,6 +603,14 @@ Fork Safety
Utility Functions Utility Functions
================= =================
.. currentmodule:: mitogen.core
.. function:: now
A reference to :func:`time.time` on Python 2, or :func:`time.monotonic` on
Python >3.3. We prefer :func:`time.monotonic` when available to ensure
timers are not impacted by system clock changes.
.. module:: mitogen.utils .. module:: mitogen.utils
A random assortment of utility functions useful on masters and children. A random assortment of utility functions useful on masters and children.

@ -22,7 +22,7 @@ To avail of fixes in an unreleased version, please download a ZIP file
`directly from GitHub <https://github.com/dw/mitogen/>`_. `directly from GitHub <https://github.com/dw/mitogen/>`_.
Enhancements Enhancements
^^^^^^^^^^^^ ~~~~~~~~~~~~
* `#556 <https://github.com/dw/mitogen/issues/556>`_, * `#556 <https://github.com/dw/mitogen/issues/556>`_,
`#587 <https://github.com/dw/mitogen/issues/587>`_: Ansible 2.8 is partially `#587 <https://github.com/dw/mitogen/issues/587>`_: Ansible 2.8 is partially
@ -61,7 +61,7 @@ Enhancements
Mitogen for Ansible Mitogen for Ansible
^^^^^^^^^^^^^^^^^^^ ~~~~~~~~~~~~~~~~~~~
* `#363 <https://github.com/dw/mitogen/issues/363>`_: fix an obscure race * `#363 <https://github.com/dw/mitogen/issues/363>`_: fix an obscure race
matching *Permission denied* errors from some versions of ``su`` running on matching *Permission denied* errors from some versions of ``su`` running on
@ -107,20 +107,21 @@ Mitogen for Ansible
server has been increased from `15*3` seconds to `30*10` seconds. server has been increased from `15*3` seconds to `30*10` seconds.
* `#600 <https://github.com/dw/mitogen/issues/600>`_: functionality to reflect * `#600 <https://github.com/dw/mitogen/issues/600>`_: functionality to reflect
changes to ``/etc/environment`` in the running interpreter did not account changes to ``/etc/environment`` did not account for Unicode file contents.
for Unicode file contents. Now the file may contain data in any single byte The file may now use any single byte encoding.
encoding.
* `#602 <https://github.com/dw/mitogen/issues/602>`_: connection configuration * `#602 <https://github.com/dw/mitogen/issues/602>`_: connection configuration
is more accurately inferred for `meta: reset_connection`, the `synchronize` is more accurately inferred for `meta: reset_connection`, the `synchronize`
module, and for any other action plug-ins that establish new connections of module, and for any action plug-ins that establish additional connections.
their own.
* `#605 <https://github.com/dw/mitogen/issues/605>`_: fix a deadlock managing a
shared counter used for load balancing.
* `#615 <https://github.com/dw/mitogen/issues/615>`_: streaming file transfer * `#615 <https://github.com/dw/mitogen/issues/615>`_: streaming file transfer
is implemented for the ``fetch`` and any other action that transfers files is implemented for ``fetch`` and other actions that transfer files from the
from the target to the controller. Previously the file would be sent as a target to the controller. Previously the file was sent in one message,
single message, requiring the file to fit in RAM and be smaller than internal requiring it to fit in RAM and be smaller than the internal message size
limits on the size of a single message. limit.
* `7ae926b3 <https://github.com/dw/mitogen/commit/7ae926b3>`_: the * `7ae926b3 <https://github.com/dw/mitogen/commit/7ae926b3>`_: the
``lineinfile`` module began leaking writable temporary file descriptors since ``lineinfile`` module began leaking writable temporary file descriptors since
@ -188,21 +189,20 @@ Core Library
:meth:`empty` method of :class:`mitogen.core.Latch`, :meth:`empty` method of :class:`mitogen.core.Latch`,
:class:`mitogen.core.Receiver` and :class:`mitogen.select.Select` has been :class:`mitogen.core.Receiver` and :class:`mitogen.select.Select` has been
replaced by a more general :meth:`size` method. :meth:`empty` will be removed replaced by a more general :meth:`size` method. :meth:`empty` will be removed
in Mitogen 0.3 in 0.3
* `ecc570cb <https://github.com/dw/mitogen/commit/ecc570cb>`_: previously * `ecc570cb <https://github.com/dw/mitogen/commit/ecc570cb>`_: previously
:meth:`mitogen.select.Select.add` would enqueue a single wake event when :meth:`mitogen.select.Select.add` would enqueue one wake event when adding an
adding an existing receiver, latch or subselect that contained multiple existing receiver, latch or subselect that contained multiple buffered items,
buffered items, causing future :meth:`get` calls to block or fail even though causing :meth:`get` calls to block or fail even though data existed to return.
data existed that could be returned.
* `5924af15 <https://github.com/dw/mitogen/commit/5924af15>`_: *[security]* the * `5924af15 <https://github.com/dw/mitogen/commit/5924af15>`_: *[security]*
unidirectional routing mode, in which contexts may only communicate with unidirectional routing, where contexts may optionally only communicate with
parents and never siblings (so a program cannot accidentally bridge parents and never siblings (so that air-gapped networks cannot be
air-gapped networks) was not inherited when a child context was initiated unintentionally bridged) was not inherited when a child was initiated
directly from an existing child. This did not effect the Ansible extension, directly from an another child. This did not effect Ansible, since the
since the controller initiates any new context used for routing, only forked controller initiates any new child used for routing, only forked tasks are
tasks are initiated by children. initiated by children.
Thanks! Thanks!
@ -491,7 +491,7 @@ Enhancements
`#491 <https://github.com/dw/mitogen/issues/491>`_, `#491 <https://github.com/dw/mitogen/issues/491>`_,
`#493 <https://github.com/dw/mitogen/issues/493>`_: the interface employed `#493 <https://github.com/dw/mitogen/issues/493>`_: the interface employed
for in-process queues changed from `kqueue for in-process queues changed from `kqueue
<https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2>`_ / `epoll <https://www.freebsd.org/cgi/man.cgi?query=kqueue>`_ / `epoll
<http://man7.org/linux/man-pages/man7/epoll.7.html>`_ to `poll() <http://man7.org/linux/man-pages/man7/epoll.7.html>`_ to `poll()
<http://man7.org/linux/man-pages/man2/poll.2.html>`_, which requires no setup <http://man7.org/linux/man-pages/man2/poll.2.html>`_, which requires no setup
or teardown, yielding a 38% latency reduction for inter-thread communication. or teardown, yielding a 38% latency reduction for inter-thread communication.
@ -1037,7 +1037,7 @@ bug reports, testing, features and fixes in this release contributed by
`Josh Smift <https://github.com/jbscare>`_, `Josh Smift <https://github.com/jbscare>`_,
`Luca Nunzi <https://github.com/0xlc>`_, `Luca Nunzi <https://github.com/0xlc>`_,
`Orion Poplawski <https://github.com/opoplawski>`_, `Orion Poplawski <https://github.com/opoplawski>`_,
`Peter V. Saveliev <https://github.com/svinota>`_, `Peter V. Saveliev <https://github.com/svinota/>`_,
`Pierre-Henry Muller <https://github.com/pierrehenrymuller>`_, `Pierre-Henry Muller <https://github.com/pierrehenrymuller>`_,
`Pierre-Louis Bonicoli <https://github.com/jesteria>`_, `Pierre-Louis Bonicoli <https://github.com/jesteria>`_,
`Prateek Jain <https://github.com/prateekj201>`_, `Prateek Jain <https://github.com/prateekj201>`_,
@ -1095,7 +1095,7 @@ Core Library
* `#300 <https://github.com/dw/mitogen/issues/300>`_: the broker could crash on * `#300 <https://github.com/dw/mitogen/issues/300>`_: the broker could crash on
OS X during shutdown due to scheduled `kqueue OS X during shutdown due to scheduled `kqueue
<https://www.freebsd.org/cgi/man.cgi?query=kevent>`_ filter changes for <https://www.freebsd.org/cgi/man.cgi?query=kqueue>`_ filter changes for
descriptors that were closed before the IO loop resumes. As a temporary descriptors that were closed before the IO loop resumes. As a temporary
workaround, kqueue's bulk change feature is not used. workaround, kqueue's bulk change feature is not used.

@ -7,7 +7,7 @@ VERSION = '%s.%s.%s' % mitogen.__version__
author = u'Network Genomics' author = u'Network Genomics'
copyright = u'2019, Network Genomics' copyright = u'2019, Network Genomics'
exclude_patterns = ['_build'] exclude_patterns = ['_build', '.venv']
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.intersphinx', 'sphinxcontrib.programoutput'] extensions = ['sphinx.ext.autodoc', 'sphinx.ext.intersphinx', 'sphinxcontrib.programoutput']
html_show_copyright = False html_show_copyright = False
html_show_sourcelink = False html_show_sourcelink = False

@ -7,11 +7,6 @@ Internal API Reference
Internal APIs are subject to rapid change even across minor releases. This Internal APIs are subject to rapid change even across minor releases. This
page exists to help users modify and extend the library. page exists to help users modify and extend the library.
.. toctree::
:hidden:
signals
Constants Constants
========= =========
@ -50,6 +45,10 @@ Logging
See also :class:`mitogen.core.IoLoggerProtocol`. See also :class:`mitogen.core.IoLoggerProtocol`.
.. currentmodule:: mitogen.core
.. autoclass:: LogHandler
:members:
.. currentmodule:: mitogen.master .. currentmodule:: mitogen.master
.. autoclass:: LogForwarder .. autoclass:: LogForwarder
:members: :members:
@ -270,6 +269,8 @@ Helpers
.. autofunction:: minimize_source .. autofunction:: minimize_source
.. _signals:
Signals Signals
======= =======
@ -312,10 +313,19 @@ These signals are used internally by Mitogen.
- ``disconnect`` - ``disconnect``
- Fired on the Broker thread when disconnection is detected. - Fired on the Broker thread when disconnection is detected.
* - :py:class:`mitogen.core.Stream`
- ``shutdown``
- Fired on the Broker thread when broker shutdown begins.
* - :py:class:`mitogen.core.Context` * - :py:class:`mitogen.core.Context`
- ``disconnect`` - ``disconnect``
- Fired on the Broker thread during shutdown (???) - Fired on the Broker thread during shutdown (???)
* - :py:class:`mitogen.parent.Process`
- ``exit``
- Fired when :class:`mitogen.parent.Reaper` detects subprocess has fully
exitted.
* - :py:class:`mitogen.core.Broker` * - :py:class:`mitogen.core.Broker`
- ``shutdown`` - ``shutdown``
- Fired after Broker.shutdown() is called. - Fired after Broker.shutdown() is called.

@ -1,3 +1,3 @@
Sphinx==1.7.1 Sphinx==2.1.2
sphinxcontrib-programoutput==0.11 sphinxcontrib-programoutput==0.14
alabaster==0.7.10 alabaster==0.7.10

@ -362,6 +362,10 @@ def to_text(o):
return UnicodeType(o) return UnicodeType(o)
# Documented in api.rst to work around Sphinx limitation.
now = getattr(time, 'monotonic', time.time)
# Python 2.4 # Python 2.4
try: try:
any any
@ -448,11 +452,31 @@ def fire(obj, name, *args, **kwargs):
def takes_econtext(func): def takes_econtext(func):
"""
Decorator that marks a function or class method to automatically receive a
kwarg named `econtext`, referencing the
:class:`mitogen.core.ExternalContext` active in the context in which the
function is being invoked in. The decorator is only meaningful when the
function is invoked via :data:`CALL_FUNCTION <mitogen.core.CALL_FUNCTION>`.
When the function is invoked directly, `econtext` must still be passed to
it explicitly.
"""
func.mitogen_takes_econtext = True func.mitogen_takes_econtext = True
return func return func
def takes_router(func): def takes_router(func):
"""
Decorator that marks a function or class method to automatically receive a
kwarg named `router`, referencing the :class:`mitogen.core.Router` active
in the context in which the function is being invoked in. The decorator is
only meaningful when the function is invoked via :data:`CALL_FUNCTION
<mitogen.core.CALL_FUNCTION>`.
When the function is invoked directly, `router` must still be passed to it
explicitly.
"""
func.mitogen_takes_router = True func.mitogen_takes_router = True
return func return func
@ -616,7 +640,7 @@ def _real_profile_hook(name, func, *args):
return func(*args) return func(*args)
finally: finally:
path = _profile_fmt % { path = _profile_fmt % {
'now': int(1e6 * time.time()), 'now': int(1e6 * now()),
'identity': name, 'identity': name,
'pid': os.getpid(), 'pid': os.getpid(),
'ext': '%s' 'ext': '%s'
@ -1024,11 +1048,11 @@ class Receiver(object):
routed to the context due to disconnection, and ignores messages that routed to the context due to disconnection, and ignores messages that
did not originate from the respondent context. did not originate from the respondent context.
""" """
#: If not :data:`None`, a reference to a function invoked as #: If not :data:`None`, a function invoked as `notify(receiver)` after a
#: `notify(receiver)` when a new message is delivered to this receiver. The #: message has been received. The function is invoked on :class:`Broker`
#: function is invoked on the broker thread, therefore it must not block. #: thread, therefore it must not block. Used by
#: Used by :class:`mitogen.select.Select` to implement waiting on multiple #: :class:`mitogen.select.Select` to efficiently implement waiting on
#: receivers. #: multiple event sources.
notify = None notify = None
raise_channelerror = True raise_channelerror = True
@ -1513,6 +1537,22 @@ class Importer(object):
class LogHandler(logging.Handler): class LogHandler(logging.Handler):
"""
A :class:`logging.Handler` subclass that arranges for :data:`FORWARD_LOG`
messages to be sent to a parent context in response to logging messages
generated by the current context. This is installed by default in child
contexts during bootstrap, so that :mod:`logging` events can be viewed and
managed centrally in the master process.
The handler is initially *corked* after construction, such that it buffers
messages until :meth:`uncork` is called. This allows logging to be
installed prior to communication with the target being available, and
avoids any possible race where early log messages might be dropped.
:param mitogen.core.Context context:
The context to send log messages towards. At present this is always
the master process.
"""
def __init__(self, context): def __init__(self, context):
logging.Handler.__init__(self) logging.Handler.__init__(self)
self.context = context self.context = context
@ -1549,6 +1589,9 @@ class LogHandler(logging.Handler):
self._buffer_lock.release() self._buffer_lock.release()
def emit(self, rec): def emit(self, rec):
"""
Send a :data:`FORWARD_LOG` message towards the target context.
"""
if rec.name == 'mitogen.io' or \ if rec.name == 'mitogen.io' or \
getattr(self.local, 'in_emit', False): getattr(self.local, 'in_emit', False):
return return
@ -1566,6 +1609,30 @@ class LogHandler(logging.Handler):
class Stream(object): class Stream(object):
"""
A :class:`Stream` is one readable and optionally one writeable file
descriptor (represented by :class:`Side`) aggregated alongside an
associated :class:`Protocol` that knows how to respond to IO readiness
events for those descriptors.
Streams are registered with :class:`Broker`, and callbacks are invoked on
the broker thread in response to IO activity. When registered using
:meth:`Broker.start_receive` or :meth:`Broker._start_transmit`, the broker
may call any of :meth:`on_receive`, :meth:`on_transmit`,
:meth:`on_shutdown` or :meth:`on_disconnect`.
It is expected that the :class:`Protocol` associated with a stream will
change over its life. For example during connection setup, the initial
protocol may be :class:`mitogen.parent.BootstrapProtocol` that knows how to
enter SSH and sudo passwords and transmit the :mod:`mitogen.core` source to
the target, before handing off to :class:`MitogenProtocol` when the target
process is initialized.
Streams connecting to children are in turn aggregated by
:class:`mitogen.parent.Connection`, which contains additional logic for
managing any child process, and a reference to any separate ``stderr``
:class:`Stream` connected to that process.
"""
#: A :class:`Side` representing the stream's receive file descriptor. #: A :class:`Side` representing the stream's receive file descriptor.
receive_side = None receive_side = None
@ -1578,14 +1645,16 @@ class Stream(object):
#: In parents, the :class:`mitogen.parent.Connection` instance. #: In parents, the :class:`mitogen.parent.Connection` instance.
conn = None conn = None
#: The stream name. This is used in the :meth:`__repr__` output in any log
#: messages, it may be any descriptive string.
name = u'default' name = u'default'
def set_protocol(self, protocol): def set_protocol(self, protocol):
""" """
Bind a protocol to this stream, by updating :attr:`Protocol.stream` to Bind a :class:`Protocol` to this stream, by updating
refer to this stream, and updating this stream's :attr:`Protocol.stream` to refer to this stream, and updating this
:attr:`Stream.protocol` to the refer to the protocol. Any prior stream's :attr:`Stream.protocol` to the refer to the protocol. Any
protocol's :attr:`Protocol.stream` is set to :data:`None`. prior protocol's :attr:`Protocol.stream` is set to :data:`None`.
""" """
if self.protocol: if self.protocol:
self.protocol.stream = None self.protocol.stream = None
@ -1593,6 +1662,21 @@ class Stream(object):
self.protocol.stream = self self.protocol.stream = self
def accept(self, rfp, wfp): def accept(self, rfp, wfp):
"""
Attach a pair of file objects to :attr:`receive_side` and
:attr:`transmit_side`, after wrapping them in :class:`Side` instances.
:class:`Side` will call :func:`set_nonblock` and :func:`set_cloexec`
on the underlying file descriptors during construction.
The same file object may be used for both sides. The default
:meth:`on_disconnect` is handles the possibility that only one
descriptor may need to be closed.
:param file rfp:
The file object to receive from.
:param file wfp:
The file object to transmit to.
"""
self.receive_side = Side(self, rfp) self.receive_side = Side(self, rfp)
self.transmit_side = Side(self, wfp) self.transmit_side = Side(self, wfp)
@ -1601,13 +1685,17 @@ class Stream(object):
def on_receive(self, broker): def on_receive(self, broker):
""" """
Called by :class:`Broker` when the stream's :attr:`receive_side` has Invoked by :class:`Broker` when the stream's :attr:`receive_side` has
been marked readable using :meth:`Broker.start_receive` and the broker been marked readable using :meth:`Broker.start_receive` and the broker
has detected the associated file descriptor is ready for reading. has detected the associated file descriptor is ready for reading.
Subclasses must implement this if :meth:`Broker.start_receive` is ever Subclasses must implement this if they are registered using
called on them, and the method must call :meth:`on_disconect` if :meth:`Broker.start_receive`, and the method must invoke
reading produces an empty string. :meth:`on_disconnect` if reading produces an empty string.
The default implementation reads :attr:`Protocol.read_size` bytes and
passes the resulting bytestring to :meth:`Protocol.on_receive`. If the
bytestring is 0 bytes, invokes :meth:`on_disconnect` instead.
""" """
buf = self.receive_side.read(self.protocol.read_size) buf = self.receive_side.read(self.protocol.read_size)
if not buf: if not buf:
@ -1618,30 +1706,39 @@ class Stream(object):
def on_transmit(self, broker): def on_transmit(self, broker):
""" """
Called by :class:`Broker` when the stream's :attr:`transmit_side` Invoked by :class:`Broker` when the stream's :attr:`transmit_side` has
has been marked writeable using :meth:`Broker._start_transmit` and been marked writeable using :meth:`Broker._start_transmit` and the
the broker has detected the associated file descriptor is ready for broker has detected the associated file descriptor is ready for
writing. writing.
Subclasses must implement this if :meth:`Broker._start_transmit` is Subclasses must implement they are ever registerd with
ever called on them. :meth:`Broker._start_transmit`.
The default implementation invokes :meth:`Protocol.on_transmit`.
""" """
self.protocol.on_transmit(broker) self.protocol.on_transmit(broker)
def on_shutdown(self, broker): def on_shutdown(self, broker):
""" """
Called by :meth:`Broker.shutdown` to allow the stream time to Invoked by :meth:`Broker.shutdown` to allow the stream time to
gracefully shutdown. The base implementation simply called gracefully shutdown.
:meth:`on_disconnect`.
The default implementation emits a ``shutdown`` signal before
invoking :meth:`on_disconnect`.
""" """
fire(self, 'shutdown') fire(self, 'shutdown')
self.protocol.on_shutdown(broker) self.protocol.on_shutdown(broker)
def on_disconnect(self, broker): def on_disconnect(self, broker):
""" """
Called by :class:`Broker` to force disconnect the stream. The base Invoked by :class:`Broker` to force disconnect the stream during
implementation simply closes :attr:`receive_side` and shutdown, invoked by the default :meth:`on_shutdown` implementation,
:attr:`transmit_side` and unregisters the stream from the broker. and usually invoked by any subclass :meth:`on_receive` implementation
in response to a 0-byte read.
The base implementation fires a ``disconnect`` event, then closes
:attr:`receive_side` and :attr:`transmit_side` after unregistering the
stream from the broker.
""" """
fire(self, 'disconnect') fire(self, 'disconnect')
self.protocol.on_disconnect(broker) self.protocol.on_disconnect(broker)
@ -1666,6 +1763,8 @@ class Protocol(object):
#: :data:`None`. #: :data:`None`.
stream = None stream = None
#: The size of the read buffer used by :class:`Stream` when this is the
#: active protocol for the stream.
read_size = CHUNK_SIZE read_size = CHUNK_SIZE
@classmethod @classmethod
@ -2369,8 +2468,18 @@ class Latch(object):
See :ref:`waking-sleeping-threads` for further discussion. See :ref:`waking-sleeping-threads` for further discussion.
""" """
#: The :class:`Poller` implementation to use for waiting. Since the poller
#: will be very short-lived, we prefer :class:`mitogen.parent.PollPoller`
#: if it is available, or :class:`mitogen.core.Poller` otherwise, since
#: these implementations require no system calls to create, configure or
#: destroy.
poller_class = Poller poller_class = Poller
#: If not :data:`None`, a function invoked as `notify(latch)` after a
#: successful call to :meth:`put`. The function is invoked on the
#: :meth:`put` caller's thread, which may be the :class:`Broker` thread,
#: therefore it must not block. Used by :class:`mitogen.select.Select` to
#: efficiently implement waiting on multiple event sources.
notify = None notify = None
# The _cls_ prefixes here are to make it crystal clear in the code which # The _cls_ prefixes here are to make it crystal clear in the code which
@ -2725,15 +2834,22 @@ class Waker(Protocol):
class IoLoggerProtocol(DelimitedProtocol): class IoLoggerProtocol(DelimitedProtocol):
""" """
Handle redirection of standard IO into the :mod:`logging` package. Attached to one end of a socket pair whose other end overwrites one of the
standard ``stdout`` or ``stderr`` file descriptors in a child context.
Received data is split up into lines, decoded as UTF-8 and logged to the
:mod:`logging` package as either the ``stdout`` or ``stderr`` logger.
Logging in child contexts is in turn forwarded to the master process using
:class:`LogHandler`.
""" """
@classmethod @classmethod
def build_stream(cls, name, dest_fd): def build_stream(cls, name, dest_fd):
""" """
Even though the descriptor `dest_fd` will hold the opposite end of the Even though the file descriptor `dest_fd` will hold the opposite end of
socket open, we must keep a separate dup() of it (i.e. wsock) in case the socket open, we must keep a separate dup() of it (i.e. wsock) in
some code decides to overwrite `dest_fd` later, which would thus break case some code decides to overwrite `dest_fd` later, which would
:meth:`on_shutdown`. prevent break :meth:`on_shutdown` from calling :meth:`shutdown()
<socket.socket.shutdown>` on it.
""" """
rsock, wsock = socket.socketpair() rsock, wsock = socket.socketpair()
os.dup2(wsock.fileno(), dest_fd) os.dup2(wsock.fileno(), dest_fd)
@ -3370,9 +3486,9 @@ class Broker(object):
for _, (side, _) in self.poller.readers + self.poller.writers: for _, (side, _) in self.poller.readers + self.poller.writers:
self._call(side.stream, side.stream.on_shutdown) self._call(side.stream, side.stream.on_shutdown)
deadline = time.time() + self.shutdown_timeout deadline = now() + self.shutdown_timeout
while self.keep_alive() and time.time() < deadline: while self.keep_alive() and now() < deadline:
self._loop_once(max(0, deadline - time.time())) self._loop_once(max(0, deadline - now()))
if self.keep_alive(): if self.keep_alive():
LOG.error('%r: pending work still existed %d seconds after ' LOG.error('%r: pending work still existed %d seconds after '

@ -741,7 +741,7 @@ class ModuleFinder(object):
The list is determined by retrieving the source code of The list is determined by retrieving the source code of
`fullname`, compiling it, and examining all IMPORT_NAME ops. `fullname`, compiling it, and examining all IMPORT_NAME ops.
:param fullname: Fully qualified name of an _already imported_ module :param fullname: Fully qualified name of an *already imported* module
for which source code can be retrieved for which source code can be retrieved
:type fullname: str :type fullname: str
""" """
@ -789,7 +789,7 @@ class ModuleFinder(object):
This method is like :py:meth:`find_related_imports`, but also This method is like :py:meth:`find_related_imports`, but also
recursively searches any modules which are imported by `fullname`. recursively searches any modules which are imported by `fullname`.
:param fullname: Fully qualified name of an _already imported_ module :param fullname: Fully qualified name of an *already imported* module
for which source code can be retrieved for which source code can be retrieved
:type fullname: str :type fullname: str
""" """
@ -841,7 +841,7 @@ class ModuleResponder(object):
def add_source_override(self, fullname, path, source, is_pkg): def add_source_override(self, fullname, path, source, is_pkg):
""" """
See :meth:`ModuleFinder.add_source_override. See :meth:`ModuleFinder.add_source_override`.
""" """
self._finder.add_source_override(fullname, path, source, is_pkg) self._finder.add_source_override(fullname, path, source, is_pkg)
@ -910,9 +910,9 @@ class ModuleResponder(object):
if self.minify_safe_re.search(source): if self.minify_safe_re.search(source):
# If the module contains a magic marker, it's safe to minify. # If the module contains a magic marker, it's safe to minify.
t0 = time.time() t0 = mitogen.core.now()
source = mitogen.minify.minimize_source(source).encode('utf-8') source = mitogen.minify.minimize_source(source).encode('utf-8')
self.minify_secs += time.time() - t0 self.minify_secs += mitogen.core.now() - t0
if is_pkg: if is_pkg:
pkg_present = get_child_modules(path) pkg_present = get_child_modules(path)
@ -1001,11 +1001,11 @@ class ModuleResponder(object):
LOG.warning('_on_get_module(): dup request for %r from %r', LOG.warning('_on_get_module(): dup request for %r from %r',
fullname, stream) fullname, stream)
t0 = time.time() t0 = mitogen.core.now()
try: try:
self._send_module_and_related(stream, fullname) self._send_module_and_related(stream, fullname)
finally: finally:
self.get_module_secs += time.time() - t0 self.get_module_secs += mitogen.core.now() - t0
def _send_forward_module(self, stream, context, fullname): def _send_forward_module(self, stream, context, fullname):
if stream.protocol.remote_id != context.context_id: if stream.protocol.remote_id != context.context_id:

@ -633,7 +633,7 @@ class TimerList(object):
:meth:`expire`. The main user interface to :class:`TimerList` is :meth:`expire`. The main user interface to :class:`TimerList` is
:meth:`schedule`. :meth:`schedule`.
""" """
_now = time.time _now = mitogen.core.now
def __init__(self): def __init__(self):
self._lst = [] self._lst = []
@ -1124,7 +1124,7 @@ class LineLoggingProtocolMixin(object):
def on_line_received(self, line): def on_line_received(self, line):
self.logged_partial = None self.logged_partial = None
self.logged_lines.append((time.time(), line)) self.logged_lines.append((mitogen.core.now(), line))
self.logged_lines[:] = self.logged_lines[-100:] self.logged_lines[:] = self.logged_lines[-100:]
return super(LineLoggingProtocolMixin, self).on_line_received(line) return super(LineLoggingProtocolMixin, self).on_line_received(line)
@ -1134,7 +1134,7 @@ class LineLoggingProtocolMixin(object):
def on_disconnect(self, broker): def on_disconnect(self, broker):
if self.logged_partial: if self.logged_partial:
self.logged_lines.append((time.time(), self.logged_partial)) self.logged_lines.append((mitogen.core.now(), self.logged_partial))
self.logged_partial = None self.logged_partial = None
super(LineLoggingProtocolMixin, self).on_disconnect(broker) super(LineLoggingProtocolMixin, self).on_disconnect(broker)
@ -1324,7 +1324,7 @@ class Options(object):
self.profiling = profiling self.profiling = profiling
self.unidirectional = unidirectional self.unidirectional = unidirectional
self.max_message_size = max_message_size self.max_message_size = max_message_size
self.connect_deadline = time.time() + self.connect_timeout self.connect_deadline = mitogen.core.now() + self.connect_timeout
class Connection(object): class Connection(object):
@ -1819,7 +1819,7 @@ class CallChain(object):
socket.gethostname(), socket.gethostname(),
os.getpid(), os.getpid(),
thread.get_ident(), thread.get_ident(),
int(1e6 * time.time()), int(1e6 * mitogen.core.now()),
) )
def __repr__(self): def __repr__(self):
@ -2331,7 +2331,7 @@ class Router(mitogen.core.Router):
directly connected. directly connected.
""" """
stream = self.stream_by_id(context) stream = self.stream_by_id(context)
if stream.protocol.remote_id != context.context_id: if stream is None or stream.protocol.remote_id != context.context_id:
return return
l = mitogen.core.Latch() l = mitogen.core.Latch()
@ -2516,7 +2516,7 @@ class Reaper(object):
:param mitogen.core.Broker broker: :param mitogen.core.Broker broker:
The :class:`Broker` on which to install timers The :class:`Broker` on which to install timers
:param Process proc: :param mitogen.parent.Process proc:
The process to reap. The process to reap.
:param bool kill: :param bool kill:
If :data:`True`, send ``SIGTERM`` and ``SIGKILL`` to the process. If :data:`True`, send ``SIGTERM`` and ``SIGKILL`` to the process.
@ -2569,7 +2569,7 @@ class Reaper(object):
def _install_timer(self, delay): def _install_timer(self, delay):
new = self._timer is None new = self._timer is None
self._timer = self.broker.timers.schedule( self._timer = self.broker.timers.schedule(
when=time.time() + delay, when=mitogen.core.now() + delay,
func=self.reap, func=self.reap,
) )
if new: if new:
@ -2589,6 +2589,7 @@ class Reaper(object):
status = self.proc.poll() status = self.proc.poll()
if status is not None: if status is not None:
LOG.debug('%r: %s', self.proc, returncode_to_str(status)) LOG.debug('%r: %s', self.proc, returncode_to_str(status))
mitogen.core.fire(self.proc, 'exit')
self._remove_timer() self._remove_timer()
return return

@ -1109,7 +1109,7 @@ class FileService(Service):
:meth:`fetch`. :meth:`fetch`.
""" """
LOG.debug('get_file(): fetching %r from %r', path, context) LOG.debug('get_file(): fetching %r from %r', path, context)
t0 = time.time() t0 = mitogen.core.now()
recv = mitogen.core.Receiver(router=context.router) recv = mitogen.core.Receiver(router=context.router)
metadata = context.call_service( metadata = context.call_service(
service_name=cls.name(), service_name=cls.name(),
@ -1143,5 +1143,6 @@ class FileService(Service):
path, metadata['size'], received_bytes) path, metadata['size'], received_bytes)
LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms', LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms',
metadata['size'], path, context, 1000 * (time.time() - t0)) metadata['size'], path, context,
1000 * (mitogen.core.now() - t0))
return ok, metadata return ok, metadata

@ -24,10 +24,12 @@ conn = mitogen.ssh.Connection(options, router)
conn.context = context conn.context = context
print('SSH command size: %s' % (len(' '.join(conn.get_boot_command())),)) print('SSH command size: %s' % (len(' '.join(conn.get_boot_command())),))
print('Preamble size: %s (%.2fKiB)' % ( print('Bootstrap (mitogen.core) size: %s (%.2fKiB)' % (
len(conn.get_preamble()), len(conn.get_preamble()),
len(conn.get_preamble()) / 1024.0, len(conn.get_preamble()) / 1024.0,
)) ))
print('')
if '--dump' in sys.argv: if '--dump' in sys.argv:
print(zlib.decompress(conn.get_preamble())) print(zlib.decompress(conn.get_preamble()))
exit() exit()
@ -57,7 +59,7 @@ for mod in (
original_size = len(original) original_size = len(original)
minimized = mitogen.minify.minimize_source(original) minimized = mitogen.minify.minimize_source(original)
minimized_size = len(minimized) minimized_size = len(minimized)
compressed = zlib.compress(minimized, 9) compressed = zlib.compress(minimized.encode(), 9)
compressed_size = len(compressed) compressed_size = len(compressed)
print( print(
'%-25s' '%-25s'

@ -3,13 +3,13 @@ Measure latency of .fork() setup/teardown.
""" """
import mitogen import mitogen
import time import mitogen.core
@mitogen.main() @mitogen.main()
def main(router): def main(router):
t0 = time.time() t0 = mitogen.core.now()
for x in xrange(200): for x in xrange(200):
t = time.time() t = mitogen.core.now()
ctx = router.fork() ctx = router.fork()
ctx.shutdown(wait=True) ctx.shutdown(wait=True)
print '++', 1000 * ((time.time() - t0) / (1.0+x)) print '++', 1000 * ((mitogen.core.now() - t0) / (1.0+x))

@ -4,7 +4,9 @@
import subprocess import subprocess
import time import time
import socket import socket
import mitogen import mitogen
import mitogen.core
@mitogen.main() @mitogen.main()
@ -15,12 +17,12 @@ def main(router):
s = ' ' * n s = ' ' * n
print('bytes in %.2fMiB string...' % (n/1048576.0),) print('bytes in %.2fMiB string...' % (n/1048576.0),)
t0 = time.time() t0 = mitogen.core.now()
for x in range(10): for x in range(10):
tt0 = time.time() tt0 = mitogen.core.now()
assert n == c.call(len, s) assert n == c.call(len, s)
print('took %dms' % (1000 * (time.time() - tt0),)) print('took %dms' % (1000 * (mitogen.core.now() - tt0),))
t1 = time.time() t1 = mitogen.core.now()
print('total %dms / %dms avg / %.2fMiB/sec' % ( print('total %dms / %dms avg / %.2fMiB/sec' % (
1000 * (t1 - t0), 1000 * (t1 - t0),
(1000 * (t1 - t0)) / (x + 1), (1000 * (t1 - t0)) / (x + 1),

@ -6,6 +6,7 @@ import threading
import time import time
import mitogen import mitogen
import mitogen.core
import mitogen.utils import mitogen.utils
import ansible_mitogen.affinity import ansible_mitogen.affinity
@ -33,8 +34,8 @@ t2.start()
ready.get() ready.get()
ready.get() ready.get()
t0 = time.time() t0 = mitogen.core.now()
l1.put(None) l1.put(None)
t1.join() t1.join()
t2.join() t2.join()
print('++', int(1e6 * ((time.time() - t0) / (1.0+X))), 'usec') print('++', int(1e6 * ((mitogen.core.now() - t0) / (1.0+X))), 'usec')

@ -5,6 +5,7 @@ Measure latency of .local() setup.
import time import time
import mitogen import mitogen
import mitogen.core
import mitogen.utils import mitogen.utils
import ansible_mitogen.affinity import ansible_mitogen.affinity
@ -15,10 +16,10 @@ mitogen.utils.setup_gil()
@mitogen.main() @mitogen.main()
def main(router): def main(router):
t0=time.time() t0 = mitogen.core.now()
for x in range(100): for x in range(100):
t = time.time() t = mitogen.core.now()
f = router.local()# debug=True) f = router.local()# debug=True)
tt = time.time() tt = mitogen.core.now()
print(x, 1000 * (tt - t)) print(x, 1000 * (tt - t))
print('%.03f ms' % (1000 * (time.time() - t0) / (1.0 + x))) print('%.03f ms' % (1000 * (mitogen.core.now() - t0) / (1.0 + x)))

@ -4,12 +4,14 @@ import sys
import os import os
import time import time
import mitogen.core
times = [] times = []
for x in range(5): for x in range(5):
t0 = time.time() t0 = mitogen.core.now()
os.spawnvp(os.P_WAIT, sys.argv[1], sys.argv[1:]) os.spawnvp(os.P_WAIT, sys.argv[1], sys.argv[1:])
t = time.time() - t0 t = mitogen.core.now() - t0
times.append(t) times.append(t)
print('+++', t) print('+++', t)

@ -5,6 +5,7 @@ Measure latency of local RPC.
import time import time
import mitogen import mitogen
import mitogen.core
import mitogen.utils import mitogen.utils
import ansible_mitogen.affinity import ansible_mitogen.affinity
@ -23,7 +24,7 @@ def do_nothing():
def main(router): def main(router):
f = router.fork() f = router.fork()
f.call(do_nothing) f.call(do_nothing)
t0 = time.time() t0 = mitogen.core.now()
for x in xrange(20000): for x in xrange(20000):
f.call(do_nothing) f.call(do_nothing)
print('++', int(1e6 * ((time.time() - t0) / (1.0+x))), 'usec') print('++', int(1e6 * ((mitogen.core.now() - t0) / (1.0+x))), 'usec')

@ -4,8 +4,9 @@ Measure latency of local service RPC.
import time import time
import mitogen.service
import mitogen import mitogen
import mitogen.core
import mitogen.service
class MyService(mitogen.service.Service): class MyService(mitogen.service.Service):
@ -17,7 +18,7 @@ class MyService(mitogen.service.Service):
@mitogen.main() @mitogen.main()
def main(router): def main(router):
f = router.fork() f = router.fork()
t0 = time.time() t0 = mitogen.core.now()
for x in range(1000): for x in range(1000):
f.call_service(service_name=MyService, method_name='ping') f.call_service(service_name=MyService, method_name='ping')
print('++', int(1e6 * ((time.time() - t0) / (1.0+x))), 'usec') print('++', int(1e6 * ((mitogen.core.now() - t0) / (1.0+x))), 'usec')

@ -6,6 +6,7 @@ import sys
import time import time
import mitogen import mitogen
import mitogen.core
import mitogen.utils import mitogen.utils
import ansible_mitogen.affinity import ansible_mitogen.affinity
@ -24,12 +25,12 @@ def do_nothing():
def main(router): def main(router):
f = router.ssh(hostname=sys.argv[1]) f = router.ssh(hostname=sys.argv[1])
f.call(do_nothing) f.call(do_nothing)
t0 = time.time() t0 = mitogen.core.now()
end = time.time() + 5.0 end = mitogen.core.now() + 5.0
i = 0 i = 0
while time.time() < end: while mitogen.core.now() < end:
f.call(do_nothing) f.call(do_nothing)
i += 1 i += 1
t1 = time.time() t1 = mitogen.core.now()
print('++', float(1e3 * (t1 - t0) / (1.0+i)), 'ms') print('++', float(1e3 * (t1 - t0) / (1.0+i)), 'ms')

@ -8,6 +8,7 @@ import tempfile
import time import time
import mitogen import mitogen
import mitogen.core
import mitogen.service import mitogen.service
import ansible_mitogen.affinity import ansible_mitogen.affinity
@ -35,9 +36,9 @@ def run_test(router, fp, s, context):
size = fp.tell() size = fp.tell()
print('Testing %s...' % (s,)) print('Testing %s...' % (s,))
context.call(prepare) context.call(prepare)
t0 = time.time() t0 = mitogen.core.now()
context.call(transfer, router.myself(), fp.name) context.call(transfer, router.myself(), fp.name)
t1 = time.time() t1 = mitogen.core.now()
print('%s took %.2f ms to transfer %.2f MiB, %.2f MiB/s' % ( print('%s took %.2f ms to transfer %.2f MiB, %.2f MiB/s' % (
s, 1000 * (t1 - t0), size / 1048576.0, s, 1000 * (t1 - t0), size / 1048576.0,
(size / (t1 - t0) / 1048576.0), (size / (t1 - t0) / 1048576.0),

@ -9,6 +9,7 @@ import tempfile
import mock import mock
import unittest2 import unittest2
import mitogen.core
import mitogen.parent import mitogen.parent
from mitogen.core import b from mitogen.core import b
@ -188,7 +189,6 @@ class TtyCreateChildTest(testlib.TestCase):
proc = self.func([ proc = self.func([
'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,) 'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
]) ])
deadline = time.time() + 5.0
mitogen.core.set_block(proc.stdin.fileno()) mitogen.core.set_block(proc.stdin.fileno())
# read(3) below due to https://bugs.python.org/issue37696 # read(3) below due to https://bugs.python.org/issue37696
self.assertEquals(mitogen.core.b('hi\n'), proc.stdin.read(3)) self.assertEquals(mitogen.core.b('hi\n'), proc.stdin.read(3))
@ -271,7 +271,6 @@ class TtyCreateChildTest(testlib.TestCase):
proc = self.func([ proc = self.func([
'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,) 'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
]) ])
deadline = time.time() + 5.0
self.assertEquals(mitogen.core.b('hi\n'), wait_read(proc.stdout, 3)) self.assertEquals(mitogen.core.b('hi\n'), wait_read(proc.stdout, 3))
waited_pid, status = os.waitpid(proc.pid, 0) waited_pid, status = os.waitpid(proc.pid, 0)
self.assertEquals(proc.pid, waited_pid) self.assertEquals(proc.pid, waited_pid)

@ -12,6 +12,7 @@ import unittest2
import testlib import testlib
from testlib import Popen__terminate from testlib import Popen__terminate
import mitogen.core
import mitogen.parent import mitogen.parent
try: try:
@ -21,8 +22,8 @@ except NameError:
def wait_for_child(pid, timeout=1.0): def wait_for_child(pid, timeout=1.0):
deadline = time.time() + timeout deadline = mitogen.core.now() + timeout
while timeout < time.time(): while timeout < mitogen.core.now():
try: try:
target_pid, status = os.waitpid(pid, os.WNOHANG) target_pid, status = os.waitpid(pid, os.WNOHANG)
if target_pid == pid: if target_pid == pid:

@ -164,14 +164,14 @@ class CloseMixin(PollerMixin):
class PollMixin(PollerMixin): class PollMixin(PollerMixin):
def test_empty_zero_timeout(self): def test_empty_zero_timeout(self):
t0 = time.time() t0 = mitogen.core.now()
self.assertEquals([], list(self.p.poll(0))) self.assertEquals([], list(self.p.poll(0)))
self.assertTrue((time.time() - t0) < .1) # vaguely reasonable self.assertTrue((mitogen.core.now() - t0) < .1) # vaguely reasonable
def test_empty_small_timeout(self): def test_empty_small_timeout(self):
t0 = time.time() t0 = mitogen.core.now()
self.assertEquals([], list(self.p.poll(.2))) self.assertEquals([], list(self.p.poll(.2)))
self.assertTrue((time.time() - t0) >= .2) self.assertTrue((mitogen.core.now() - t0) >= .2)
class ReadableMixin(PollerMixin, SockMixin): class ReadableMixin(PollerMixin, SockMixin):

@ -1,3 +1,5 @@
import errno
import os
import sys import sys
import time import time
import zlib import zlib
@ -425,5 +427,108 @@ class EgressIdsTest(testlib.RouterMixin, testlib.TestCase):
])) ]))
class ShutdownTest(testlib.RouterMixin, testlib.TestCase):
# 613: tests for all the weird shutdown() variants we ended up with.
def test_shutdown_wait_false(self):
l1 = self.router.local()
pid = l1.call(os.getpid)
conn = self.router.stream_by_id(l1.context_id).conn
exitted = mitogen.core.Latch()
mitogen.core.listen(conn.proc, 'exit', exitted.put)
l1.shutdown(wait=False)
exitted.get()
e = self.assertRaises(OSError,
lambda: os.waitpid(pid, 0))
self.assertEquals(e.args[0], errno.ECHILD)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: l1.call(os.getpid))
self.assertEquals(e.args[0], mitogen.core.Router.no_route_msg % (
l1.context_id,
mitogen.context_id,
))
def test_shutdown_wait_true(self):
l1 = self.router.local()
pid = l1.call(os.getpid)
conn = self.router.stream_by_id(l1.context_id).conn
exitted = mitogen.core.Latch()
mitogen.core.listen(conn.proc, 'exit', exitted.put)
l1.shutdown(wait=True)
exitted.get()
e = self.assertRaises(OSError,
lambda: os.waitpid(pid, 0))
self.assertEquals(e.args[0], errno.ECHILD)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: l1.call(os.getpid))
self.assertEquals(e.args[0], mitogen.core.Router.no_route_msg % (
l1.context_id,
mitogen.context_id,
))
def test_disconnect_invalid_context(self):
self.router.disconnect(
mitogen.core.Context(self.router, 1234)
)
def test_disconnect_valid_context(self):
l1 = self.router.local()
pid = l1.call(os.getpid)
strm = self.router.stream_by_id(l1.context_id)
exitted = mitogen.core.Latch()
mitogen.core.listen(strm.conn.proc, 'exit', exitted.put)
self.router.disconnect_stream(strm)
exitted.get()
e = self.assertRaises(OSError,
lambda: os.waitpid(pid, 0))
self.assertEquals(e.args[0], errno.ECHILD)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: l1.call(os.getpid))
self.assertEquals(e.args[0], mitogen.core.Router.no_route_msg % (
l1.context_id,
mitogen.context_id,
))
def test_disconnet_all(self):
l1 = self.router.local()
l2 = self.router.local()
pids = [l1.call(os.getpid), l2.call(os.getpid)]
exitted = mitogen.core.Latch()
for ctx in l1, l2:
strm = self.router.stream_by_id(ctx.context_id)
mitogen.core.listen(strm.conn.proc, 'exit', exitted.put)
self.router.disconnect_all()
exitted.get()
exitted.get()
for pid in pids:
e = self.assertRaises(OSError,
lambda: os.waitpid(pid, 0))
self.assertEquals(e.args[0], errno.ECHILD)
for ctx in l1, l2:
e = self.assertRaises(mitogen.core.ChannelError,
lambda: ctx.call(os.getpid))
self.assertEquals(e.args[0], mitogen.core.Router.no_route_msg % (
ctx.context_id,
mitogen.context_id,
))
if __name__ == '__main__': if __name__ == '__main__':
unittest2.main() unittest2.main()

@ -107,11 +107,11 @@ def wait_for_port(
If a regex pattern is supplied try to find it in the initial data. If a regex pattern is supplied try to find it in the initial data.
Return None on success, or raise on error. Return None on success, or raise on error.
""" """
start = time.time() start = mitogen.core.now()
end = start + overall_timeout end = start + overall_timeout
addr = (host, port) addr = (host, port)
while time.time() < end: while mitogen.core.now() < end:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(connect_timeout) sock.settimeout(connect_timeout)
try: try:
@ -130,7 +130,7 @@ def wait_for_port(
sock.settimeout(receive_timeout) sock.settimeout(receive_timeout)
data = mitogen.core.b('') data = mitogen.core.b('')
found = False found = False
while time.time() < end: while mitogen.core.now() < end:
try: try:
resp = sock.recv(1024) resp = sock.recv(1024)
except socket.timeout: except socket.timeout:

@ -162,7 +162,7 @@ def do_timer_test_econtext(econtext):
def do_timer_test(broker): def do_timer_test(broker):
now = time.time() now = mitogen.core.now()
latch = mitogen.core.Latch() latch = mitogen.core.Latch()
broker.defer(lambda: broker.defer(lambda:
broker.timers.schedule( broker.timers.schedule(
@ -172,7 +172,7 @@ def do_timer_test(broker):
) )
assert 'hi' == latch.get() assert 'hi' == latch.get()
assert time.time() > (now + 0.250) assert mitogen.core.now() > (now + 0.250)
class BrokerTimerTest(testlib.TestCase): class BrokerTimerTest(testlib.TestCase):

@ -86,12 +86,12 @@ class ClientTest(testlib.TestCase):
def _try_connect(self, path): def _try_connect(self, path):
# give server a chance to setup listener # give server a chance to setup listener
timeout = time.time() + 30.0 timeout = mitogen.core.now() + 30.0
while True: while True:
try: try:
return mitogen.unix.connect(path) return mitogen.unix.connect(path)
except mitogen.unix.ConnectError: except mitogen.unix.ConnectError:
if time.time() > timeout: if mitogen.core.now() > timeout:
raise raise
time.sleep(0.1) time.sleep(0.1)

Loading…
Cancel
Save