@ -29,13 +29,7 @@
# !mitogen: minify_safe
# !mitogen: minify_safe
"""
"""
When operating in a mixed threading / forking environment , it is critical no
Support for operating in a mixed threading / forking environment .
threads are active at the moment of fork , as they could be within critical
sections whose mutexes are snapshotted in the locked state in the fork child .
To permit unbridled Mitogen use in a forking program , a mechanism must exist to
temporarily halt any threads in operation - - namely the broker and any pool
threads .
"""
"""
import os
import os
@ -53,6 +47,10 @@ _pools = weakref.WeakKeyDictionary()
def _notice_broker_or_pool ( obj ) :
def _notice_broker_or_pool ( obj ) :
"""
Used by : mod : ` mitogen . core ` and : mod : ` mitogen . service ` to automatically
register every broker and pool on Python 2.4 / 2.5 .
"""
if isinstance ( obj , mitogen . core . Broker ) :
if isinstance ( obj , mitogen . core . Broker ) :
_brokers [ obj ] = True
_brokers [ obj ] = True
else :
else :
@ -85,8 +83,31 @@ class Corker(object):
: class : ` mitogen . service . Pool ` to be temporarily " corked " while fork
: class : ` mitogen . service . Pool ` to be temporarily " corked " while fork
operations may occur .
operations may occur .
In a mixed threading / forking environment , it is critical no threads are
active at the moment of fork , as they could hold mutexes whose state is
unrecoverably snapshotted in the locked state in the fork child , causing
deadlocks at random future moments .
To ensure a target thread has all locks dropped , it is made to write a
large string to a socket with a small buffer that has : data : ` os . O_NONBLOCK `
disabled . CPython will drop the GIL and enter the ` ` write ( ) ` ` system call ,
where it will block until the socket buffer is drained , or the write side
is closed .
: class : ` mitogen . core . Poller ` is used to ensure the thread really has
blocked outside any Python locks , by checking if the socket buffer has
started to fill .
Since this necessarily involves posting a message to every existent thread
Since this necessarily involves posting a message to every existent thread
and verifying acknowledgement , it will never be a fast operation .
and verifying acknowledgement , it will never be a fast operation .
This does not yet handle the case of corking being initiated from within a
thread that is also a cork target .
: param brokers :
Sequence of : class : ` mitogen . core . Broker ` instances to cork .
: param pools :
Sequence of : class : ` mitogen . core . Pool ` instances to cork .
"""
"""
def __init__ ( self , brokers = ( ) , pools = ( ) ) :
def __init__ ( self , brokers = ( ) , pools = ( ) ) :
self . brokers = brokers
self . brokers = brokers
@ -106,13 +127,8 @@ class Corker(object):
def _cork_one ( self , s , obj ) :
def _cork_one ( self , s , obj ) :
"""
"""
To ensure the target thread has all locks dropped , we ask it to write a
Construct a socketpair , saving one side of it , and passing the other to
large string to a socket with a small buffer that has O_NONBLOCK
` obj ` to be written to by one of its threads .
disabled . CPython will drop the GIL and enter the write ( ) system call ,
where it will block until the socket buffer is drained , or the write
side is closed . We can detect the thread has blocked outside of Python
code by checking if the socket buffer has started to fill using a
poller .
"""
"""
rsock , wsock = mitogen . parent . create_socketpair ( size = 4096 )
rsock , wsock = mitogen . parent . create_socketpair ( size = 4096 )
mitogen . core . set_cloexec ( rsock . fileno ( ) )
mitogen . core . set_cloexec ( rsock . fileno ( ) )
@ -120,6 +136,12 @@ class Corker(object):
mitogen . core . set_block ( wsock ) # gevent
mitogen . core . set_block ( wsock ) # gevent
self . _rsocks . append ( rsock )
self . _rsocks . append ( rsock )
obj . defer ( self . _do_cork , s , wsock )
obj . defer ( self . _do_cork , s , wsock )
def _verify_one ( self , rsock ) :
"""
Pause until the socket ` rsock ` indicates readability , due to
: meth : ` _do_cork ` triggering a blocking write on another thread .
"""
poller = mitogen . core . Poller ( )
poller = mitogen . core . Poller ( )
poller . start_receive ( rsock . fileno ( ) )
poller . start_receive ( rsock . fileno ( ) )
try :
try :
@ -131,20 +153,28 @@ class Corker(object):
def cork ( self ) :
def cork ( self ) :
"""
"""
Arrange for th e broker and optional pool to be paused with no locks
Arrange for any associa ted broker s and pools to be paused with no locks
held . This will not return until each thread acknowledges it has ceased
held . This will not return until each thread acknowledges it has ceased
execution .
execution .
"""
"""
s = ' CORK ' * ( ( 128 / 4 ) * 1024 )
s = mitogen . core . b ( ' CORK ' ) * ( ( 128 / / 4 ) * 1024 )
self . _rsocks = [ ]
self . _rsocks = [ ]
# Pools must be paused first, as existing work may require the
# participation of a broker in order to complete.
for pool in self . pools :
for pool in self . pools :
if not pool . closed :
if not pool . closed :
for x in range ( pool . size ) :
for x in range ( pool . size ) :
self . _cork_one ( s , pool )
self . _cork_one ( s , pool )
for broker in self . brokers :
for broker in self . brokers :
if broker . _alive :
if broker . _alive :
self . _cork_one ( s , broker )
self . _cork_one ( s , broker )
# Pause until we can detect every thread has entered write().
for rsock in self . _rsocks :
self . _verify_one ( rsock )
def uncork ( self ) :
def uncork ( self ) :
"""
"""
Arrange for paused threads to resume operation .
Arrange for paused threads to resume operation .