From 5dc0bd6f8dfab0cf5d2a390410e8242b94f37238 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 18 Feb 2019 18:21:25 +0000 Subject: [PATCH] os_fork: clean up docs --- docs/changelog.rst | 8 +++---- mitogen/os_fork.py | 58 +++++++++++++++++++++++++++++++++++----------- 2 files changed, 48 insertions(+), 18 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 02d2495c..abddb894 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -158,10 +158,10 @@ Core Library * `#535 `_: a new :mod:`mitogen.os_fork` module provides a :func:`os.fork` wrapper that pauses - all thread activity during a fork. :class:`mitogen.core.Broker` and - :class:`mitogen.service.Pool` automatically record their existence so that an - :func:`os.fork` monkey-patch activated for Python 2.4 and 2.5 can - automatically pause them for any attempt to start a subprocess. + thread activity during fork. On Python<2.6, :class:`mitogen.core.Broker` and + :class:`mitogen.service.Pool` automatically record their existence so that a + :func:`os.fork` monkey-patch can automatically pause them for any attempt to + start a subprocess. * `ca63c26e `_: :meth:`mitogen.core.Latch.put`'s `obj` argument was made optional. diff --git a/mitogen/os_fork.py b/mitogen/os_fork.py index 386a061f..3dbfbcdf 100644 --- a/mitogen/os_fork.py +++ b/mitogen/os_fork.py @@ -29,13 +29,7 @@ # !mitogen: minify_safe """ -When operating in a mixed threading/forking environment, it is critical no -threads are active at the moment of fork, as they could be within critical -sections whose mutexes are snapshotted in the locked state in the fork child. - -To permit unbridled Mitogen use in a forking program, a mechanism must exist to -temporarily halt any threads in operation -- namely the broker and any pool -threads. +Support for operating in a mixed threading/forking environment. """ import os @@ -53,6 +47,10 @@ _pools = weakref.WeakKeyDictionary() def _notice_broker_or_pool(obj): + """ + Used by :mod:`mitogen.core` and :mod:`mitogen.service` to automatically + register every broker and pool on Python 2.4/2.5. + """ if isinstance(obj, mitogen.core.Broker): _brokers[obj] = True else: @@ -85,8 +83,31 @@ class Corker(object): :class:`mitogen.service.Pool` to be temporarily "corked" while fork operations may occur. + In a mixed threading/forking environment, it is critical no threads are + active at the moment of fork, as they could hold mutexes whose state is + unrecoverably snapshotted in the locked state in the fork child, causing + random deadlocks at random future moments. + + To ensure a target thread has all locks dropped, we ask it to write a large + string to a socket with a small buffer that has :data:`os.O_NONBLOCK` + disabled. CPython will drop the GIL and enter the ``write()`` system call, + where it will block until the socket buffer is drained, or the write side + is closed. + + We can ensure the thread really has blocked outside of any Python locks by + checking if the socket buffer has started to fill using a + :class:`mitogen.core.Poller`. + Since this necessarily involves posting a message to every existent thread and verifying acknowledgement, it will never be a fast operation. + + This does not yet handle the case of corking being initiated from within a + thread that is also a cork target. + + :param brokers: + Sequence of :class:`mitogen.core.Broker` instances to cork. + :param pools: + Sequence of :class:`mitogen.core.Pool` instances to cork. """ def __init__(self, brokers=(), pools=()): self.brokers = brokers @@ -106,13 +127,8 @@ class Corker(object): def _cork_one(self, s, obj): """ - To ensure the target thread has all locks dropped, we ask it to write a - large string to a socket with a small buffer that has O_NONBLOCK - disabled. CPython will drop the GIL and enter the write() system call, - where it will block until the socket buffer is drained, or the write - side is closed. We can detect the thread has blocked outside of Python - code by checking if the socket buffer has started to fill using a - poller. + Construct a socketpair, saving one side of it, and passing the other to + `obj` to be written to by one of its threads. """ rsock, wsock = mitogen.parent.create_socketpair(size=4096) mitogen.core.set_cloexec(rsock.fileno()) @@ -120,6 +136,12 @@ class Corker(object): mitogen.core.set_block(wsock) # gevent self._rsocks.append(rsock) obj.defer(self._do_cork, s, wsock) + + def _verify_one(self, rsock): + """ + Pause until the socket `rsock` indicates readability, due to + :meth:`_do_cork` triggering a blocking write on another thread. + """ poller = mitogen.core.Poller() poller.start_receive(rsock.fileno()) try: @@ -137,14 +159,22 @@ class Corker(object): """ s = mitogen.core.b('CORK') * ((128 // 4) * 1024) self._rsocks = [] + + # Pools must be paused first, as existing work may require the + # participation of a broker in order to complete. for pool in self.pools: if not pool.closed: for x in range(pool.size): self._cork_one(s, pool) + for broker in self.brokers: if broker._alive: self._cork_one(s, broker) + # Pause until we can detect every thread has entered write(). + for rsock in self._rsocks: + self._verify_one(rsock) + def uncork(self): """ Arrange for paused threads to resume operation.