diff --git a/ansible_mitogen/affinity.py b/ansible_mitogen/affinity.py index 9eb6597a..67e16d8a 100644 --- a/ansible_mitogen/affinity.py +++ b/ansible_mitogen/affinity.py @@ -92,37 +92,37 @@ try: _libc = ctypes.CDLL(None, use_errno=True) _strerror = _libc.strerror _strerror.restype = ctypes.c_char_p - _pthread_mutex_init = _libc.pthread_mutex_init - _pthread_mutex_lock = _libc.pthread_mutex_lock - _pthread_mutex_unlock = _libc.pthread_mutex_unlock + _sem_init = _libc.sem_init + _sem_wait = _libc.sem_wait + _sem_post = _libc.sem_post _sched_setaffinity = _libc.sched_setaffinity except (OSError, AttributeError): _libc = None _strerror = None - _pthread_mutex_init = None - _pthread_mutex_lock = None - _pthread_mutex_unlock = None + _sem_init = None + _sem_wait = None + _sem_post = None _sched_setaffinity = None -class pthread_mutex_t(ctypes.Structure): +class sem_t(ctypes.Structure): """ - Wrap pthread_mutex_t to allow storing a lock in shared memory. + Wrap sem_t to allow storing a lock in shared memory. """ _fields_ = [ - ('data', ctypes.c_uint8 * 512), + ('data', ctypes.c_uint8 * 128), ] def init(self): - if _pthread_mutex_init(self.data, 0): + if _sem_init(self.data, 1, 1): raise Exception(_strerror(ctypes.get_errno())) def acquire(self): - if _pthread_mutex_lock(self.data): + if _sem_wait(self.data): raise Exception(_strerror(ctypes.get_errno())) def release(self): - if _pthread_mutex_unlock(self.data): + if _sem_post(self.data): raise Exception(_strerror(ctypes.get_errno())) @@ -133,7 +133,7 @@ class State(ctypes.Structure): the context of the new child process. """ _fields_ = [ - ('lock', pthread_mutex_t), + ('lock', sem_t), ('counter', ctypes.c_uint8), ] diff --git a/docs/api.rst b/docs/api.rst index 2f1f9784..09aa8582 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -2,11 +2,6 @@ API Reference ************* -.. toctree:: - :hidden: - - signals - Package Layout ============== @@ -31,29 +26,10 @@ mitogen.core .. automodule:: mitogen.core .. currentmodule:: mitogen.core -.. decorator:: takes_econtext - - Decorator that marks a function or class method to automatically receive a - kwarg named `econtext`, referencing the - :class:`mitogen.core.ExternalContext` active in the context in which the - function is being invoked in. The decorator is only meaningful when the - function is invoked via :data:`CALL_FUNCTION - `. - - When the function is invoked directly, `econtext` must still be passed to - it explicitly. +.. autodecorator:: takes_econtext .. currentmodule:: mitogen.core -.. decorator:: takes_router - - Decorator that marks a function or class method to automatically receive a - kwarg named `router`, referencing the :class:`mitogen.core.Router` - active in the context in which the function is being invoked in. The - decorator is only meaningful when the function is invoked via - :data:`CALL_FUNCTION `. - - When the function is invoked directly, `router` must still be passed to it - explicitly. +.. autodecorator:: takes_router mitogen.master @@ -627,6 +603,14 @@ Fork Safety Utility Functions ================= +.. currentmodule:: mitogen.core +.. function:: now + + A reference to :func:`time.time` on Python 2, or :func:`time.monotonic` on + Python >3.3. We prefer :func:`time.monotonic` when available to ensure + timers are not impacted by system clock changes. + + .. module:: mitogen.utils A random assortment of utility functions useful on masters and children. diff --git a/docs/changelog.rst b/docs/changelog.rst index 6155eb43..63266113 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -22,7 +22,7 @@ To avail of fixes in an unreleased version, please download a ZIP file `directly from GitHub `_. Enhancements -^^^^^^^^^^^^ +~~~~~~~~~~~~ * `#556 `_, `#587 `_: Ansible 2.8 is partially @@ -61,7 +61,7 @@ Enhancements Mitogen for Ansible -^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~ * `#363 `_: fix an obscure race matching *Permission denied* errors from some versions of ``su`` running on @@ -107,20 +107,21 @@ Mitogen for Ansible server has been increased from `15*3` seconds to `30*10` seconds. * `#600 `_: functionality to reflect - changes to ``/etc/environment`` in the running interpreter did not account - for Unicode file contents. Now the file may contain data in any single byte - encoding. + changes to ``/etc/environment`` did not account for Unicode file contents. + The file may now use any single byte encoding. * `#602 `_: connection configuration is more accurately inferred for `meta: reset_connection`, the `synchronize` - module, and for any other action plug-ins that establish new connections of - their own. + module, and for any action plug-ins that establish additional connections. + +* `#605 `_: fix a deadlock managing a + shared counter used for load balancing. * `#615 `_: streaming file transfer - is implemented for the ``fetch`` and any other action that transfers files - from the target to the controller. Previously the file would be sent as a - single message, requiring the file to fit in RAM and be smaller than internal - limits on the size of a single message. + is implemented for ``fetch`` and other actions that transfer files from the + target to the controller. Previously the file was sent in one message, + requiring it to fit in RAM and be smaller than the internal message size + limit. * `7ae926b3 `_: the ``lineinfile`` module began leaking writable temporary file descriptors since @@ -188,21 +189,20 @@ Core Library :meth:`empty` method of :class:`mitogen.core.Latch`, :class:`mitogen.core.Receiver` and :class:`mitogen.select.Select` has been replaced by a more general :meth:`size` method. :meth:`empty` will be removed - in Mitogen 0.3 + in 0.3 * `ecc570cb `_: previously - :meth:`mitogen.select.Select.add` would enqueue a single wake event when - adding an existing receiver, latch or subselect that contained multiple - buffered items, causing future :meth:`get` calls to block or fail even though - data existed that could be returned. + :meth:`mitogen.select.Select.add` would enqueue one wake event when adding an + existing receiver, latch or subselect that contained multiple buffered items, + causing :meth:`get` calls to block or fail even though data existed to return. -* `5924af15 `_: *[security]* the - unidirectional routing mode, in which contexts may only communicate with - parents and never siblings (so a program cannot accidentally bridge - air-gapped networks) was not inherited when a child context was initiated - directly from an existing child. This did not effect the Ansible extension, - since the controller initiates any new context used for routing, only forked - tasks are initiated by children. +* `5924af15 `_: *[security]* + unidirectional routing, where contexts may optionally only communicate with + parents and never siblings (so that air-gapped networks cannot be + unintentionally bridged) was not inherited when a child was initiated + directly from an another child. This did not effect Ansible, since the + controller initiates any new child used for routing, only forked tasks are + initiated by children. Thanks! @@ -491,7 +491,7 @@ Enhancements `#491 `_, `#493 `_: the interface employed for in-process queues changed from `kqueue - `_ / `epoll + `_ / `epoll `_ to `poll() `_, which requires no setup or teardown, yielding a 38% latency reduction for inter-thread communication. @@ -1037,7 +1037,7 @@ bug reports, testing, features and fixes in this release contributed by `Josh Smift `_, `Luca Nunzi `_, `Orion Poplawski `_, -`Peter V. Saveliev `_, +`Peter V. Saveliev `_, `Pierre-Henry Muller `_, `Pierre-Louis Bonicoli `_, `Prateek Jain `_, @@ -1095,7 +1095,7 @@ Core Library * `#300 `_: the broker could crash on OS X during shutdown due to scheduled `kqueue - `_ filter changes for + `_ filter changes for descriptors that were closed before the IO loop resumes. As a temporary workaround, kqueue's bulk change feature is not used. diff --git a/docs/conf.py b/docs/conf.py index 2ee63aa8..aa91c8b8 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -7,7 +7,7 @@ VERSION = '%s.%s.%s' % mitogen.__version__ author = u'Network Genomics' copyright = u'2019, Network Genomics' -exclude_patterns = ['_build'] +exclude_patterns = ['_build', '.venv'] extensions = ['sphinx.ext.autodoc', 'sphinx.ext.intersphinx', 'sphinxcontrib.programoutput'] html_show_copyright = False html_show_sourcelink = False diff --git a/docs/internals.rst b/docs/internals.rst index 1df1c2ad..40ea33df 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -7,11 +7,6 @@ Internal API Reference Internal APIs are subject to rapid change even across minor releases. This page exists to help users modify and extend the library. -.. toctree:: - :hidden: - - signals - Constants ========= @@ -50,6 +45,10 @@ Logging See also :class:`mitogen.core.IoLoggerProtocol`. +.. currentmodule:: mitogen.core +.. autoclass:: LogHandler + :members: + .. currentmodule:: mitogen.master .. autoclass:: LogForwarder :members: @@ -270,6 +269,8 @@ Helpers .. autofunction:: minimize_source +.. _signals: + Signals ======= @@ -312,10 +313,19 @@ These signals are used internally by Mitogen. - ``disconnect`` - Fired on the Broker thread when disconnection is detected. + * - :py:class:`mitogen.core.Stream` + - ``shutdown`` + - Fired on the Broker thread when broker shutdown begins. + * - :py:class:`mitogen.core.Context` - ``disconnect`` - Fired on the Broker thread during shutdown (???) + * - :py:class:`mitogen.parent.Process` + - ``exit`` + - Fired when :class:`mitogen.parent.Reaper` detects subprocess has fully + exitted. + * - :py:class:`mitogen.core.Broker` - ``shutdown`` - Fired after Broker.shutdown() is called. diff --git a/docs/requirements.txt b/docs/requirements.txt index a93c2140..85f30a2e 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,3 +1,3 @@ -Sphinx==1.7.1 -sphinxcontrib-programoutput==0.11 +Sphinx==2.1.2 +sphinxcontrib-programoutput==0.14 alabaster==0.7.10 diff --git a/mitogen/core.py b/mitogen/core.py index 8b4f135e..d6e1739e 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -362,6 +362,10 @@ def to_text(o): return UnicodeType(o) +# Documented in api.rst to work around Sphinx limitation. +now = getattr(time, 'monotonic', time.time) + + # Python 2.4 try: any @@ -448,11 +452,31 @@ def fire(obj, name, *args, **kwargs): def takes_econtext(func): + """ + Decorator that marks a function or class method to automatically receive a + kwarg named `econtext`, referencing the + :class:`mitogen.core.ExternalContext` active in the context in which the + function is being invoked in. The decorator is only meaningful when the + function is invoked via :data:`CALL_FUNCTION `. + + When the function is invoked directly, `econtext` must still be passed to + it explicitly. + """ func.mitogen_takes_econtext = True return func def takes_router(func): + """ + Decorator that marks a function or class method to automatically receive a + kwarg named `router`, referencing the :class:`mitogen.core.Router` active + in the context in which the function is being invoked in. The decorator is + only meaningful when the function is invoked via :data:`CALL_FUNCTION + `. + + When the function is invoked directly, `router` must still be passed to it + explicitly. + """ func.mitogen_takes_router = True return func @@ -616,7 +640,7 @@ def _real_profile_hook(name, func, *args): return func(*args) finally: path = _profile_fmt % { - 'now': int(1e6 * time.time()), + 'now': int(1e6 * now()), 'identity': name, 'pid': os.getpid(), 'ext': '%s' @@ -1024,11 +1048,11 @@ class Receiver(object): routed to the context due to disconnection, and ignores messages that did not originate from the respondent context. """ - #: If not :data:`None`, a reference to a function invoked as - #: `notify(receiver)` when a new message is delivered to this receiver. The - #: function is invoked on the broker thread, therefore it must not block. - #: Used by :class:`mitogen.select.Select` to implement waiting on multiple - #: receivers. + #: If not :data:`None`, a function invoked as `notify(receiver)` after a + #: message has been received. The function is invoked on :class:`Broker` + #: thread, therefore it must not block. Used by + #: :class:`mitogen.select.Select` to efficiently implement waiting on + #: multiple event sources. notify = None raise_channelerror = True @@ -1513,6 +1537,22 @@ class Importer(object): class LogHandler(logging.Handler): + """ + A :class:`logging.Handler` subclass that arranges for :data:`FORWARD_LOG` + messages to be sent to a parent context in response to logging messages + generated by the current context. This is installed by default in child + contexts during bootstrap, so that :mod:`logging` events can be viewed and + managed centrally in the master process. + + The handler is initially *corked* after construction, such that it buffers + messages until :meth:`uncork` is called. This allows logging to be + installed prior to communication with the target being available, and + avoids any possible race where early log messages might be dropped. + + :param mitogen.core.Context context: + The context to send log messages towards. At present this is always + the master process. + """ def __init__(self, context): logging.Handler.__init__(self) self.context = context @@ -1549,6 +1589,9 @@ class LogHandler(logging.Handler): self._buffer_lock.release() def emit(self, rec): + """ + Send a :data:`FORWARD_LOG` message towards the target context. + """ if rec.name == 'mitogen.io' or \ getattr(self.local, 'in_emit', False): return @@ -1566,6 +1609,30 @@ class LogHandler(logging.Handler): class Stream(object): + """ + A :class:`Stream` is one readable and optionally one writeable file + descriptor (represented by :class:`Side`) aggregated alongside an + associated :class:`Protocol` that knows how to respond to IO readiness + events for those descriptors. + + Streams are registered with :class:`Broker`, and callbacks are invoked on + the broker thread in response to IO activity. When registered using + :meth:`Broker.start_receive` or :meth:`Broker._start_transmit`, the broker + may call any of :meth:`on_receive`, :meth:`on_transmit`, + :meth:`on_shutdown` or :meth:`on_disconnect`. + + It is expected that the :class:`Protocol` associated with a stream will + change over its life. For example during connection setup, the initial + protocol may be :class:`mitogen.parent.BootstrapProtocol` that knows how to + enter SSH and sudo passwords and transmit the :mod:`mitogen.core` source to + the target, before handing off to :class:`MitogenProtocol` when the target + process is initialized. + + Streams connecting to children are in turn aggregated by + :class:`mitogen.parent.Connection`, which contains additional logic for + managing any child process, and a reference to any separate ``stderr`` + :class:`Stream` connected to that process. + """ #: A :class:`Side` representing the stream's receive file descriptor. receive_side = None @@ -1578,14 +1645,16 @@ class Stream(object): #: In parents, the :class:`mitogen.parent.Connection` instance. conn = None + #: The stream name. This is used in the :meth:`__repr__` output in any log + #: messages, it may be any descriptive string. name = u'default' def set_protocol(self, protocol): """ - Bind a protocol to this stream, by updating :attr:`Protocol.stream` to - refer to this stream, and updating this stream's - :attr:`Stream.protocol` to the refer to the protocol. Any prior - protocol's :attr:`Protocol.stream` is set to :data:`None`. + Bind a :class:`Protocol` to this stream, by updating + :attr:`Protocol.stream` to refer to this stream, and updating this + stream's :attr:`Stream.protocol` to the refer to the protocol. Any + prior protocol's :attr:`Protocol.stream` is set to :data:`None`. """ if self.protocol: self.protocol.stream = None @@ -1593,6 +1662,21 @@ class Stream(object): self.protocol.stream = self def accept(self, rfp, wfp): + """ + Attach a pair of file objects to :attr:`receive_side` and + :attr:`transmit_side`, after wrapping them in :class:`Side` instances. + :class:`Side` will call :func:`set_nonblock` and :func:`set_cloexec` + on the underlying file descriptors during construction. + + The same file object may be used for both sides. The default + :meth:`on_disconnect` is handles the possibility that only one + descriptor may need to be closed. + + :param file rfp: + The file object to receive from. + :param file wfp: + The file object to transmit to. + """ self.receive_side = Side(self, rfp) self.transmit_side = Side(self, wfp) @@ -1601,13 +1685,17 @@ class Stream(object): def on_receive(self, broker): """ - Called by :class:`Broker` when the stream's :attr:`receive_side` has + Invoked by :class:`Broker` when the stream's :attr:`receive_side` has been marked readable using :meth:`Broker.start_receive` and the broker has detected the associated file descriptor is ready for reading. - Subclasses must implement this if :meth:`Broker.start_receive` is ever - called on them, and the method must call :meth:`on_disconect` if - reading produces an empty string. + Subclasses must implement this if they are registered using + :meth:`Broker.start_receive`, and the method must invoke + :meth:`on_disconnect` if reading produces an empty string. + + The default implementation reads :attr:`Protocol.read_size` bytes and + passes the resulting bytestring to :meth:`Protocol.on_receive`. If the + bytestring is 0 bytes, invokes :meth:`on_disconnect` instead. """ buf = self.receive_side.read(self.protocol.read_size) if not buf: @@ -1618,30 +1706,39 @@ class Stream(object): def on_transmit(self, broker): """ - Called by :class:`Broker` when the stream's :attr:`transmit_side` - has been marked writeable using :meth:`Broker._start_transmit` and - the broker has detected the associated file descriptor is ready for + Invoked by :class:`Broker` when the stream's :attr:`transmit_side` has + been marked writeable using :meth:`Broker._start_transmit` and the + broker has detected the associated file descriptor is ready for writing. - Subclasses must implement this if :meth:`Broker._start_transmit` is - ever called on them. + Subclasses must implement they are ever registerd with + :meth:`Broker._start_transmit`. + + The default implementation invokes :meth:`Protocol.on_transmit`. """ self.protocol.on_transmit(broker) def on_shutdown(self, broker): """ - Called by :meth:`Broker.shutdown` to allow the stream time to - gracefully shutdown. The base implementation simply called - :meth:`on_disconnect`. + Invoked by :meth:`Broker.shutdown` to allow the stream time to + gracefully shutdown. + + The default implementation emits a ``shutdown`` signal before + invoking :meth:`on_disconnect`. """ fire(self, 'shutdown') self.protocol.on_shutdown(broker) def on_disconnect(self, broker): """ - Called by :class:`Broker` to force disconnect the stream. The base - implementation simply closes :attr:`receive_side` and - :attr:`transmit_side` and unregisters the stream from the broker. + Invoked by :class:`Broker` to force disconnect the stream during + shutdown, invoked by the default :meth:`on_shutdown` implementation, + and usually invoked by any subclass :meth:`on_receive` implementation + in response to a 0-byte read. + + The base implementation fires a ``disconnect`` event, then closes + :attr:`receive_side` and :attr:`transmit_side` after unregistering the + stream from the broker. """ fire(self, 'disconnect') self.protocol.on_disconnect(broker) @@ -1666,6 +1763,8 @@ class Protocol(object): #: :data:`None`. stream = None + #: The size of the read buffer used by :class:`Stream` when this is the + #: active protocol for the stream. read_size = CHUNK_SIZE @classmethod @@ -2369,8 +2468,18 @@ class Latch(object): See :ref:`waking-sleeping-threads` for further discussion. """ + #: The :class:`Poller` implementation to use for waiting. Since the poller + #: will be very short-lived, we prefer :class:`mitogen.parent.PollPoller` + #: if it is available, or :class:`mitogen.core.Poller` otherwise, since + #: these implementations require no system calls to create, configure or + #: destroy. poller_class = Poller + #: If not :data:`None`, a function invoked as `notify(latch)` after a + #: successful call to :meth:`put`. The function is invoked on the + #: :meth:`put` caller's thread, which may be the :class:`Broker` thread, + #: therefore it must not block. Used by :class:`mitogen.select.Select` to + #: efficiently implement waiting on multiple event sources. notify = None # The _cls_ prefixes here are to make it crystal clear in the code which @@ -2725,15 +2834,22 @@ class Waker(Protocol): class IoLoggerProtocol(DelimitedProtocol): """ - Handle redirection of standard IO into the :mod:`logging` package. + Attached to one end of a socket pair whose other end overwrites one of the + standard ``stdout`` or ``stderr`` file descriptors in a child context. + Received data is split up into lines, decoded as UTF-8 and logged to the + :mod:`logging` package as either the ``stdout`` or ``stderr`` logger. + + Logging in child contexts is in turn forwarded to the master process using + :class:`LogHandler`. """ @classmethod def build_stream(cls, name, dest_fd): """ - Even though the descriptor `dest_fd` will hold the opposite end of the - socket open, we must keep a separate dup() of it (i.e. wsock) in case - some code decides to overwrite `dest_fd` later, which would thus break - :meth:`on_shutdown`. + Even though the file descriptor `dest_fd` will hold the opposite end of + the socket open, we must keep a separate dup() of it (i.e. wsock) in + case some code decides to overwrite `dest_fd` later, which would + prevent break :meth:`on_shutdown` from calling :meth:`shutdown() + ` on it. """ rsock, wsock = socket.socketpair() os.dup2(wsock.fileno(), dest_fd) @@ -3370,9 +3486,9 @@ class Broker(object): for _, (side, _) in self.poller.readers + self.poller.writers: self._call(side.stream, side.stream.on_shutdown) - deadline = time.time() + self.shutdown_timeout - while self.keep_alive() and time.time() < deadline: - self._loop_once(max(0, deadline - time.time())) + deadline = now() + self.shutdown_timeout + while self.keep_alive() and now() < deadline: + self._loop_once(max(0, deadline - now())) if self.keep_alive(): LOG.error('%r: pending work still existed %d seconds after ' diff --git a/mitogen/master.py b/mitogen/master.py index 814f7019..b4eb6643 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -741,7 +741,7 @@ class ModuleFinder(object): The list is determined by retrieving the source code of `fullname`, compiling it, and examining all IMPORT_NAME ops. - :param fullname: Fully qualified name of an _already imported_ module + :param fullname: Fully qualified name of an *already imported* module for which source code can be retrieved :type fullname: str """ @@ -789,7 +789,7 @@ class ModuleFinder(object): This method is like :py:meth:`find_related_imports`, but also recursively searches any modules which are imported by `fullname`. - :param fullname: Fully qualified name of an _already imported_ module + :param fullname: Fully qualified name of an *already imported* module for which source code can be retrieved :type fullname: str """ @@ -841,7 +841,7 @@ class ModuleResponder(object): def add_source_override(self, fullname, path, source, is_pkg): """ - See :meth:`ModuleFinder.add_source_override. + See :meth:`ModuleFinder.add_source_override`. """ self._finder.add_source_override(fullname, path, source, is_pkg) @@ -910,9 +910,9 @@ class ModuleResponder(object): if self.minify_safe_re.search(source): # If the module contains a magic marker, it's safe to minify. - t0 = time.time() + t0 = mitogen.core.now() source = mitogen.minify.minimize_source(source).encode('utf-8') - self.minify_secs += time.time() - t0 + self.minify_secs += mitogen.core.now() - t0 if is_pkg: pkg_present = get_child_modules(path) @@ -1001,11 +1001,11 @@ class ModuleResponder(object): LOG.warning('_on_get_module(): dup request for %r from %r', fullname, stream) - t0 = time.time() + t0 = mitogen.core.now() try: self._send_module_and_related(stream, fullname) finally: - self.get_module_secs += time.time() - t0 + self.get_module_secs += mitogen.core.now() - t0 def _send_forward_module(self, stream, context, fullname): if stream.protocol.remote_id != context.context_id: diff --git a/mitogen/parent.py b/mitogen/parent.py index 79b484c2..983df829 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -633,7 +633,7 @@ class TimerList(object): :meth:`expire`. The main user interface to :class:`TimerList` is :meth:`schedule`. """ - _now = time.time + _now = mitogen.core.now def __init__(self): self._lst = [] @@ -1124,7 +1124,7 @@ class LineLoggingProtocolMixin(object): def on_line_received(self, line): self.logged_partial = None - self.logged_lines.append((time.time(), line)) + self.logged_lines.append((mitogen.core.now(), line)) self.logged_lines[:] = self.logged_lines[-100:] return super(LineLoggingProtocolMixin, self).on_line_received(line) @@ -1134,7 +1134,7 @@ class LineLoggingProtocolMixin(object): def on_disconnect(self, broker): if self.logged_partial: - self.logged_lines.append((time.time(), self.logged_partial)) + self.logged_lines.append((mitogen.core.now(), self.logged_partial)) self.logged_partial = None super(LineLoggingProtocolMixin, self).on_disconnect(broker) @@ -1324,7 +1324,7 @@ class Options(object): self.profiling = profiling self.unidirectional = unidirectional self.max_message_size = max_message_size - self.connect_deadline = time.time() + self.connect_timeout + self.connect_deadline = mitogen.core.now() + self.connect_timeout class Connection(object): @@ -1819,7 +1819,7 @@ class CallChain(object): socket.gethostname(), os.getpid(), thread.get_ident(), - int(1e6 * time.time()), + int(1e6 * mitogen.core.now()), ) def __repr__(self): @@ -2331,7 +2331,7 @@ class Router(mitogen.core.Router): directly connected. """ stream = self.stream_by_id(context) - if stream.protocol.remote_id != context.context_id: + if stream is None or stream.protocol.remote_id != context.context_id: return l = mitogen.core.Latch() @@ -2516,7 +2516,7 @@ class Reaper(object): :param mitogen.core.Broker broker: The :class:`Broker` on which to install timers - :param Process proc: + :param mitogen.parent.Process proc: The process to reap. :param bool kill: If :data:`True`, send ``SIGTERM`` and ``SIGKILL`` to the process. @@ -2569,7 +2569,7 @@ class Reaper(object): def _install_timer(self, delay): new = self._timer is None self._timer = self.broker.timers.schedule( - when=time.time() + delay, + when=mitogen.core.now() + delay, func=self.reap, ) if new: @@ -2589,6 +2589,7 @@ class Reaper(object): status = self.proc.poll() if status is not None: LOG.debug('%r: %s', self.proc, returncode_to_str(status)) + mitogen.core.fire(self.proc, 'exit') self._remove_timer() return diff --git a/mitogen/service.py b/mitogen/service.py index 6654fb32..8882b30b 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -1109,7 +1109,7 @@ class FileService(Service): :meth:`fetch`. """ LOG.debug('get_file(): fetching %r from %r', path, context) - t0 = time.time() + t0 = mitogen.core.now() recv = mitogen.core.Receiver(router=context.router) metadata = context.call_service( service_name=cls.name(), @@ -1143,5 +1143,6 @@ class FileService(Service): path, metadata['size'], received_bytes) LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms', - metadata['size'], path, context, 1000 * (time.time() - t0)) + metadata['size'], path, context, + 1000 * (mitogen.core.now() - t0)) return ok, metadata diff --git a/preamble_size.py b/preamble_size.py index 692ad7b1..f0d1e804 100644 --- a/preamble_size.py +++ b/preamble_size.py @@ -24,10 +24,12 @@ conn = mitogen.ssh.Connection(options, router) conn.context = context print('SSH command size: %s' % (len(' '.join(conn.get_boot_command())),)) -print('Preamble size: %s (%.2fKiB)' % ( +print('Bootstrap (mitogen.core) size: %s (%.2fKiB)' % ( len(conn.get_preamble()), len(conn.get_preamble()) / 1024.0, )) +print('') + if '--dump' in sys.argv: print(zlib.decompress(conn.get_preamble())) exit() @@ -57,7 +59,7 @@ for mod in ( original_size = len(original) minimized = mitogen.minify.minimize_source(original) minimized_size = len(minimized) - compressed = zlib.compress(minimized, 9) + compressed = zlib.compress(minimized.encode(), 9) compressed_size = len(compressed) print( '%-25s' diff --git a/tests/bench/fork.py b/tests/bench/fork.py index b2f2382c..af5cb3a7 100644 --- a/tests/bench/fork.py +++ b/tests/bench/fork.py @@ -3,13 +3,13 @@ Measure latency of .fork() setup/teardown. """ import mitogen -import time +import mitogen.core @mitogen.main() def main(router): - t0 = time.time() + t0 = mitogen.core.now() for x in xrange(200): - t = time.time() + t = mitogen.core.now() ctx = router.fork() ctx.shutdown(wait=True) - print '++', 1000 * ((time.time() - t0) / (1.0+x)) + print '++', 1000 * ((mitogen.core.now() - t0) / (1.0+x)) diff --git a/tests/bench/large_messages.py b/tests/bench/large_messages.py index 24220023..e977e36d 100644 --- a/tests/bench/large_messages.py +++ b/tests/bench/large_messages.py @@ -4,7 +4,9 @@ import subprocess import time import socket + import mitogen +import mitogen.core @mitogen.main() @@ -15,12 +17,12 @@ def main(router): s = ' ' * n print('bytes in %.2fMiB string...' % (n/1048576.0),) - t0 = time.time() + t0 = mitogen.core.now() for x in range(10): - tt0 = time.time() + tt0 = mitogen.core.now() assert n == c.call(len, s) - print('took %dms' % (1000 * (time.time() - tt0),)) - t1 = time.time() + print('took %dms' % (1000 * (mitogen.core.now() - tt0),)) + t1 = mitogen.core.now() print('total %dms / %dms avg / %.2fMiB/sec' % ( 1000 * (t1 - t0), (1000 * (t1 - t0)) / (x + 1), diff --git a/tests/bench/latch_roundtrip.py b/tests/bench/latch_roundtrip.py index 49314fb9..1198aa48 100644 --- a/tests/bench/latch_roundtrip.py +++ b/tests/bench/latch_roundtrip.py @@ -6,6 +6,7 @@ import threading import time import mitogen +import mitogen.core import mitogen.utils import ansible_mitogen.affinity @@ -33,8 +34,8 @@ t2.start() ready.get() ready.get() -t0 = time.time() +t0 = mitogen.core.now() l1.put(None) t1.join() t2.join() -print('++', int(1e6 * ((time.time() - t0) / (1.0+X))), 'usec') +print('++', int(1e6 * ((mitogen.core.now() - t0) / (1.0+X))), 'usec') diff --git a/tests/bench/local.py b/tests/bench/local.py index 2808d803..aefeb84d 100644 --- a/tests/bench/local.py +++ b/tests/bench/local.py @@ -5,6 +5,7 @@ Measure latency of .local() setup. import time import mitogen +import mitogen.core import mitogen.utils import ansible_mitogen.affinity @@ -15,10 +16,10 @@ mitogen.utils.setup_gil() @mitogen.main() def main(router): - t0=time.time() + t0 = mitogen.core.now() for x in range(100): - t = time.time() + t = mitogen.core.now() f = router.local()# debug=True) - tt = time.time() + tt = mitogen.core.now() print(x, 1000 * (tt - t)) - print('%.03f ms' % (1000 * (time.time() - t0) / (1.0 + x))) + print('%.03f ms' % (1000 * (mitogen.core.now() - t0) / (1.0 + x))) diff --git a/tests/bench/megatime.py b/tests/bench/megatime.py index 6f5f3b5d..40cd9986 100755 --- a/tests/bench/megatime.py +++ b/tests/bench/megatime.py @@ -4,12 +4,14 @@ import sys import os import time +import mitogen.core + times = [] for x in range(5): - t0 = time.time() + t0 = mitogen.core.now() os.spawnvp(os.P_WAIT, sys.argv[1], sys.argv[1:]) - t = time.time() - t0 + t = mitogen.core.now() - t0 times.append(t) print('+++', t) diff --git a/tests/bench/roundtrip.py b/tests/bench/roundtrip.py index 8d86d75b..8f31b1a2 100644 --- a/tests/bench/roundtrip.py +++ b/tests/bench/roundtrip.py @@ -5,6 +5,7 @@ Measure latency of local RPC. import time import mitogen +import mitogen.core import mitogen.utils import ansible_mitogen.affinity @@ -23,7 +24,7 @@ def do_nothing(): def main(router): f = router.fork() f.call(do_nothing) - t0 = time.time() + t0 = mitogen.core.now() for x in xrange(20000): f.call(do_nothing) - print('++', int(1e6 * ((time.time() - t0) / (1.0+x))), 'usec') + print('++', int(1e6 * ((mitogen.core.now() - t0) / (1.0+x))), 'usec') diff --git a/tests/bench/service.py b/tests/bench/service.py index 6d866b5c..267ae3f6 100644 --- a/tests/bench/service.py +++ b/tests/bench/service.py @@ -4,8 +4,9 @@ Measure latency of local service RPC. import time -import mitogen.service import mitogen +import mitogen.core +import mitogen.service class MyService(mitogen.service.Service): @@ -17,7 +18,7 @@ class MyService(mitogen.service.Service): @mitogen.main() def main(router): f = router.fork() - t0 = time.time() + t0 = mitogen.core.now() for x in range(1000): f.call_service(service_name=MyService, method_name='ping') - print('++', int(1e6 * ((time.time() - t0) / (1.0+x))), 'usec') + print('++', int(1e6 * ((mitogen.core.now() - t0) / (1.0+x))), 'usec') diff --git a/tests/bench/ssh-roundtrip.py b/tests/bench/ssh-roundtrip.py index 8745505d..06c596c0 100644 --- a/tests/bench/ssh-roundtrip.py +++ b/tests/bench/ssh-roundtrip.py @@ -6,6 +6,7 @@ import sys import time import mitogen +import mitogen.core import mitogen.utils import ansible_mitogen.affinity @@ -24,12 +25,12 @@ def do_nothing(): def main(router): f = router.ssh(hostname=sys.argv[1]) f.call(do_nothing) - t0 = time.time() - end = time.time() + 5.0 + t0 = mitogen.core.now() + end = mitogen.core.now() + 5.0 i = 0 - while time.time() < end: + while mitogen.core.now() < end: f.call(do_nothing) i += 1 - t1 = time.time() + t1 = mitogen.core.now() print('++', float(1e3 * (t1 - t0) / (1.0+i)), 'ms') diff --git a/tests/bench/throughput.py b/tests/bench/throughput.py index 42604826..acb51afa 100644 --- a/tests/bench/throughput.py +++ b/tests/bench/throughput.py @@ -8,6 +8,7 @@ import tempfile import time import mitogen +import mitogen.core import mitogen.service import ansible_mitogen.affinity @@ -35,9 +36,9 @@ def run_test(router, fp, s, context): size = fp.tell() print('Testing %s...' % (s,)) context.call(prepare) - t0 = time.time() + t0 = mitogen.core.now() context.call(transfer, router.myself(), fp.name) - t1 = time.time() + t1 = mitogen.core.now() print('%s took %.2f ms to transfer %.2f MiB, %.2f MiB/s' % ( s, 1000 * (t1 - t0), size / 1048576.0, (size / (t1 - t0) / 1048576.0), diff --git a/tests/create_child_test.py b/tests/create_child_test.py index 1c2f526a..26f10d57 100644 --- a/tests/create_child_test.py +++ b/tests/create_child_test.py @@ -9,6 +9,7 @@ import tempfile import mock import unittest2 +import mitogen.core import mitogen.parent from mitogen.core import b @@ -188,7 +189,6 @@ class TtyCreateChildTest(testlib.TestCase): proc = self.func([ 'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,) ]) - deadline = time.time() + 5.0 mitogen.core.set_block(proc.stdin.fileno()) # read(3) below due to https://bugs.python.org/issue37696 self.assertEquals(mitogen.core.b('hi\n'), proc.stdin.read(3)) @@ -271,7 +271,6 @@ class TtyCreateChildTest(testlib.TestCase): proc = self.func([ 'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,) ]) - deadline = time.time() + 5.0 self.assertEquals(mitogen.core.b('hi\n'), wait_read(proc.stdout, 3)) waited_pid, status = os.waitpid(proc.pid, 0) self.assertEquals(proc.pid, waited_pid) diff --git a/tests/parent_test.py b/tests/parent_test.py index b314d472..d6efe998 100644 --- a/tests/parent_test.py +++ b/tests/parent_test.py @@ -12,6 +12,7 @@ import unittest2 import testlib from testlib import Popen__terminate +import mitogen.core import mitogen.parent try: @@ -21,8 +22,8 @@ except NameError: def wait_for_child(pid, timeout=1.0): - deadline = time.time() + timeout - while timeout < time.time(): + deadline = mitogen.core.now() + timeout + while timeout < mitogen.core.now(): try: target_pid, status = os.waitpid(pid, os.WNOHANG) if target_pid == pid: diff --git a/tests/poller_test.py b/tests/poller_test.py index b05a9b94..3ed59ae3 100644 --- a/tests/poller_test.py +++ b/tests/poller_test.py @@ -164,14 +164,14 @@ class CloseMixin(PollerMixin): class PollMixin(PollerMixin): def test_empty_zero_timeout(self): - t0 = time.time() + t0 = mitogen.core.now() self.assertEquals([], list(self.p.poll(0))) - self.assertTrue((time.time() - t0) < .1) # vaguely reasonable + self.assertTrue((mitogen.core.now() - t0) < .1) # vaguely reasonable def test_empty_small_timeout(self): - t0 = time.time() + t0 = mitogen.core.now() self.assertEquals([], list(self.p.poll(.2))) - self.assertTrue((time.time() - t0) >= .2) + self.assertTrue((mitogen.core.now() - t0) >= .2) class ReadableMixin(PollerMixin, SockMixin): diff --git a/tests/router_test.py b/tests/router_test.py index dba56c9b..a9ad5ae1 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -1,3 +1,5 @@ +import errno +import os import sys import time import zlib @@ -425,5 +427,108 @@ class EgressIdsTest(testlib.RouterMixin, testlib.TestCase): ])) +class ShutdownTest(testlib.RouterMixin, testlib.TestCase): + # 613: tests for all the weird shutdown() variants we ended up with. + + def test_shutdown_wait_false(self): + l1 = self.router.local() + pid = l1.call(os.getpid) + + conn = self.router.stream_by_id(l1.context_id).conn + exitted = mitogen.core.Latch() + mitogen.core.listen(conn.proc, 'exit', exitted.put) + + l1.shutdown(wait=False) + exitted.get() + + e = self.assertRaises(OSError, + lambda: os.waitpid(pid, 0)) + self.assertEquals(e.args[0], errno.ECHILD) + + e = self.assertRaises(mitogen.core.ChannelError, + lambda: l1.call(os.getpid)) + self.assertEquals(e.args[0], mitogen.core.Router.no_route_msg % ( + l1.context_id, + mitogen.context_id, + )) + + def test_shutdown_wait_true(self): + l1 = self.router.local() + pid = l1.call(os.getpid) + + conn = self.router.stream_by_id(l1.context_id).conn + exitted = mitogen.core.Latch() + mitogen.core.listen(conn.proc, 'exit', exitted.put) + + l1.shutdown(wait=True) + exitted.get() + + e = self.assertRaises(OSError, + lambda: os.waitpid(pid, 0)) + self.assertEquals(e.args[0], errno.ECHILD) + + e = self.assertRaises(mitogen.core.ChannelError, + lambda: l1.call(os.getpid)) + self.assertEquals(e.args[0], mitogen.core.Router.no_route_msg % ( + l1.context_id, + mitogen.context_id, + )) + + def test_disconnect_invalid_context(self): + self.router.disconnect( + mitogen.core.Context(self.router, 1234) + ) + + def test_disconnect_valid_context(self): + l1 = self.router.local() + pid = l1.call(os.getpid) + + strm = self.router.stream_by_id(l1.context_id) + + exitted = mitogen.core.Latch() + mitogen.core.listen(strm.conn.proc, 'exit', exitted.put) + self.router.disconnect_stream(strm) + exitted.get() + + e = self.assertRaises(OSError, + lambda: os.waitpid(pid, 0)) + self.assertEquals(e.args[0], errno.ECHILD) + + e = self.assertRaises(mitogen.core.ChannelError, + lambda: l1.call(os.getpid)) + self.assertEquals(e.args[0], mitogen.core.Router.no_route_msg % ( + l1.context_id, + mitogen.context_id, + )) + + def test_disconnet_all(self): + l1 = self.router.local() + l2 = self.router.local() + + pids = [l1.call(os.getpid), l2.call(os.getpid)] + + exitted = mitogen.core.Latch() + for ctx in l1, l2: + strm = self.router.stream_by_id(ctx.context_id) + mitogen.core.listen(strm.conn.proc, 'exit', exitted.put) + + self.router.disconnect_all() + exitted.get() + exitted.get() + + for pid in pids: + e = self.assertRaises(OSError, + lambda: os.waitpid(pid, 0)) + self.assertEquals(e.args[0], errno.ECHILD) + + for ctx in l1, l2: + e = self.assertRaises(mitogen.core.ChannelError, + lambda: ctx.call(os.getpid)) + self.assertEquals(e.args[0], mitogen.core.Router.no_route_msg % ( + ctx.context_id, + mitogen.context_id, + )) + + if __name__ == '__main__': unittest2.main() diff --git a/tests/testlib.py b/tests/testlib.py index 255fba88..73d3438d 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -107,11 +107,11 @@ def wait_for_port( If a regex pattern is supplied try to find it in the initial data. Return None on success, or raise on error. """ - start = time.time() + start = mitogen.core.now() end = start + overall_timeout addr = (host, port) - while time.time() < end: + while mitogen.core.now() < end: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(connect_timeout) try: @@ -130,7 +130,7 @@ def wait_for_port( sock.settimeout(receive_timeout) data = mitogen.core.b('') found = False - while time.time() < end: + while mitogen.core.now() < end: try: resp = sock.recv(1024) except socket.timeout: diff --git a/tests/timer_test.py b/tests/timer_test.py index ff3e022f..749405a4 100644 --- a/tests/timer_test.py +++ b/tests/timer_test.py @@ -162,7 +162,7 @@ def do_timer_test_econtext(econtext): def do_timer_test(broker): - now = time.time() + now = mitogen.core.now() latch = mitogen.core.Latch() broker.defer(lambda: broker.timers.schedule( @@ -172,7 +172,7 @@ def do_timer_test(broker): ) assert 'hi' == latch.get() - assert time.time() > (now + 0.250) + assert mitogen.core.now() > (now + 0.250) class BrokerTimerTest(testlib.TestCase): diff --git a/tests/unix_test.py b/tests/unix_test.py index ba1ba152..cf3e595f 100644 --- a/tests/unix_test.py +++ b/tests/unix_test.py @@ -86,12 +86,12 @@ class ClientTest(testlib.TestCase): def _try_connect(self, path): # give server a chance to setup listener - timeout = time.time() + 30.0 + timeout = mitogen.core.now() + 30.0 while True: try: return mitogen.unix.connect(path) except mitogen.unix.ConnectError: - if time.time() > timeout: + if mitogen.core.now() > timeout: raise time.sleep(0.1)