From 1d943388b7e19e150a5d794ff58b0e5e150f044b Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 10 Aug 2019 11:04:49 +0100 Subject: [PATCH 1/9] docs: tidy up some Changelog text --- docs/changelog.rst | 41 +++++++++++++++++++---------------------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 6155eb43..9f3e3546 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -107,20 +107,18 @@ Mitogen for Ansible server has been increased from `15*3` seconds to `30*10` seconds. * `#600 `_: functionality to reflect - changes to ``/etc/environment`` in the running interpreter did not account - for Unicode file contents. Now the file may contain data in any single byte - encoding. + changes to ``/etc/environment`` did not account for Unicode file contents. + The file may now use any single byte encoding. * `#602 `_: connection configuration is more accurately inferred for `meta: reset_connection`, the `synchronize` - module, and for any other action plug-ins that establish new connections of - their own. + module, and for any action plug-ins that establish additional connections. * `#615 `_: streaming file transfer - is implemented for the ``fetch`` and any other action that transfers files - from the target to the controller. Previously the file would be sent as a - single message, requiring the file to fit in RAM and be smaller than internal - limits on the size of a single message. + is implemented for ``fetch`` and other actions that transfer files from the + target to the controller. Previously the file was sent in one message, + requiring it to fit in RAM and be smaller than the internal message size + limit. * `7ae926b3 `_: the ``lineinfile`` module began leaking writable temporary file descriptors since @@ -188,21 +186,20 @@ Core Library :meth:`empty` method of :class:`mitogen.core.Latch`, :class:`mitogen.core.Receiver` and :class:`mitogen.select.Select` has been replaced by a more general :meth:`size` method. :meth:`empty` will be removed - in Mitogen 0.3 + in 0.3 * `ecc570cb `_: previously - :meth:`mitogen.select.Select.add` would enqueue a single wake event when - adding an existing receiver, latch or subselect that contained multiple - buffered items, causing future :meth:`get` calls to block or fail even though - data existed that could be returned. - -* `5924af15 `_: *[security]* the - unidirectional routing mode, in which contexts may only communicate with - parents and never siblings (so a program cannot accidentally bridge - air-gapped networks) was not inherited when a child context was initiated - directly from an existing child. This did not effect the Ansible extension, - since the controller initiates any new context used for routing, only forked - tasks are initiated by children. + :meth:`mitogen.select.Select.add` would enqueue one wake event when adding an + existing receiver, latch or subselect that contained multiple buffered items, + causing :meth:`get` calls to block or fail even though data existed to return. + +* `5924af15 `_: *[security]* + unidirectional routing, where contexts may optionally only communicate with + parents and never siblings (so that air-gapped networks cannot be + unintentionally bridged) was not inherited when a child was initiated + directly from an another child. This did not effect Ansible, since the + controller initiates any new child used for routing, only forked tasks are + initiated by children. Thanks! From 93e8d5dfcc44641a337c947f5890ea2977399ca4 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 10 Aug 2019 14:52:24 +0100 Subject: [PATCH 2/9] docs: fix Sphinx warnings, add LogHandler, more docstrings --- docs/api.rst | 5 -- docs/changelog.rst | 10 +-- docs/internals.rst | 15 +++-- mitogen/core.py | 150 ++++++++++++++++++++++++++++++++++++--------- mitogen/master.py | 6 +- mitogen/parent.py | 2 +- preamble_size.py | 4 +- 7 files changed, 143 insertions(+), 49 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 2f1f9784..51895318 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -2,11 +2,6 @@ API Reference ************* -.. toctree:: - :hidden: - - signals - Package Layout ============== diff --git a/docs/changelog.rst b/docs/changelog.rst index 9f3e3546..eb889daa 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -22,7 +22,7 @@ To avail of fixes in an unreleased version, please download a ZIP file `directly from GitHub `_. Enhancements -^^^^^^^^^^^^ +~~~~~~~~~~~~ * `#556 `_, `#587 `_: Ansible 2.8 is partially @@ -61,7 +61,7 @@ Enhancements Mitogen for Ansible -^^^^^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~~~~~ * `#363 `_: fix an obscure race matching *Permission denied* errors from some versions of ``su`` running on @@ -488,7 +488,7 @@ Enhancements `#491 `_, `#493 `_: the interface employed for in-process queues changed from `kqueue - `_ / `epoll + `_ / `epoll `_ to `poll() `_, which requires no setup or teardown, yielding a 38% latency reduction for inter-thread communication. @@ -1034,7 +1034,7 @@ bug reports, testing, features and fixes in this release contributed by `Josh Smift `_, `Luca Nunzi `_, `Orion Poplawski `_, -`Peter V. Saveliev `_, +`Peter V. Saveliev `_, `Pierre-Henry Muller `_, `Pierre-Louis Bonicoli `_, `Prateek Jain `_, @@ -1092,7 +1092,7 @@ Core Library * `#300 `_: the broker could crash on OS X during shutdown due to scheduled `kqueue - `_ filter changes for + `_ filter changes for descriptors that were closed before the IO loop resumes. As a temporary workaround, kqueue's bulk change feature is not used. diff --git a/docs/internals.rst b/docs/internals.rst index 1df1c2ad..d062b6d9 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -7,11 +7,6 @@ Internal API Reference Internal APIs are subject to rapid change even across minor releases. This page exists to help users modify and extend the library. -.. toctree:: - :hidden: - - signals - Constants ========= @@ -50,6 +45,10 @@ Logging See also :class:`mitogen.core.IoLoggerProtocol`. +.. currentmodule:: mitogen.core +.. autoclass:: LogHandler + :members: + .. currentmodule:: mitogen.master .. autoclass:: LogForwarder :members: @@ -270,6 +269,8 @@ Helpers .. autofunction:: minimize_source +.. _signals: + Signals ======= @@ -312,6 +313,10 @@ These signals are used internally by Mitogen. - ``disconnect`` - Fired on the Broker thread when disconnection is detected. + * - :py:class:`mitogen.core.Stream` + - ``shutdown`` + - Fired on the Broker thread when broker shutdown begins. + * - :py:class:`mitogen.core.Context` - ``disconnect`` - Fired on the Broker thread during shutdown (???) diff --git a/mitogen/core.py b/mitogen/core.py index 8b4f135e..aebd337e 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1024,11 +1024,11 @@ class Receiver(object): routed to the context due to disconnection, and ignores messages that did not originate from the respondent context. """ - #: If not :data:`None`, a reference to a function invoked as - #: `notify(receiver)` when a new message is delivered to this receiver. The - #: function is invoked on the broker thread, therefore it must not block. - #: Used by :class:`mitogen.select.Select` to implement waiting on multiple - #: receivers. + #: If not :data:`None`, a function invoked as `notify(receiver)` after a + #: message has been received. The function is invoked on :class:`Broker` + #: thread, therefore it must not block. Used by + #: :class:`mitogen.select.Select` to efficiently implement waiting on + #: multiple event sources. notify = None raise_channelerror = True @@ -1513,6 +1513,22 @@ class Importer(object): class LogHandler(logging.Handler): + """ + A :class:`logging.Handler` subclass that arranges for :data:`FORWARD_LOG` + messages to be sent to a parent context in response to logging messages + generated by the current context. This is installed by default in child + contexts during bootstrap, so that :mod:`logging` events can be viewed and + managed centrally in the master process. + + The handler is initially *corked* after construction, such that it buffers + messages until :meth:`uncork` is called. This allows logging to be + installed prior to communication with the target being available, and + avoids any possible race where early log messages might be dropped. + + :param mitogen.core.Context context: + The context to send log messages towards. At present this is always + the master process. + """ def __init__(self, context): logging.Handler.__init__(self) self.context = context @@ -1549,6 +1565,9 @@ class LogHandler(logging.Handler): self._buffer_lock.release() def emit(self, rec): + """ + Send a :data:`FORWARD_LOG` message towards the target context. + """ if rec.name == 'mitogen.io' or \ getattr(self.local, 'in_emit', False): return @@ -1566,6 +1585,30 @@ class LogHandler(logging.Handler): class Stream(object): + """ + A :class:`Stream` is one readable and optionally one writeable file + descriptor (represented by :class:`Side`) aggregated alongside an + associated :class:`Protocol` that knows how to respond to IO readiness + events for those descriptors. + + Streams are registered with :class:`Broker`, and callbacks are invoked on + the broker thread in response to IO activity. When registered using + :meth:`Broker.start_receive` or :meth:`Broker._start_transmit`, the broker + may call any of :meth:`on_receive`, :meth:`on_transmit`, + :meth:`on_shutdown` or :meth:`on_disconnect`. + + It is expected that the :class:`Protocol` associated with a stream will + change over its life. For example during connection setup, the initial + protocol may be :class:`mitogen.parent.BootstrapProtocol` that knows how to + enter SSH and sudo passwords and transmit the :mod:`mitogen.core` source to + the target, before handing off to :class:`MitogenProtocol` when the target + process is initialized. + + Streams connecting to children are in turn aggregated by + :class:`mitogen.parent.Connection`, which contains additional logic for + managing any child process, and a reference to any separate ``stderr`` + :class:`Stream` connected to that process. + """ #: A :class:`Side` representing the stream's receive file descriptor. receive_side = None @@ -1578,14 +1621,16 @@ class Stream(object): #: In parents, the :class:`mitogen.parent.Connection` instance. conn = None + #: The stream name. This is used in the :meth:`__repr__` output in any log + #: messages, it may be any descriptive string. name = u'default' def set_protocol(self, protocol): """ - Bind a protocol to this stream, by updating :attr:`Protocol.stream` to - refer to this stream, and updating this stream's - :attr:`Stream.protocol` to the refer to the protocol. Any prior - protocol's :attr:`Protocol.stream` is set to :data:`None`. + Bind a :class:`Protocol` to this stream, by updating + :attr:`Protocol.stream` to refer to this stream, and updating this + stream's :attr:`Stream.protocol` to the refer to the protocol. Any + prior protocol's :attr:`Protocol.stream` is set to :data:`None`. """ if self.protocol: self.protocol.stream = None @@ -1593,6 +1638,21 @@ class Stream(object): self.protocol.stream = self def accept(self, rfp, wfp): + """ + Attach a pair of file objects to :attr:`receive_side` and + :attr:`transmit_side`, after wrapping them in :class:`Side` instances. + :class:`Side` will call :func:`set_nonblock` and :func:`set_cloexec` + on the underlying file descriptors during construction. + + The same file object may be used for both sides. The default + :meth:`on_disconnect` is handles the possibility that only one + descriptor may need to be closed. + + :param file rfp: + The file object to receive from. + :param file wfp: + The file object to transmit to. + """ self.receive_side = Side(self, rfp) self.transmit_side = Side(self, wfp) @@ -1601,13 +1661,17 @@ class Stream(object): def on_receive(self, broker): """ - Called by :class:`Broker` when the stream's :attr:`receive_side` has + Invoked by :class:`Broker` when the stream's :attr:`receive_side` has been marked readable using :meth:`Broker.start_receive` and the broker has detected the associated file descriptor is ready for reading. - Subclasses must implement this if :meth:`Broker.start_receive` is ever - called on them, and the method must call :meth:`on_disconect` if - reading produces an empty string. + Subclasses must implement this if they are registered using + :meth:`Broker.start_receive`, and the method must invoke + :meth:`on_disconnect` if reading produces an empty string. + + The default implementation reads :attr:`Protocol.read_size` bytes and + passes the resulting bytestring to :meth:`Protocol.on_receive`. If the + bytestring is 0 bytes, invokes :meth:`on_disconnect` instead. """ buf = self.receive_side.read(self.protocol.read_size) if not buf: @@ -1618,30 +1682,39 @@ class Stream(object): def on_transmit(self, broker): """ - Called by :class:`Broker` when the stream's :attr:`transmit_side` - has been marked writeable using :meth:`Broker._start_transmit` and - the broker has detected the associated file descriptor is ready for + Invoked by :class:`Broker` when the stream's :attr:`transmit_side` has + been marked writeable using :meth:`Broker._start_transmit` and the + broker has detected the associated file descriptor is ready for writing. - Subclasses must implement this if :meth:`Broker._start_transmit` is - ever called on them. + Subclasses must implement they are ever registerd with + :meth:`Broker._start_transmit`. + + The default implementation invokes :meth:`Protocol.on_transmit`. """ self.protocol.on_transmit(broker) def on_shutdown(self, broker): """ - Called by :meth:`Broker.shutdown` to allow the stream time to - gracefully shutdown. The base implementation simply called - :meth:`on_disconnect`. + Invoked by :meth:`Broker.shutdown` to allow the stream time to + gracefully shutdown. + + The default implementation emits a ``shutdown`` signal before + invoking :meth:`on_disconnect`. """ fire(self, 'shutdown') self.protocol.on_shutdown(broker) def on_disconnect(self, broker): """ - Called by :class:`Broker` to force disconnect the stream. The base - implementation simply closes :attr:`receive_side` and - :attr:`transmit_side` and unregisters the stream from the broker. + Invoked by :class:`Broker` to force disconnect the stream during + shutdown, invoked by the default :meth:`on_shutdown` implementation, + and usually invoked by any subclass :meth:`on_receive` implementation + in response to a 0-byte read. + + The base implementation fires a ``disconnect`` event, then closes + :attr:`receive_side` and :attr:`transmit_side` after unregistering the + stream from the broker. """ fire(self, 'disconnect') self.protocol.on_disconnect(broker) @@ -1666,6 +1739,8 @@ class Protocol(object): #: :data:`None`. stream = None + #: The size of the read buffer used by :class:`Stream` when this is the + #: active protocol for the stream. read_size = CHUNK_SIZE @classmethod @@ -2369,8 +2444,18 @@ class Latch(object): See :ref:`waking-sleeping-threads` for further discussion. """ + #: The :class:`Poller` implementation to use for waiting. Since the poller + #: will be very short-lived, we prefer :class:`mitogen.parent.PollPoller` + #: if it is available, or :class:`mitogen.core.Poller` otherwise, since + #: these implementations require no system calls to create, configure or + #: destroy. poller_class = Poller + #: If not :data:`None`, a function invoked as `notify(latch)` after a + #: successful call to :meth:`put`. The function is invoked on the + #: :meth:`put` caller's thread, which may be the :class:`Broker` thread, + #: therefore it must not block. Used by :class:`mitogen.select.Select` to + #: efficiently implement waiting on multiple event sources. notify = None # The _cls_ prefixes here are to make it crystal clear in the code which @@ -2725,15 +2810,22 @@ class Waker(Protocol): class IoLoggerProtocol(DelimitedProtocol): """ - Handle redirection of standard IO into the :mod:`logging` package. + Attached to one end of a socket pair whose other end overwrites one of the + standard ``stdout`` or ``stderr`` file descriptors in a child context. + Received data is split up into lines, decoded as UTF-8 and logged to the + :mod:`logging` package as either the ``stdout`` or ``stderr`` logger. + + Logging in child contexts is in turn forwarded to the master process using + :class:`LogHandler`. """ @classmethod def build_stream(cls, name, dest_fd): """ - Even though the descriptor `dest_fd` will hold the opposite end of the - socket open, we must keep a separate dup() of it (i.e. wsock) in case - some code decides to overwrite `dest_fd` later, which would thus break - :meth:`on_shutdown`. + Even though the file descriptor `dest_fd` will hold the opposite end of + the socket open, we must keep a separate dup() of it (i.e. wsock) in + case some code decides to overwrite `dest_fd` later, which would + prevent break :meth:`on_shutdown` from calling :meth:`shutdown() + ` on it. """ rsock, wsock = socket.socketpair() os.dup2(wsock.fileno(), dest_fd) diff --git a/mitogen/master.py b/mitogen/master.py index 814f7019..09da775e 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -741,7 +741,7 @@ class ModuleFinder(object): The list is determined by retrieving the source code of `fullname`, compiling it, and examining all IMPORT_NAME ops. - :param fullname: Fully qualified name of an _already imported_ module + :param fullname: Fully qualified name of an *already imported* module for which source code can be retrieved :type fullname: str """ @@ -789,7 +789,7 @@ class ModuleFinder(object): This method is like :py:meth:`find_related_imports`, but also recursively searches any modules which are imported by `fullname`. - :param fullname: Fully qualified name of an _already imported_ module + :param fullname: Fully qualified name of an *already imported* module for which source code can be retrieved :type fullname: str """ @@ -841,7 +841,7 @@ class ModuleResponder(object): def add_source_override(self, fullname, path, source, is_pkg): """ - See :meth:`ModuleFinder.add_source_override. + See :meth:`ModuleFinder.add_source_override`. """ self._finder.add_source_override(fullname, path, source, is_pkg) diff --git a/mitogen/parent.py b/mitogen/parent.py index 79b484c2..ec218913 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -2516,7 +2516,7 @@ class Reaper(object): :param mitogen.core.Broker broker: The :class:`Broker` on which to install timers - :param Process proc: + :param mitogen.parent.Process proc: The process to reap. :param bool kill: If :data:`True`, send ``SIGTERM`` and ``SIGKILL`` to the process. diff --git a/preamble_size.py b/preamble_size.py index 692ad7b1..43c10029 100644 --- a/preamble_size.py +++ b/preamble_size.py @@ -24,10 +24,12 @@ conn = mitogen.ssh.Connection(options, router) conn.context = context print('SSH command size: %s' % (len(' '.join(conn.get_boot_command())),)) -print('Preamble size: %s (%.2fKiB)' % ( +print('Bootstrap (mitogen.core) size: %s (%.2fKiB)' % ( len(conn.get_preamble()), len(conn.get_preamble()) / 1024.0, )) +print('') + if '--dump' in sys.argv: print(zlib.decompress(conn.get_preamble())) exit() From a91a8bf19c203f884ca0f8ca134789f8a7b21c0b Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 10 Aug 2019 17:20:30 +0100 Subject: [PATCH 3/9] docs: upgrade Sphinx to 2.1.2, require Python 3 to build docs. --- docs/requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/requirements.txt b/docs/requirements.txt index a93c2140..85f30a2e 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,3 +1,3 @@ -Sphinx==1.7.1 -sphinxcontrib-programoutput==0.11 +Sphinx==2.1.2 +sphinxcontrib-programoutput==0.14 alabaster==0.7.10 From 284dda53e8fe565074e1ed6523012ae596d3c044 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 10 Aug 2019 17:20:46 +0100 Subject: [PATCH 4/9] preamble_size: make it work on Python 3. --- preamble_size.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/preamble_size.py b/preamble_size.py index 43c10029..f0d1e804 100644 --- a/preamble_size.py +++ b/preamble_size.py @@ -59,7 +59,7 @@ for mod in ( original_size = len(original) minimized = mitogen.minify.minimize_source(original) minimized_size = len(minimized) - compressed = zlib.compress(minimized, 9) + compressed = zlib.compress(minimized.encode(), 9) compressed_size = len(compressed) print( '%-25s' From 379dca90b957c84758b890703d19395d2e96ccf1 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 10 Aug 2019 17:21:11 +0100 Subject: [PATCH 5/9] docs: move decorator docs into core.py and use autodecorator --- docs/api.rst | 23 ++--------------------- docs/conf.py | 2 +- mitogen/core.py | 20 ++++++++++++++++++++ 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 51895318..273e31e3 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -26,29 +26,10 @@ mitogen.core .. automodule:: mitogen.core .. currentmodule:: mitogen.core -.. decorator:: takes_econtext - - Decorator that marks a function or class method to automatically receive a - kwarg named `econtext`, referencing the - :class:`mitogen.core.ExternalContext` active in the context in which the - function is being invoked in. The decorator is only meaningful when the - function is invoked via :data:`CALL_FUNCTION - `. - - When the function is invoked directly, `econtext` must still be passed to - it explicitly. +.. autodecorator:: takes_econtext .. currentmodule:: mitogen.core -.. decorator:: takes_router - - Decorator that marks a function or class method to automatically receive a - kwarg named `router`, referencing the :class:`mitogen.core.Router` - active in the context in which the function is being invoked in. The - decorator is only meaningful when the function is invoked via - :data:`CALL_FUNCTION `. - - When the function is invoked directly, `router` must still be passed to it - explicitly. +.. autodecorator:: takes_router mitogen.master diff --git a/docs/conf.py b/docs/conf.py index 2ee63aa8..aa91c8b8 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -7,7 +7,7 @@ VERSION = '%s.%s.%s' % mitogen.__version__ author = u'Network Genomics' copyright = u'2019, Network Genomics' -exclude_patterns = ['_build'] +exclude_patterns = ['_build', '.venv'] extensions = ['sphinx.ext.autodoc', 'sphinx.ext.intersphinx', 'sphinxcontrib.programoutput'] html_show_copyright = False html_show_sourcelink = False diff --git a/mitogen/core.py b/mitogen/core.py index aebd337e..a8de98e2 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -448,11 +448,31 @@ def fire(obj, name, *args, **kwargs): def takes_econtext(func): + """ + Decorator that marks a function or class method to automatically receive a + kwarg named `econtext`, referencing the + :class:`mitogen.core.ExternalContext` active in the context in which the + function is being invoked in. The decorator is only meaningful when the + function is invoked via :data:`CALL_FUNCTION `. + + When the function is invoked directly, `econtext` must still be passed to + it explicitly. + """ func.mitogen_takes_econtext = True return func def takes_router(func): + """ + Decorator that marks a function or class method to automatically receive a + kwarg named `router`, referencing the :class:`mitogen.core.Router` active + in the context in which the function is being invoked in. The decorator is + only meaningful when the function is invoked via :data:`CALL_FUNCTION + `. + + When the function is invoked directly, `router` must still be passed to it + explicitly. + """ func.mitogen_takes_router = True return func From 57012e0f72fb27176fc21deb382779a5e21303fe Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 10 Aug 2019 17:36:10 +0100 Subject: [PATCH 6/9] Add mitogen.core.now() and use it everywhere; closes #614. --- docs/api.rst | 8 ++++++++ mitogen/core.py | 12 ++++++++---- mitogen/master.py | 8 ++++---- mitogen/parent.py | 12 ++++++------ mitogen/service.py | 5 +++-- tests/bench/fork.py | 8 ++++---- tests/bench/large_messages.py | 10 ++++++---- tests/bench/latch_roundtrip.py | 5 +++-- tests/bench/local.py | 9 +++++---- tests/bench/megatime.py | 6 ++++-- tests/bench/roundtrip.py | 5 +++-- tests/bench/service.py | 7 ++++--- tests/bench/ssh-roundtrip.py | 9 +++++---- tests/bench/throughput.py | 5 +++-- tests/create_child_test.py | 3 +-- tests/parent_test.py | 5 +++-- tests/poller_test.py | 8 ++++---- tests/testlib.py | 6 +++--- tests/timer_test.py | 4 ++-- tests/unix_test.py | 4 ++-- 20 files changed, 81 insertions(+), 58 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 273e31e3..09aa8582 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -603,6 +603,14 @@ Fork Safety Utility Functions ================= +.. currentmodule:: mitogen.core +.. function:: now + + A reference to :func:`time.time` on Python 2, or :func:`time.monotonic` on + Python >3.3. We prefer :func:`time.monotonic` when available to ensure + timers are not impacted by system clock changes. + + .. module:: mitogen.utils A random assortment of utility functions useful on masters and children. diff --git a/mitogen/core.py b/mitogen/core.py index a8de98e2..d6e1739e 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -362,6 +362,10 @@ def to_text(o): return UnicodeType(o) +# Documented in api.rst to work around Sphinx limitation. +now = getattr(time, 'monotonic', time.time) + + # Python 2.4 try: any @@ -636,7 +640,7 @@ def _real_profile_hook(name, func, *args): return func(*args) finally: path = _profile_fmt % { - 'now': int(1e6 * time.time()), + 'now': int(1e6 * now()), 'identity': name, 'pid': os.getpid(), 'ext': '%s' @@ -3482,9 +3486,9 @@ class Broker(object): for _, (side, _) in self.poller.readers + self.poller.writers: self._call(side.stream, side.stream.on_shutdown) - deadline = time.time() + self.shutdown_timeout - while self.keep_alive() and time.time() < deadline: - self._loop_once(max(0, deadline - time.time())) + deadline = now() + self.shutdown_timeout + while self.keep_alive() and now() < deadline: + self._loop_once(max(0, deadline - now())) if self.keep_alive(): LOG.error('%r: pending work still existed %d seconds after ' diff --git a/mitogen/master.py b/mitogen/master.py index 09da775e..b4eb6643 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -910,9 +910,9 @@ class ModuleResponder(object): if self.minify_safe_re.search(source): # If the module contains a magic marker, it's safe to minify. - t0 = time.time() + t0 = mitogen.core.now() source = mitogen.minify.minimize_source(source).encode('utf-8') - self.minify_secs += time.time() - t0 + self.minify_secs += mitogen.core.now() - t0 if is_pkg: pkg_present = get_child_modules(path) @@ -1001,11 +1001,11 @@ class ModuleResponder(object): LOG.warning('_on_get_module(): dup request for %r from %r', fullname, stream) - t0 = time.time() + t0 = mitogen.core.now() try: self._send_module_and_related(stream, fullname) finally: - self.get_module_secs += time.time() - t0 + self.get_module_secs += mitogen.core.now() - t0 def _send_forward_module(self, stream, context, fullname): if stream.protocol.remote_id != context.context_id: diff --git a/mitogen/parent.py b/mitogen/parent.py index ec218913..7a94c2b0 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -633,7 +633,7 @@ class TimerList(object): :meth:`expire`. The main user interface to :class:`TimerList` is :meth:`schedule`. """ - _now = time.time + _now = mitogen.core.now def __init__(self): self._lst = [] @@ -1124,7 +1124,7 @@ class LineLoggingProtocolMixin(object): def on_line_received(self, line): self.logged_partial = None - self.logged_lines.append((time.time(), line)) + self.logged_lines.append((mitogen.core.now(), line)) self.logged_lines[:] = self.logged_lines[-100:] return super(LineLoggingProtocolMixin, self).on_line_received(line) @@ -1134,7 +1134,7 @@ class LineLoggingProtocolMixin(object): def on_disconnect(self, broker): if self.logged_partial: - self.logged_lines.append((time.time(), self.logged_partial)) + self.logged_lines.append((mitogen.core.now(), self.logged_partial)) self.logged_partial = None super(LineLoggingProtocolMixin, self).on_disconnect(broker) @@ -1324,7 +1324,7 @@ class Options(object): self.profiling = profiling self.unidirectional = unidirectional self.max_message_size = max_message_size - self.connect_deadline = time.time() + self.connect_timeout + self.connect_deadline = mitogen.core.now() + self.connect_timeout class Connection(object): @@ -1819,7 +1819,7 @@ class CallChain(object): socket.gethostname(), os.getpid(), thread.get_ident(), - int(1e6 * time.time()), + int(1e6 * mitogen.core.now()), ) def __repr__(self): @@ -2569,7 +2569,7 @@ class Reaper(object): def _install_timer(self, delay): new = self._timer is None self._timer = self.broker.timers.schedule( - when=time.time() + delay, + when=mitogen.core.now() + delay, func=self.reap, ) if new: diff --git a/mitogen/service.py b/mitogen/service.py index 6654fb32..8882b30b 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -1109,7 +1109,7 @@ class FileService(Service): :meth:`fetch`. """ LOG.debug('get_file(): fetching %r from %r', path, context) - t0 = time.time() + t0 = mitogen.core.now() recv = mitogen.core.Receiver(router=context.router) metadata = context.call_service( service_name=cls.name(), @@ -1143,5 +1143,6 @@ class FileService(Service): path, metadata['size'], received_bytes) LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms', - metadata['size'], path, context, 1000 * (time.time() - t0)) + metadata['size'], path, context, + 1000 * (mitogen.core.now() - t0)) return ok, metadata diff --git a/tests/bench/fork.py b/tests/bench/fork.py index b2f2382c..af5cb3a7 100644 --- a/tests/bench/fork.py +++ b/tests/bench/fork.py @@ -3,13 +3,13 @@ Measure latency of .fork() setup/teardown. """ import mitogen -import time +import mitogen.core @mitogen.main() def main(router): - t0 = time.time() + t0 = mitogen.core.now() for x in xrange(200): - t = time.time() + t = mitogen.core.now() ctx = router.fork() ctx.shutdown(wait=True) - print '++', 1000 * ((time.time() - t0) / (1.0+x)) + print '++', 1000 * ((mitogen.core.now() - t0) / (1.0+x)) diff --git a/tests/bench/large_messages.py b/tests/bench/large_messages.py index 24220023..e977e36d 100644 --- a/tests/bench/large_messages.py +++ b/tests/bench/large_messages.py @@ -4,7 +4,9 @@ import subprocess import time import socket + import mitogen +import mitogen.core @mitogen.main() @@ -15,12 +17,12 @@ def main(router): s = ' ' * n print('bytes in %.2fMiB string...' % (n/1048576.0),) - t0 = time.time() + t0 = mitogen.core.now() for x in range(10): - tt0 = time.time() + tt0 = mitogen.core.now() assert n == c.call(len, s) - print('took %dms' % (1000 * (time.time() - tt0),)) - t1 = time.time() + print('took %dms' % (1000 * (mitogen.core.now() - tt0),)) + t1 = mitogen.core.now() print('total %dms / %dms avg / %.2fMiB/sec' % ( 1000 * (t1 - t0), (1000 * (t1 - t0)) / (x + 1), diff --git a/tests/bench/latch_roundtrip.py b/tests/bench/latch_roundtrip.py index 49314fb9..1198aa48 100644 --- a/tests/bench/latch_roundtrip.py +++ b/tests/bench/latch_roundtrip.py @@ -6,6 +6,7 @@ import threading import time import mitogen +import mitogen.core import mitogen.utils import ansible_mitogen.affinity @@ -33,8 +34,8 @@ t2.start() ready.get() ready.get() -t0 = time.time() +t0 = mitogen.core.now() l1.put(None) t1.join() t2.join() -print('++', int(1e6 * ((time.time() - t0) / (1.0+X))), 'usec') +print('++', int(1e6 * ((mitogen.core.now() - t0) / (1.0+X))), 'usec') diff --git a/tests/bench/local.py b/tests/bench/local.py index 2808d803..aefeb84d 100644 --- a/tests/bench/local.py +++ b/tests/bench/local.py @@ -5,6 +5,7 @@ Measure latency of .local() setup. import time import mitogen +import mitogen.core import mitogen.utils import ansible_mitogen.affinity @@ -15,10 +16,10 @@ mitogen.utils.setup_gil() @mitogen.main() def main(router): - t0=time.time() + t0 = mitogen.core.now() for x in range(100): - t = time.time() + t = mitogen.core.now() f = router.local()# debug=True) - tt = time.time() + tt = mitogen.core.now() print(x, 1000 * (tt - t)) - print('%.03f ms' % (1000 * (time.time() - t0) / (1.0 + x))) + print('%.03f ms' % (1000 * (mitogen.core.now() - t0) / (1.0 + x))) diff --git a/tests/bench/megatime.py b/tests/bench/megatime.py index 6f5f3b5d..40cd9986 100755 --- a/tests/bench/megatime.py +++ b/tests/bench/megatime.py @@ -4,12 +4,14 @@ import sys import os import time +import mitogen.core + times = [] for x in range(5): - t0 = time.time() + t0 = mitogen.core.now() os.spawnvp(os.P_WAIT, sys.argv[1], sys.argv[1:]) - t = time.time() - t0 + t = mitogen.core.now() - t0 times.append(t) print('+++', t) diff --git a/tests/bench/roundtrip.py b/tests/bench/roundtrip.py index 8d86d75b..8f31b1a2 100644 --- a/tests/bench/roundtrip.py +++ b/tests/bench/roundtrip.py @@ -5,6 +5,7 @@ Measure latency of local RPC. import time import mitogen +import mitogen.core import mitogen.utils import ansible_mitogen.affinity @@ -23,7 +24,7 @@ def do_nothing(): def main(router): f = router.fork() f.call(do_nothing) - t0 = time.time() + t0 = mitogen.core.now() for x in xrange(20000): f.call(do_nothing) - print('++', int(1e6 * ((time.time() - t0) / (1.0+x))), 'usec') + print('++', int(1e6 * ((mitogen.core.now() - t0) / (1.0+x))), 'usec') diff --git a/tests/bench/service.py b/tests/bench/service.py index 6d866b5c..267ae3f6 100644 --- a/tests/bench/service.py +++ b/tests/bench/service.py @@ -4,8 +4,9 @@ Measure latency of local service RPC. import time -import mitogen.service import mitogen +import mitogen.core +import mitogen.service class MyService(mitogen.service.Service): @@ -17,7 +18,7 @@ class MyService(mitogen.service.Service): @mitogen.main() def main(router): f = router.fork() - t0 = time.time() + t0 = mitogen.core.now() for x in range(1000): f.call_service(service_name=MyService, method_name='ping') - print('++', int(1e6 * ((time.time() - t0) / (1.0+x))), 'usec') + print('++', int(1e6 * ((mitogen.core.now() - t0) / (1.0+x))), 'usec') diff --git a/tests/bench/ssh-roundtrip.py b/tests/bench/ssh-roundtrip.py index 8745505d..06c596c0 100644 --- a/tests/bench/ssh-roundtrip.py +++ b/tests/bench/ssh-roundtrip.py @@ -6,6 +6,7 @@ import sys import time import mitogen +import mitogen.core import mitogen.utils import ansible_mitogen.affinity @@ -24,12 +25,12 @@ def do_nothing(): def main(router): f = router.ssh(hostname=sys.argv[1]) f.call(do_nothing) - t0 = time.time() - end = time.time() + 5.0 + t0 = mitogen.core.now() + end = mitogen.core.now() + 5.0 i = 0 - while time.time() < end: + while mitogen.core.now() < end: f.call(do_nothing) i += 1 - t1 = time.time() + t1 = mitogen.core.now() print('++', float(1e3 * (t1 - t0) / (1.0+i)), 'ms') diff --git a/tests/bench/throughput.py b/tests/bench/throughput.py index 42604826..acb51afa 100644 --- a/tests/bench/throughput.py +++ b/tests/bench/throughput.py @@ -8,6 +8,7 @@ import tempfile import time import mitogen +import mitogen.core import mitogen.service import ansible_mitogen.affinity @@ -35,9 +36,9 @@ def run_test(router, fp, s, context): size = fp.tell() print('Testing %s...' % (s,)) context.call(prepare) - t0 = time.time() + t0 = mitogen.core.now() context.call(transfer, router.myself(), fp.name) - t1 = time.time() + t1 = mitogen.core.now() print('%s took %.2f ms to transfer %.2f MiB, %.2f MiB/s' % ( s, 1000 * (t1 - t0), size / 1048576.0, (size / (t1 - t0) / 1048576.0), diff --git a/tests/create_child_test.py b/tests/create_child_test.py index 1c2f526a..26f10d57 100644 --- a/tests/create_child_test.py +++ b/tests/create_child_test.py @@ -9,6 +9,7 @@ import tempfile import mock import unittest2 +import mitogen.core import mitogen.parent from mitogen.core import b @@ -188,7 +189,6 @@ class TtyCreateChildTest(testlib.TestCase): proc = self.func([ 'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,) ]) - deadline = time.time() + 5.0 mitogen.core.set_block(proc.stdin.fileno()) # read(3) below due to https://bugs.python.org/issue37696 self.assertEquals(mitogen.core.b('hi\n'), proc.stdin.read(3)) @@ -271,7 +271,6 @@ class TtyCreateChildTest(testlib.TestCase): proc = self.func([ 'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,) ]) - deadline = time.time() + 5.0 self.assertEquals(mitogen.core.b('hi\n'), wait_read(proc.stdout, 3)) waited_pid, status = os.waitpid(proc.pid, 0) self.assertEquals(proc.pid, waited_pid) diff --git a/tests/parent_test.py b/tests/parent_test.py index b314d472..d6efe998 100644 --- a/tests/parent_test.py +++ b/tests/parent_test.py @@ -12,6 +12,7 @@ import unittest2 import testlib from testlib import Popen__terminate +import mitogen.core import mitogen.parent try: @@ -21,8 +22,8 @@ except NameError: def wait_for_child(pid, timeout=1.0): - deadline = time.time() + timeout - while timeout < time.time(): + deadline = mitogen.core.now() + timeout + while timeout < mitogen.core.now(): try: target_pid, status = os.waitpid(pid, os.WNOHANG) if target_pid == pid: diff --git a/tests/poller_test.py b/tests/poller_test.py index b05a9b94..3ed59ae3 100644 --- a/tests/poller_test.py +++ b/tests/poller_test.py @@ -164,14 +164,14 @@ class CloseMixin(PollerMixin): class PollMixin(PollerMixin): def test_empty_zero_timeout(self): - t0 = time.time() + t0 = mitogen.core.now() self.assertEquals([], list(self.p.poll(0))) - self.assertTrue((time.time() - t0) < .1) # vaguely reasonable + self.assertTrue((mitogen.core.now() - t0) < .1) # vaguely reasonable def test_empty_small_timeout(self): - t0 = time.time() + t0 = mitogen.core.now() self.assertEquals([], list(self.p.poll(.2))) - self.assertTrue((time.time() - t0) >= .2) + self.assertTrue((mitogen.core.now() - t0) >= .2) class ReadableMixin(PollerMixin, SockMixin): diff --git a/tests/testlib.py b/tests/testlib.py index 255fba88..73d3438d 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -107,11 +107,11 @@ def wait_for_port( If a regex pattern is supplied try to find it in the initial data. Return None on success, or raise on error. """ - start = time.time() + start = mitogen.core.now() end = start + overall_timeout addr = (host, port) - while time.time() < end: + while mitogen.core.now() < end: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(connect_timeout) try: @@ -130,7 +130,7 @@ def wait_for_port( sock.settimeout(receive_timeout) data = mitogen.core.b('') found = False - while time.time() < end: + while mitogen.core.now() < end: try: resp = sock.recv(1024) except socket.timeout: diff --git a/tests/timer_test.py b/tests/timer_test.py index ff3e022f..749405a4 100644 --- a/tests/timer_test.py +++ b/tests/timer_test.py @@ -162,7 +162,7 @@ def do_timer_test_econtext(econtext): def do_timer_test(broker): - now = time.time() + now = mitogen.core.now() latch = mitogen.core.Latch() broker.defer(lambda: broker.timers.schedule( @@ -172,7 +172,7 @@ def do_timer_test(broker): ) assert 'hi' == latch.get() - assert time.time() > (now + 0.250) + assert mitogen.core.now() > (now + 0.250) class BrokerTimerTest(testlib.TestCase): diff --git a/tests/unix_test.py b/tests/unix_test.py index ba1ba152..cf3e595f 100644 --- a/tests/unix_test.py +++ b/tests/unix_test.py @@ -86,12 +86,12 @@ class ClientTest(testlib.TestCase): def _try_connect(self, path): # give server a chance to setup listener - timeout = time.time() + 30.0 + timeout = mitogen.core.now() + 30.0 while True: try: return mitogen.unix.connect(path) except mitogen.unix.ConnectError: - if time.time() > timeout: + if mitogen.core.now() > timeout: raise time.sleep(0.1) From 4fa760cd21f5bd0d572a3e90f737ad1f20f0f906 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 10 Aug 2019 20:30:24 +0100 Subject: [PATCH 7/9] issue #613: add tests for all the weird shutdown methods --- docs/internals.rst | 5 +++ mitogen/parent.py | 3 +- tests/router_test.py | 105 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 1 deletion(-) diff --git a/docs/internals.rst b/docs/internals.rst index d062b6d9..40ea33df 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -321,6 +321,11 @@ These signals are used internally by Mitogen. - ``disconnect`` - Fired on the Broker thread during shutdown (???) + * - :py:class:`mitogen.parent.Process` + - ``exit`` + - Fired when :class:`mitogen.parent.Reaper` detects subprocess has fully + exitted. + * - :py:class:`mitogen.core.Broker` - ``shutdown`` - Fired after Broker.shutdown() is called. diff --git a/mitogen/parent.py b/mitogen/parent.py index 7a94c2b0..983df829 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -2331,7 +2331,7 @@ class Router(mitogen.core.Router): directly connected. """ stream = self.stream_by_id(context) - if stream.protocol.remote_id != context.context_id: + if stream is None or stream.protocol.remote_id != context.context_id: return l = mitogen.core.Latch() @@ -2589,6 +2589,7 @@ class Reaper(object): status = self.proc.poll() if status is not None: LOG.debug('%r: %s', self.proc, returncode_to_str(status)) + mitogen.core.fire(self.proc, 'exit') self._remove_timer() return diff --git a/tests/router_test.py b/tests/router_test.py index dba56c9b..a9ad5ae1 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -1,3 +1,5 @@ +import errno +import os import sys import time import zlib @@ -425,5 +427,108 @@ class EgressIdsTest(testlib.RouterMixin, testlib.TestCase): ])) +class ShutdownTest(testlib.RouterMixin, testlib.TestCase): + # 613: tests for all the weird shutdown() variants we ended up with. + + def test_shutdown_wait_false(self): + l1 = self.router.local() + pid = l1.call(os.getpid) + + conn = self.router.stream_by_id(l1.context_id).conn + exitted = mitogen.core.Latch() + mitogen.core.listen(conn.proc, 'exit', exitted.put) + + l1.shutdown(wait=False) + exitted.get() + + e = self.assertRaises(OSError, + lambda: os.waitpid(pid, 0)) + self.assertEquals(e.args[0], errno.ECHILD) + + e = self.assertRaises(mitogen.core.ChannelError, + lambda: l1.call(os.getpid)) + self.assertEquals(e.args[0], mitogen.core.Router.no_route_msg % ( + l1.context_id, + mitogen.context_id, + )) + + def test_shutdown_wait_true(self): + l1 = self.router.local() + pid = l1.call(os.getpid) + + conn = self.router.stream_by_id(l1.context_id).conn + exitted = mitogen.core.Latch() + mitogen.core.listen(conn.proc, 'exit', exitted.put) + + l1.shutdown(wait=True) + exitted.get() + + e = self.assertRaises(OSError, + lambda: os.waitpid(pid, 0)) + self.assertEquals(e.args[0], errno.ECHILD) + + e = self.assertRaises(mitogen.core.ChannelError, + lambda: l1.call(os.getpid)) + self.assertEquals(e.args[0], mitogen.core.Router.no_route_msg % ( + l1.context_id, + mitogen.context_id, + )) + + def test_disconnect_invalid_context(self): + self.router.disconnect( + mitogen.core.Context(self.router, 1234) + ) + + def test_disconnect_valid_context(self): + l1 = self.router.local() + pid = l1.call(os.getpid) + + strm = self.router.stream_by_id(l1.context_id) + + exitted = mitogen.core.Latch() + mitogen.core.listen(strm.conn.proc, 'exit', exitted.put) + self.router.disconnect_stream(strm) + exitted.get() + + e = self.assertRaises(OSError, + lambda: os.waitpid(pid, 0)) + self.assertEquals(e.args[0], errno.ECHILD) + + e = self.assertRaises(mitogen.core.ChannelError, + lambda: l1.call(os.getpid)) + self.assertEquals(e.args[0], mitogen.core.Router.no_route_msg % ( + l1.context_id, + mitogen.context_id, + )) + + def test_disconnet_all(self): + l1 = self.router.local() + l2 = self.router.local() + + pids = [l1.call(os.getpid), l2.call(os.getpid)] + + exitted = mitogen.core.Latch() + for ctx in l1, l2: + strm = self.router.stream_by_id(ctx.context_id) + mitogen.core.listen(strm.conn.proc, 'exit', exitted.put) + + self.router.disconnect_all() + exitted.get() + exitted.get() + + for pid in pids: + e = self.assertRaises(OSError, + lambda: os.waitpid(pid, 0)) + self.assertEquals(e.args[0], errno.ECHILD) + + for ctx in l1, l2: + e = self.assertRaises(mitogen.core.ChannelError, + lambda: ctx.call(os.getpid)) + self.assertEquals(e.args[0], mitogen.core.Router.no_route_msg % ( + ctx.context_id, + mitogen.context_id, + )) + + if __name__ == '__main__': unittest2.main() From f78a5f08c6973496deded69de2edc0e12f9c97ba Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 10 Aug 2019 23:40:36 +0000 Subject: [PATCH 8/9] issue #605: ansible: share a sem_t instead of a pthread_mutex_t The previous version quite reliably causes worker deadlocks within 10 minutes running: # 100 times: - import_playbook: integration/async/runner_one_job.yml # 100 times: - import_playbook: integration/module_utils/adjacent_to_playbook.yml via .ci/soak/mitogen.sh with PLAYBOOK= set to the above playbook. Attaching to the worker with gdb reveals it in an instruction immediately following a futex() call, which likely returned EINTR due to attaching gdb. Examining the pthread_mutex_t state reveals it to be completely unlocked. pthread_mutex_t on Linux should have zero trouble living in shmem, so it's not clear how this deadlock is happening. Meanwhile POSIX semaphores are explicitly designed for cross-process use and have a completely different internal implementation, so try those instead. 1 hour of soaking reveals no deadlock. This is about avoiding managing a lockable temporary file on disk to contain our counter, and somehow communicating a reference to it into subprocesses (despite the subprocess module closing inherited fds, etc), somehow deleting it reliably at exit, and somehow avoiding concurrent Ansible runs stepping on the same file. For now ctypes is still less pain. A final possibility would be to abandon a shared counter and instead pick a CPU based on the hash of e.g. the new child's process ID. That would likely balance equally well, and might be worth exploring when making this code work on BSD. --- ansible_mitogen/affinity.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/ansible_mitogen/affinity.py b/ansible_mitogen/affinity.py index 9eb6597a..67e16d8a 100644 --- a/ansible_mitogen/affinity.py +++ b/ansible_mitogen/affinity.py @@ -92,37 +92,37 @@ try: _libc = ctypes.CDLL(None, use_errno=True) _strerror = _libc.strerror _strerror.restype = ctypes.c_char_p - _pthread_mutex_init = _libc.pthread_mutex_init - _pthread_mutex_lock = _libc.pthread_mutex_lock - _pthread_mutex_unlock = _libc.pthread_mutex_unlock + _sem_init = _libc.sem_init + _sem_wait = _libc.sem_wait + _sem_post = _libc.sem_post _sched_setaffinity = _libc.sched_setaffinity except (OSError, AttributeError): _libc = None _strerror = None - _pthread_mutex_init = None - _pthread_mutex_lock = None - _pthread_mutex_unlock = None + _sem_init = None + _sem_wait = None + _sem_post = None _sched_setaffinity = None -class pthread_mutex_t(ctypes.Structure): +class sem_t(ctypes.Structure): """ - Wrap pthread_mutex_t to allow storing a lock in shared memory. + Wrap sem_t to allow storing a lock in shared memory. """ _fields_ = [ - ('data', ctypes.c_uint8 * 512), + ('data', ctypes.c_uint8 * 128), ] def init(self): - if _pthread_mutex_init(self.data, 0): + if _sem_init(self.data, 1, 1): raise Exception(_strerror(ctypes.get_errno())) def acquire(self): - if _pthread_mutex_lock(self.data): + if _sem_wait(self.data): raise Exception(_strerror(ctypes.get_errno())) def release(self): - if _pthread_mutex_unlock(self.data): + if _sem_post(self.data): raise Exception(_strerror(ctypes.get_errno())) @@ -133,7 +133,7 @@ class State(ctypes.Structure): the context of the new child process. """ _fields_ = [ - ('lock', pthread_mutex_t), + ('lock', sem_t), ('counter', ctypes.c_uint8), ] From 240dc84d9455e18932e164e77345f275ba06d2dd Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 10 Aug 2019 23:57:35 +0000 Subject: [PATCH 9/9] issue #605: update Changelog. --- docs/changelog.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/changelog.rst b/docs/changelog.rst index eb889daa..63266113 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -114,6 +114,9 @@ Mitogen for Ansible is more accurately inferred for `meta: reset_connection`, the `synchronize` module, and for any action plug-ins that establish additional connections. +* `#605 `_: fix a deadlock managing a + shared counter used for load balancing. + * `#615 `_: streaming file transfer is implemented for ``fetch`` and other actions that transfer files from the target to the controller. Previously the file was sent in one message,