@ -87,6 +87,17 @@ import warnings
import weakref
import weakref
import zlib
import zlib
if sys . version_info > ( 3 , 5 ) :
from os import get_blocking , set_blocking
else :
def get_blocking ( fd ) :
return not fcntl . fcntl ( fd , fcntl . F_GETFL ) & os . O_NONBLOCK
def set_blocking ( fd , blocking ) :
fl = fcntl . fcntl ( fd , fcntl . F_GETFL )
if blocking : fcntl . fcntl ( fd , fcntl . F_SETFL , fl & ~ os . O_NONBLOCK )
else : fcntl . fcntl ( fd , fcntl . F_SETFL , fl | os . O_NONBLOCK )
try :
try :
# Python >= 3.4, PEP 451 ModuleSpec API
# Python >= 3.4, PEP 451 ModuleSpec API
import importlib . machinery
import importlib . machinery
@ -559,26 +570,6 @@ def set_cloexec(fd):
fcntl . fcntl ( fd , fcntl . F_SETFD , flags | fcntl . FD_CLOEXEC )
fcntl . fcntl ( fd , fcntl . F_SETFD , flags | fcntl . FD_CLOEXEC )
def set_nonblock ( fd ) :
"""
Set the file descriptor ` fd ` to non - blocking mode . For most underlying file
types , this causes : func : ` os . read ` or : func : ` os . write ` to raise
: class : ` OSError ` with : data : ` errno . EAGAIN ` rather than block the thread
when the underlying kernel buffer is exhausted .
"""
flags = fcntl . fcntl ( fd , fcntl . F_GETFL )
fcntl . fcntl ( fd , fcntl . F_SETFL , flags | os . O_NONBLOCK )
def set_block ( fd ) :
"""
Inverse of : func : ` set_nonblock ` , i . e . cause ` fd ` to block the thread when
the underlying kernel buffer is exhausted .
"""
flags = fcntl . fcntl ( fd , fcntl . F_GETFL )
fcntl . fcntl ( fd , fcntl . F_SETFL , flags & ~ os . O_NONBLOCK )
def io_op ( func , * args ) :
def io_op ( func , * args ) :
"""
"""
Wrap ` func ( * args ) ` that may raise : class : ` select . error ` , : class : ` IOError ` ,
Wrap ` func ( * args ) ` that may raise : class : ` select . error ` , : class : ` IOError ` ,
@ -720,7 +711,7 @@ def import_module(modname):
return __import__ ( modname , None , None , [ ' ' ] )
return __import__ ( modname , None , None , [ ' ' ] )
def pipe ( ) :
def pipe ( blocking = None ) :
"""
"""
Create a UNIX pipe pair using : func : ` os . pipe ` , wrapping the returned
Create a UNIX pipe pair using : func : ` os . pipe ` , wrapping the returned
descriptors in Python file objects in order to manage their lifetime and
descriptors in Python file objects in order to manage their lifetime and
@ -728,12 +719,22 @@ def pipe():
not been closed explicitly .
not been closed explicitly .
"""
"""
rfd , wfd = os . pipe ( )
rfd , wfd = os . pipe ( )
for fd in rfd , wfd :
if blocking is not None : set_blocking ( fd , blocking ) # noqa: E701
return (
return (
os . fdopen ( rfd , ' rb ' , 0 ) ,
os . fdopen ( rfd , ' rb ' , 0 ) ,
os . fdopen ( wfd , ' wb ' , 0 )
os . fdopen ( wfd , ' wb ' , 0 )
)
)
def socketpair ( blocking = None ) :
fp1 , fp2 = socket . socketpair ( )
for fp in fp1 , fp2 :
fd = fp . fileno ( )
if blocking is not None : set_blocking ( fd , blocking ) # noqa: E701
return fp1 , fp2
def iter_split ( buf , delim , func ) :
def iter_split ( buf , delim , func ) :
"""
"""
Invoke ` func ( s ) ` for each ` delim ` - delimited chunk in the potentially large
Invoke ` func ( s ) ` for each ` delim ` - delimited chunk in the potentially large
@ -1879,8 +1880,7 @@ class Stream(object):
"""
"""
Attach a pair of file objects to : attr : ` receive_side ` and
Attach a pair of file objects to : attr : ` receive_side ` and
: attr : ` transmit_side ` , after wrapping them in : class : ` Side ` instances .
: attr : ` transmit_side ` , after wrapping them in : class : ` Side ` instances .
: class : ` Side ` will call : func : ` set_nonblock ` and : func : ` set_cloexec `
: class : ` Side ` will call : func : ` set_cloexec ` on them .
on the underlying file descriptors during construction .
The same file object may be used for both sides . The default
The same file object may be used for both sides . The default
: meth : ` on_disconnect ` is handles the possibility that only one
: meth : ` on_disconnect ` is handles the possibility that only one
@ -2155,14 +2155,11 @@ class Side(object):
: param bool keep_alive :
: param bool keep_alive :
If : data : ` True ` , the continued existence of this side will extend the
If : data : ` True ` , the continued existence of this side will extend the
shutdown grace period until it has been unregistered from the broker .
shutdown grace period until it has been unregistered from the broker .
: param bool blocking :
If : data : ` False ` , the descriptor has its : data : ` os . O_NONBLOCK ` flag
enabled using : func : ` fcntl . fcntl ` .
"""
"""
_fork_refs = weakref . WeakValueDictionary ( )
_fork_refs = weakref . WeakValueDictionary ( )
closed = False
closed = False
def __init__ ( self , stream , fp , cloexec = True , keep_alive = True , blocking = False ):
def __init__ ( self , stream , fp , cloexec = True , keep_alive = True ):
#: The :class:`Stream` for which this is a read or write side.
#: The :class:`Stream` for which this is a read or write side.
self . stream = stream
self . stream = stream
# File or socket object responsible for the lifetime of its underlying
# File or socket object responsible for the lifetime of its underlying
@ -2180,8 +2177,6 @@ class Side(object):
self . _fork_refs [ id ( self ) ] = self
self . _fork_refs [ id ( self ) ] = self
if cloexec :
if cloexec :
set_cloexec ( self . fd )
set_cloexec ( self . fd )
if not blocking :
set_nonblock ( self . fd )
def __repr__ ( self ) :
def __repr__ ( self ) :
return ' <Side of %s fd %s > ' % (
return ' <Side of %s fd %s > ' % (
@ -2785,7 +2780,7 @@ class Latch(object):
try :
try :
return self . _cls_idle_socketpairs . pop ( ) # pop() must be atomic
return self . _cls_idle_socketpairs . pop ( ) # pop() must be atomic
except IndexError :
except IndexError :
rsock , wsock = socket . socket pair( )
rsock , wsock = socket pair( )
rsock . setblocking ( False )
rsock . setblocking ( False )
set_cloexec ( rsock . fileno ( ) )
set_cloexec ( rsock . fileno ( ) )
set_cloexec ( wsock . fileno ( ) )
set_cloexec ( wsock . fileno ( ) )
@ -2958,7 +2953,8 @@ class Waker(Protocol):
@classmethod
@classmethod
def build_stream ( cls , broker ) :
def build_stream ( cls , broker ) :
stream = super ( Waker , cls ) . build_stream ( broker )
stream = super ( Waker , cls ) . build_stream ( broker )
stream . accept ( * pipe ( ) )
rfp , wfp = pipe ( blocking = False )
stream . accept ( rfp , wfp )
return stream
return stream
def __init__ ( self , broker ) :
def __init__ ( self , broker ) :
@ -3056,7 +3052,8 @@ class IoLoggerProtocol(DelimitedProtocol):
prevent break : meth : ` on_shutdown ` from calling : meth : ` shutdown ( )
prevent break : meth : ` on_shutdown ` from calling : meth : ` shutdown ( )
< socket . socket . shutdown > ` on it .
< socket . socket . shutdown > ` on it .
"""
"""
rsock , wsock = socket . socketpair ( )
# Leave wsock & dest_fd blocking, so the subprocess will have sane stdio
rsock , wsock = socketpair ( )
os . dup2 ( wsock . fileno ( ) , dest_fd )
os . dup2 ( wsock . fileno ( ) , dest_fd )
stream = super ( IoLoggerProtocol , cls ) . build_stream ( name )
stream = super ( IoLoggerProtocol , cls ) . build_stream ( name )
stream . name = name
stream . name = name
@ -4038,6 +4035,9 @@ class ExternalContext(object):
local_id = self . config [ ' context_id ' ] ,
local_id = self . config [ ' context_id ' ] ,
parent_ids = self . config [ ' parent_ids ' ]
parent_ids = self . config [ ' parent_ids ' ]
)
)
for f in in_fp , out_fp :
fd = f . fileno ( )
set_blocking ( fd , False )
self . stream . accept ( in_fp , out_fp )
self . stream . accept ( in_fp , out_fp )
self . stream . name = ' parent '
self . stream . name = ' parent '
self . stream . receive_side . keep_alive = False
self . stream . receive_side . keep_alive = False