os_fork: clean up docs

pull/564/head
David Wilson 7 years ago
parent 021cfda112
commit 5dc0bd6f8d

@ -158,10 +158,10 @@ Core Library
* `#535 <https://github.com/dw/mitogen/issues/535>`_: a new * `#535 <https://github.com/dw/mitogen/issues/535>`_: a new
:mod:`mitogen.os_fork` module provides a :func:`os.fork` wrapper that pauses :mod:`mitogen.os_fork` module provides a :func:`os.fork` wrapper that pauses
all thread activity during a fork. :class:`mitogen.core.Broker` and thread activity during fork. On Python<2.6, :class:`mitogen.core.Broker` and
:class:`mitogen.service.Pool` automatically record their existence so that an :class:`mitogen.service.Pool` automatically record their existence so that a
:func:`os.fork` monkey-patch activated for Python 2.4 and 2.5 can :func:`os.fork` monkey-patch can automatically pause them for any attempt to
automatically pause them for any attempt to start a subprocess. start a subprocess.
* `ca63c26e <https://github.com/dw/mitogen/commit/ca63c26e>`_: * `ca63c26e <https://github.com/dw/mitogen/commit/ca63c26e>`_:
:meth:`mitogen.core.Latch.put`'s `obj` argument was made optional. :meth:`mitogen.core.Latch.put`'s `obj` argument was made optional.

@ -29,13 +29,7 @@
# !mitogen: minify_safe # !mitogen: minify_safe
""" """
When operating in a mixed threading/forking environment, it is critical no Support for operating in a mixed threading/forking environment.
threads are active at the moment of fork, as they could be within critical
sections whose mutexes are snapshotted in the locked state in the fork child.
To permit unbridled Mitogen use in a forking program, a mechanism must exist to
temporarily halt any threads in operation -- namely the broker and any pool
threads.
""" """
import os import os
@ -53,6 +47,10 @@ _pools = weakref.WeakKeyDictionary()
def _notice_broker_or_pool(obj): def _notice_broker_or_pool(obj):
"""
Used by :mod:`mitogen.core` and :mod:`mitogen.service` to automatically
register every broker and pool on Python 2.4/2.5.
"""
if isinstance(obj, mitogen.core.Broker): if isinstance(obj, mitogen.core.Broker):
_brokers[obj] = True _brokers[obj] = True
else: else:
@ -85,8 +83,31 @@ class Corker(object):
:class:`mitogen.service.Pool` to be temporarily "corked" while fork :class:`mitogen.service.Pool` to be temporarily "corked" while fork
operations may occur. operations may occur.
In a mixed threading/forking environment, it is critical no threads are
active at the moment of fork, as they could hold mutexes whose state is
unrecoverably snapshotted in the locked state in the fork child, causing
random deadlocks at random future moments.
To ensure a target thread has all locks dropped, we ask it to write a large
string to a socket with a small buffer that has :data:`os.O_NONBLOCK`
disabled. CPython will drop the GIL and enter the ``write()`` system call,
where it will block until the socket buffer is drained, or the write side
is closed.
We can ensure the thread really has blocked outside of any Python locks by
checking if the socket buffer has started to fill using a
:class:`mitogen.core.Poller`.
Since this necessarily involves posting a message to every existent thread Since this necessarily involves posting a message to every existent thread
and verifying acknowledgement, it will never be a fast operation. and verifying acknowledgement, it will never be a fast operation.
This does not yet handle the case of corking being initiated from within a
thread that is also a cork target.
:param brokers:
Sequence of :class:`mitogen.core.Broker` instances to cork.
:param pools:
Sequence of :class:`mitogen.core.Pool` instances to cork.
""" """
def __init__(self, brokers=(), pools=()): def __init__(self, brokers=(), pools=()):
self.brokers = brokers self.brokers = brokers
@ -106,13 +127,8 @@ class Corker(object):
def _cork_one(self, s, obj): def _cork_one(self, s, obj):
""" """
To ensure the target thread has all locks dropped, we ask it to write a Construct a socketpair, saving one side of it, and passing the other to
large string to a socket with a small buffer that has O_NONBLOCK `obj` to be written to by one of its threads.
disabled. CPython will drop the GIL and enter the write() system call,
where it will block until the socket buffer is drained, or the write
side is closed. We can detect the thread has blocked outside of Python
code by checking if the socket buffer has started to fill using a
poller.
""" """
rsock, wsock = mitogen.parent.create_socketpair(size=4096) rsock, wsock = mitogen.parent.create_socketpair(size=4096)
mitogen.core.set_cloexec(rsock.fileno()) mitogen.core.set_cloexec(rsock.fileno())
@ -120,6 +136,12 @@ class Corker(object):
mitogen.core.set_block(wsock) # gevent mitogen.core.set_block(wsock) # gevent
self._rsocks.append(rsock) self._rsocks.append(rsock)
obj.defer(self._do_cork, s, wsock) obj.defer(self._do_cork, s, wsock)
def _verify_one(self, rsock):
"""
Pause until the socket `rsock` indicates readability, due to
:meth:`_do_cork` triggering a blocking write on another thread.
"""
poller = mitogen.core.Poller() poller = mitogen.core.Poller()
poller.start_receive(rsock.fileno()) poller.start_receive(rsock.fileno())
try: try:
@ -137,14 +159,22 @@ class Corker(object):
""" """
s = mitogen.core.b('CORK') * ((128 // 4) * 1024) s = mitogen.core.b('CORK') * ((128 // 4) * 1024)
self._rsocks = [] self._rsocks = []
# Pools must be paused first, as existing work may require the
# participation of a broker in order to complete.
for pool in self.pools: for pool in self.pools:
if not pool.closed: if not pool.closed:
for x in range(pool.size): for x in range(pool.size):
self._cork_one(s, pool) self._cork_one(s, pool)
for broker in self.brokers: for broker in self.brokers:
if broker._alive: if broker._alive:
self._cork_one(s, broker) self._cork_one(s, broker)
# Pause until we can detect every thread has entered write().
for rsock in self._rsocks:
self._verify_one(rsock)
def uncork(self): def uncork(self):
""" """
Arrange for paused threads to resume operation. Arrange for paused threads to resume operation.

Loading…
Cancel
Save