@ -1374,10 +1374,12 @@ class Importer(object):
if not present :
if not present :
funcs = self . _callbacks . get ( fullname )
funcs = self . _callbacks . get ( fullname )
if funcs is not None :
if funcs is not None :
_v and LOG . debug ( ' _request_module( %r ): in flight ' , fullname )
_v and LOG . debug ( ' %s : existing request for %s in flight ' ,
self , fullname )
funcs . append ( callback )
funcs . append ( callback )
else :
else :
_v and LOG . debug ( ' _request_module( %r ): new request ' , fullname )
_v and LOG . debug ( ' %s : requesting %s from parent ' ,
self , fullname )
self . _callbacks [ fullname ] = [ callback ]
self . _callbacks [ fullname ] = [ callback ]
self . _context . send (
self . _context . send (
Message ( data = b ( fullname ) , handle = GET_MODULE )
Message ( data = b ( fullname ) , handle = GET_MODULE )
@ -1493,6 +1495,80 @@ class LogHandler(logging.Handler):
self . local . in_emit = False
self . local . in_emit = False
class Stream ( object ) :
#: A :class:`Side` representing the stream's receive file descriptor.
receive_side = None
#: A :class:`Side` representing the stream's transmit file descriptor.
transmit_side = None
#: A :class:`Protocol` representing the protocol active on the stream.
protocol = None
#: In parents, the :class:`mitogen.parent.Connection` instance.
conn = None
name = u ' default '
def set_protocol ( self , protocol ) :
self . protocol = protocol
self . protocol . stream = self
def accept ( self , rfp , wfp ) :
self . receive_side = Side ( self , rfp )
self . transmit_side = Side ( self , wfp )
def __repr__ ( self ) :
return " <Stream %s > " % ( self . name , )
def on_receive ( self , broker ) :
"""
Called by : class : ` Broker ` when the stream ' s :attr:`receive_side` has
been marked readable using : meth : ` Broker . start_receive ` and the broker
has detected the associated file descriptor is ready for reading .
Subclasses must implement this if : meth : ` Broker . start_receive ` is ever
called on them , and the method must call : meth : ` on_disconect ` if
reading produces an empty string .
"""
buf = self . receive_side . read ( self . protocol . read_size )
if not buf :
LOG . debug ( ' %r : empty read, disconnecting ' , self )
return self . on_disconnect ( broker )
self . protocol . on_receive ( broker , buf )
def on_transmit ( self , broker ) :
"""
Called by : class : ` Broker ` when the stream ' s :attr:`transmit_side`
has been marked writeable using : meth : ` Broker . _start_transmit ` and
the broker has detected the associated file descriptor is ready for
writing .
Subclasses must implement this if : meth : ` Broker . _start_transmit ` is
ever called on them .
"""
self . protocol . on_transmit ( broker )
def on_shutdown ( self , broker ) :
"""
Called by : meth : ` Broker . shutdown ` to allow the stream time to
gracefully shutdown . The base implementation simply called
: meth : ` on_disconnect ` .
"""
fire ( self , ' shutdown ' )
self . protocol . on_shutdown ( broker )
def on_disconnect ( self , broker ) :
"""
Called by : class : ` Broker ` to force disconnect the stream . The base
implementation simply closes : attr : ` receive_side ` and
: attr : ` transmit_side ` and unregisters the stream from the broker .
"""
fire ( self , ' disconnect ' )
self . protocol . on_disconnect ( broker )
class Protocol ( object ) :
class Protocol ( object ) :
"""
"""
Implement the program behaviour associated with activity on a
Implement the program behaviour associated with activity on a
@ -1506,11 +1582,13 @@ class Protocol(object):
provided by : class : ` Stream ` and : class : ` Side ` , allowing the underlying IO
provided by : class : ` Stream ` and : class : ` Side ` , allowing the underlying IO
implementation to be replaced without modifying behavioural logic .
implementation to be replaced without modifying behavioural logic .
"""
"""
stream_class = Stream
stream = None
stream = None
read_size = CHUNK_SIZE
@classmethod
@classmethod
def build_stream ( cls , * args , * * kwargs ) :
def build_stream ( cls , * args , * * kwargs ) :
stream = Stream ( )
stream = cls . stream_class ( )
stream . set_protocol ( cls ( * args , * * kwargs ) )
stream . set_protocol ( cls ( * args , * * kwargs ) )
return stream
return stream
@ -1547,24 +1625,30 @@ class DelimitedProtocol(Protocol):
increasingly complete message . When a complete message is finally received ,
increasingly complete message . When a complete message is finally received ,
: meth : ` on_line_received ` will be called once for it before the buffer is
: meth : ` on_line_received ` will be called once for it before the buffer is
discarded .
discarded .
If : func : ` on_line_received ` returns : data : ` False ` , remaining data is passed
unprocessed to the stream ' s current protocol ' s : meth : ` on_receive ` . This
allows switching from line - oriented to binary while the input buffer
contains both kinds of data .
"""
"""
#: The delimiter. Defaults to newline.
#: The delimiter. Defaults to newline.
delimiter = b ( ' \n ' )
delimiter = b ( ' \n ' )
_trailer = b ( ' ' )
_trailer = b ( ' ' )
def on_receive ( self , broker ):
def on_receive ( self , broker , buf ):
IOLOG . debug ( ' %r .on_receive() ' , self )
IOLOG . debug ( ' %r .on_receive() ' , self )
buf = self . stream . receive_side . read ( )
self . _trailer , cont = mitogen . core . iter_split (
if not buf :
return self . stream . on_disconnect ( broker )
self . _trailer = mitogen . core . iter_split (
buf = self . _trailer + buf ,
buf = self . _trailer + buf ,
delim = self . delimiter ,
delim = self . delimiter ,
func = self . on_line_received ,
func = self . on_line_received ,
)
)
if self . _trailer :
if self . _trailer :
if cont :
self . on_partial_line_received ( self . _trailer )
self . on_partial_line_received ( self . _trailer )
else :
assert self . stream . protocol is not self
self . stream . protocol . on_receive ( broker , self . _trailer )
def on_line_received ( self , line ) :
def on_line_received ( self , line ) :
pass
pass
@ -1656,23 +1740,24 @@ class Side(object):
enabled using : func : ` fcntl . fcntl ` .
enabled using : func : ` fcntl . fcntl ` .
"""
"""
_fork_refs = weakref . WeakValueDictionary ( )
_fork_refs = weakref . WeakValueDictionary ( )
closed = False
def __init__ ( self , stream , f d , cloexec = True , keep_alive = True , blocking = False ) :
def __init__ ( self , stream , f p , cloexec = True , keep_alive = True , blocking = False ) :
#: The :class:`Stream` for which this is a read or write side.
#: The :class:`Stream` for which this is a read or write side.
self . stream = stream
self . stream = stream
#: Integer file descriptor to perform IO on, or :data:`None` if
#: Integer file descriptor to perform IO on, or :data:`None` if
#: :meth:`close` has been called.
#: :meth:`close` has been called.
self . f d = fd
self . f p = fp
self . closed = False
self . fd = fp . fileno ( )
#: If :data:`True`, causes presence of this side in
#: If :data:`True`, causes presence of this side in
#: :class:`Broker`'s active reader set to defer shutdown until the
#: :class:`Broker`'s active reader set to defer shutdown until the
#: side is disconnected.
#: side is disconnected.
self . keep_alive = keep_alive
self . keep_alive = keep_alive
self . _fork_refs [ id ( self ) ] = self
self . _fork_refs [ id ( self ) ] = self
if cloexec :
if cloexec :
set_cloexec ( fd )
set_cloexec ( self . fd )
if not blocking :
if not blocking :
set_nonblock ( fd )
set_nonblock ( self . fd )
def __repr__ ( self ) :
def __repr__ ( self ) :
return ' <Side of %r fd %s > ' % ( self . stream , self . fd )
return ' <Side of %r fd %s > ' % ( self . stream , self . fd )
@ -1692,7 +1777,7 @@ class Side(object):
_vv and IOLOG . debug ( ' %r .close() ' , self )
_vv and IOLOG . debug ( ' %r .close() ' , self )
if not self . closed :
if not self . closed :
self . closed = True
self . closed = True
os . close ( self . fd )
self . fp . close ( )
def read ( self , n = CHUNK_SIZE ) :
def read ( self , n = CHUNK_SIZE ) :
"""
"""
@ -1728,9 +1813,8 @@ class Side(object):
Number of bytes written , or : data : ` None ` if disconnection was
Number of bytes written , or : data : ` None ` if disconnection was
detected .
detected .
"""
"""
if self . closed or self . fd is None :
if self . closed :
# Refuse to touch the handle after closed, it may have been reused
# Don't touch the handle after close, it may be reused elsewhere.
# by another thread.
return None
return None
written , disconnected = io_op ( os . write , self . fd , s )
written , disconnected = io_op ( os . write , self . fd , s )
@ -1740,67 +1824,10 @@ class Side(object):
return written
return written
class BasicStream ( object ) :
class MitogenProtocol ( Protocol ) :
#: A :class:`Side` representing the stream's receive file descriptor.
receive_side = None
#: A :class:`Side` representing the stream's transmit file descriptor.
transmit_side = None
def on_receive ( self , broker ) :
"""
Called by : class : ` Broker ` when the stream ' s :attr:`receive_side` has
been marked readable using : meth : ` Broker . start_receive ` and the broker
has detected the associated file descriptor is ready for reading .
Subclasses must implement this if : meth : ` Broker . start_receive ` is ever
called on them , and the method must call : meth : ` on_disconect ` if
reading produces an empty string .
"""
pass
def on_transmit ( self , broker ) :
"""
Called by : class : ` Broker ` when the stream ' s :attr:`transmit_side`
has been marked writeable using : meth : ` Broker . _start_transmit ` and
the broker has detected the associated file descriptor is ready for
writing .
Subclasses must implement this if : meth : ` Broker . _start_transmit ` is
ever called on them .
"""
pass
def on_shutdown ( self , broker ) :
"""
"""
Called by : meth : ` Broker . shutdown ` to allow the stream time to
: class : ` Protocol ` implementing mitogen ' s :ref:`stream protocol
gracefully shutdown . The base implementation simply called
< stream - protocol > ` .
: meth : ` on_disconnect ` .
"""
_v and LOG . debug ( ' %r .on_shutdown() ' , self )
fire ( self , ' shutdown ' )
self . on_disconnect ( broker )
def on_disconnect ( self , broker ) :
"""
Called by : class : ` Broker ` to force disconnect the stream . The base
implementation simply closes : attr : ` receive_side ` and
: attr : ` transmit_side ` and unregisters the stream from the broker .
"""
LOG . debug ( ' %r .on_disconnect() ' , self )
if self . receive_side :
broker . stop_receive ( self )
self . receive_side . close ( )
if self . transmit_side :
broker . _stop_transmit ( self )
self . transmit_side . close ( )
fire ( self , ' disconnect ' )
class Stream ( BasicStream ) :
"""
: class : ` BasicStream ` subclass implementing mitogen ' s :ref:`stream
protocol < stream - protocol > ` .
"""
"""
#: If not :data:`None`, :class:`Router` stamps this into
#: If not :data:`None`, :class:`Router` stamps this into
#: :attr:`Message.auth_id` of every message received on this stream.
#: :attr:`Message.auth_id` of every message received on this stream.
@ -1811,24 +1838,24 @@ class Stream(BasicStream):
#: :data:`mitogen.parent_ids`.
#: :data:`mitogen.parent_ids`.
is_privileged = False
is_privileged = False
def __init__ ( self , router , remote_id , * * kwargs ):
def __init__ ( self , router , remote_id ):
self . _router = router
self . _router = router
self . remote_id = remote_id
self . remote_id = remote_id
self . name = u ' default '
self . sent_modules = set ( [ ' mitogen ' , ' mitogen.core ' ] )
self . sent_modules = set ( [ ' mitogen ' , ' mitogen.core ' ] )
self . construct ( * * kwargs )
self . _input_buf = collections . deque ( )
self . _input_buf = collections . deque ( )
self . _output_buf = collections . deque ( )
self . _input_buf_len = 0
self . _input_buf_len = 0
self . _output_buf_len = 0
self . _writer = BufferedWriter ( router . broker , self )
#: Routing records the dst_id of every message arriving from this
#: Routing records the dst_id of every message arriving from this
#: stream. Any arriving DEL_ROUTE is rebroadcast for any such ID.
#: stream. Any arriving DEL_ROUTE is rebroadcast for any such ID.
self . egress_ids = set ( )
self . egress_ids = set ( )
def construct ( self ) :
def on_receive ( self , broker , buf ) :
pass
"""
Handle the next complete message on the stream . Raise
def _internal_receive ( self , broker , buf ) :
: class : ` StreamError ` on failure .
"""
_vv and IOLOG . debug ( ' %r .on_receive() ' , self )
if self . _input_buf and self . _input_buf_len < 128 :
if self . _input_buf and self . _input_buf_len < 128 :
self . _input_buf [ 0 ] + = buf
self . _input_buf [ 0 ] + = buf
else :
else :
@ -1838,60 +1865,45 @@ class Stream(BasicStream):
while self . _receive_one ( broker ) :
while self . _receive_one ( broker ) :
pass
pass
def on_receive ( self , broker ) :
""" Handle the next complete message on the stream. Raise
: class : ` StreamError ` on failure . """
_vv and IOLOG . debug ( ' %r .on_receive() ' , self )
buf = self . receive_side . read ( )
if not buf :
return self . on_disconnect ( broker )
self . _internal_receive ( broker , buf )
HEADER_FMT = ' >hLLLLLL '
HEADER_LEN = struct . calcsize ( HEADER_FMT )
HEADER_MAGIC = 0x4d49 # 'MI'
corrupt_msg = (
corrupt_msg = (
' Corruption detected: frame signature incorrect. This likely means '
' %s : Corruption detected: frame signature incorrect. This likely means '
' some external process is interfering with the connection. Received: '
' some external process is interfering with the connection. Received: '
' \n \n '
' \n \n '
' %r '
' %r '
)
)
def _receive_one ( self , broker ) :
def _receive_one ( self , broker ) :
if self . _input_buf_len < self . HEADER_LEN :
if self . _input_buf_len < Message . HEADER_LEN :
return False
return False
msg = Message ( )
msg = Message ( )
msg . router = self . _router
msg . router = self . _router
( magic , msg . dst_id , msg . src_id , msg . auth_id ,
( magic , msg . dst_id , msg . src_id , msg . auth_id ,
msg . handle , msg . reply_to , msg_len ) = struct . unpack (
msg . handle , msg . reply_to , msg_len ) = struct . unpack (
self . HEADER_FMT ,
Message . HEADER_FMT ,
self . _input_buf [ 0 ] [ : self . HEADER_LEN ] ,
self . _input_buf [ 0 ] [ : Message . HEADER_LEN ] ,
)
)
if magic != self . HEADER_MAGIC :
if magic != Message . HEADER_MAGIC :
LOG . error ( self . corrupt_msg , self . _input_buf[ 0 ] [ : 2048 ] )
LOG . error ( self . corrupt_msg , self . stream. name , self . _input_buf[ 0 ] [ : 2048 ] )
self . on_disconnect( broker )
self . stream. on_disconnect( broker )
return False
return False
if msg_len > self . _router . max_message_size :
if msg_len > self . _router . max_message_size :
LOG . error ( ' Maximum message size exceeded (got %d , max %d ) ' ,
LOG . error ( ' Maximum message size exceeded (got %d , max %d ) ' ,
msg_len , self . _router . max_message_size )
msg_len , self . _router . max_message_size )
self . on_disconnect( broker )
self . stream. on_disconnect( broker )
return False
return False
total_len = msg_len + self . HEADER_LEN
total_len = msg_len + Message . HEADER_LEN
if self . _input_buf_len < total_len :
if self . _input_buf_len < total_len :
_vv and IOLOG . debug (
_vv and IOLOG . debug (
' %r : Input too short (want %d , got %d ) ' ,
' %r : Input too short (want %d , got %d ) ' ,
self , msg_len , self . _input_buf_len - self . HEADER_LEN
self , msg_len , self . _input_buf_len - Message . HEADER_LEN
)
)
return False
return False
start = self . HEADER_LEN
start = Message . HEADER_LEN
prev_start = start
prev_start = start
remain = total_len
remain = total_len
bits = [ ]
bits = [ ]
@ -1906,7 +1918,7 @@ class Stream(BasicStream):
msg . data = b ( ' ' ) . join ( bits )
msg . data = b ( ' ' ) . join ( bits )
self . _input_buf . appendleft ( buf [ prev_start + len ( bit ) : ] )
self . _input_buf . appendleft ( buf [ prev_start + len ( bit ) : ] )
self . _input_buf_len - = total_len
self . _input_buf_len - = total_len
self . _router . _async_route ( msg , self )
self . _router . _async_route ( msg , self . stream )
return True
return True
def pending_bytes ( self ) :
def pending_bytes ( self ) :
@ -1918,50 +1930,16 @@ class Stream(BasicStream):
For an accurate result , this method should be called from the Broker
For an accurate result , this method should be called from the Broker
thread , for example by using : meth : ` Broker . defer_sync ` .
thread , for example by using : meth : ` Broker . defer_sync ` .
"""
"""
return self . _ output_buf _len
return self . _ writer. _len
def on_transmit ( self , broker ) :
def on_transmit ( self , broker ) :
""" Transmit buffered messages. """
""" Transmit buffered messages. """
_vv and IOLOG . debug ( ' %r .on_transmit() ' , self )
_vv and IOLOG . debug ( ' %r .on_transmit() ' , self )
self . _writer . on_transmit ( broker )
if self . _output_buf :
buf = self . _output_buf . popleft ( )
written = self . transmit_side . write ( buf )
if not written :
_v and LOG . debug ( ' %r .on_transmit(): disconnection detected ' , self )
self . on_disconnect ( broker )
return
elif written != len ( buf ) :
self . _output_buf . appendleft ( BufferType ( buf , written ) )
_vv and IOLOG . debug ( ' %r .on_transmit() -> len %d ' , self , written )
self . _output_buf_len - = written
if not self . _output_buf :
broker . _stop_transmit ( self )
def _send ( self , msg ) :
def _send ( self , msg ) :
_vv and IOLOG . debug ( ' %r ._send( %r ) ' , self , msg )
_vv and IOLOG . debug ( ' %r ._send( %r ) ' , self , msg )
pkt = struct . pack ( self . HEADER_FMT , self . HEADER_MAGIC , msg . dst_id ,
self . _writer . write ( msg . pack ( ) )
msg . src_id , msg . auth_id , msg . handle ,
msg . reply_to or 0 , len ( msg . data ) ) + msg . data
if not self . _output_buf_len :
# Modifying epoll/Kqueue state is expensive, as are needless broker
# loops. Rather than wait for writeability, just write immediately,
# and fall back to the broker loop on error or full buffer.
try :
n = self . transmit_side . write ( pkt )
if n :
if n == len ( pkt ) :
return
pkt = pkt [ n : ]
except OSError :
pass
self . _router . broker . _start_transmit ( self )
self . _output_buf . append ( pkt )
self . _output_buf_len + = len ( pkt )
def send ( self , msg ) :
def send ( self , msg ) :
""" Send `data` to `handle`, and tell the broker we have output. May
""" Send `data` to `handle`, and tell the broker we have output. May
@ -1969,17 +1947,8 @@ class Stream(BasicStream):
self . _router . broker . defer ( self . _send , msg )
self . _router . broker . defer ( self . _send , msg )
def on_shutdown ( self , broker ) :
def on_shutdown ( self , broker ) :
""" Override BasicStream behaviour of immediately disconnecting. """
""" Disable :class:`Protocol` immediate disconnect behaviour. """
_v and LOG . debug ( ' %r .on_shutdown( %r ) ' , self , broker )
_v and LOG . debug ( ' %r : shutting down ' , self )
def accept ( self , rfd , wfd ) :
# TODO: what is this os.dup for?
self . receive_side = Side ( self , os . dup ( rfd ) )
self . transmit_side = Side ( self , os . dup ( wfd ) )
def __repr__ ( self ) :
cls = type ( self )
return " %s . %s ( ' %s ' ) " % ( cls . __module__ , cls . __name__ , self . name )
class Context ( object ) :
class Context ( object ) :
@ -2005,18 +1974,17 @@ class Context(object):
: param str name :
: param str name :
Context name .
Context name .
"""
"""
name = None
remote_name = None
remote_name = None
def __init__ ( self , router , context_id , name = None ) :
def __init__ ( self , router , context_id , name = None ) :
self . router = router
self . router = router
self . context_id = context_id
self . context_id = context_id
self . name = name
if name :
self . name = to_text ( name )
def __reduce__ ( self ) :
def __reduce__ ( self ) :
name = self . name
return _unpickle_context , ( self . context_id , self . name )
if name and not isinstance ( name , UnicodeType ) :
name = UnicodeType ( name , ' utf-8 ' )
return _unpickle_context , ( self . context_id , name )
def on_disconnect ( self ) :
def on_disconnect ( self ) :
_v and LOG . debug ( ' %r : disconnecting ' , self )
_v and LOG . debug ( ' %r : disconnecting ' , self )
@ -2161,7 +2129,7 @@ class Poller(object):
self . _wfds = { }
self . _wfds = { }
def __repr__ ( self ) :
def __repr__ ( self ) :
return ' %s (%#x ) ' % ( type ( self ) . __name__ , id ( self ) )
return ' %s ' % ( type ( self ) . __name__ , )
def _update ( self , fd ) :
def _update ( self , fd ) :
"""
"""
@ -2509,7 +2477,7 @@ class Latch(object):
)
)
class Waker ( BasicStream ) :
class Waker ( Protocol ) :
"""
"""
: class : ` BasicStream ` subclass implementing the ` UNIX self - pipe trick ` _ .
: class : ` BasicStream ` subclass implementing the ` UNIX self - pipe trick ` _ .
Used to wake the multiplexer when another thread needs to modify its state
Used to wake the multiplexer when another thread needs to modify its state
@ -2517,17 +2485,20 @@ class Waker(BasicStream):
. . _UNIX self - pipe trick : https : / / cr . yp . to / docs / selfpipe . html
. . _UNIX self - pipe trick : https : / / cr . yp . to / docs / selfpipe . html
"""
"""
read_size = 1
broker_ident = None
broker_ident = None
@classmethod
def build_stream ( cls , broker ) :
stream = super ( Waker , cls ) . build_stream ( broker )
stream . accept ( * pipe ( ) )
return stream
def __init__ ( self , broker ) :
def __init__ ( self , broker ) :
self . _broker = broker
self . _broker = broker
self . _lock = threading . Lock ( )
self . _lock = threading . Lock ( )
self . _deferred = [ ]
self . _deferred = [ ]
rfd , wfd = os . pipe ( )
self . receive_side = Side ( self , rfd )
self . transmit_side = Side ( self , wfd )
def __repr__ ( self ) :
def __repr__ ( self ) :
return ' Waker(fd= %r / %r ) ' % (
return ' Waker(fd= %r / %r ) ' % (
self . stream . receive_side and self . stream . receive_side . fd ,
self . stream . receive_side and self . stream . receive_side . fd ,
@ -2545,7 +2516,7 @@ class Waker(BasicStream):
finally :
finally :
self . _lock . release ( )
self . _lock . release ( )
def on_receive ( self , broker ):
def on_receive ( self , broker , buf ):
"""
"""
Drain the pipe and fire callbacks . Since : attr : ` _deferred ` is
Drain the pipe and fire callbacks . Since : attr : ` _deferred ` is
synchronized , : meth : ` defer ` and : meth : ` on_receive ` can conspire to
synchronized , : meth : ` defer ` and : meth : ` on_receive ` can conspire to
@ -2554,7 +2525,6 @@ class Waker(BasicStream):
_vv and IOLOG . debug ( ' %r .on_receive() ' , self )
_vv and IOLOG . debug ( ' %r .on_receive() ' , self )
self . _lock . acquire ( )
self . _lock . acquire ( )
try :
try :
self . receive_side . read ( 1 )
deferred = self . _deferred
deferred = self . _deferred
self . _deferred = [ ]
self . _deferred = [ ]
finally :
finally :
@ -2566,7 +2536,7 @@ class Waker(BasicStream):
except Exception :
except Exception :
LOG . exception ( ' defer() crashed: %r (* %r , ** %r ) ' ,
LOG . exception ( ' defer() crashed: %r (* %r , ** %r ) ' ,
func , args , kwargs )
func , args , kwargs )
self . _ broker. shutdown ( )
broker. shutdown ( )
def _wake ( self ) :
def _wake ( self ) :
"""
"""
@ -2574,7 +2544,7 @@ class Waker(BasicStream):
teardown , the FD may already be closed , so ignore EBADF .
teardown , the FD may already be closed , so ignore EBADF .
"""
"""
try :
try :
self . transmit_side. write ( b ( ' ' ) )
self . stream. transmit_side. write ( b ( ' ' ) )
except OSError :
except OSError :
e = sys . exc_info ( ) [ 1 ]
e = sys . exc_info ( ) [ 1 ]
if e . args [ 0 ] != errno . EBADF :
if e . args [ 0 ] != errno . EBADF :
@ -2601,7 +2571,8 @@ class Waker(BasicStream):
if self . _broker . _exitted :
if self . _broker . _exitted :
raise Error ( self . broker_shutdown_msg )
raise Error ( self . broker_shutdown_msg )
_vv and IOLOG . debug ( ' %r .defer() [fd= %r ] ' , self , self . transmit_side . fd )
_vv and IOLOG . debug ( ' %r .defer() [fd= %r ] ' , self ,
self . stream . transmit_side . fd )
self . _lock . acquire ( )
self . _lock . acquire ( )
try :
try :
if not self . _deferred :
if not self . _deferred :
@ -2611,53 +2582,45 @@ class Waker(BasicStream):
self . _lock . release ( )
self . _lock . release ( )
class IoLogger ( BasicStream ) :
class IoLogger Protocol( DelimitedProtocol ) :
"""
"""
: class : ` BasicStream ` subclass that sets up redirection of a standard
Handle redirection of standard IO into the : mod : ` logging ` package .
UNIX file descriptor back into the Python : mod : ` logging ` package .
"""
"""
_trailer = u ' '
@classmethod
def build_stream ( cls , name , dest_fd ) :
def __init__ ( self , broker , name , dest_fd ) :
"""
self . _broker = broker
Even though the descriptor ` dest_fd ` will hold the opposite end of the
self . _name = name
socket open , we must keep a separate dup ( ) of it ( i . e . wsock ) in case
self . _rsock , self . _wsock = socket . socketpair ( )
some code decides to overwrite ` dest_fd ` later , which would thus break
os . dup2 ( self . _wsock . fileno ( ) , dest_fd )
: meth : ` on_shutdown ` .
set_cloexec ( self . _wsock . fileno ( ) )
"""
rsock , wsock = socket . socketpair ( )
os . dup2 ( wsock . fileno ( ) , dest_fd )
stream = super ( IoLoggerProtocol , cls ) . build_stream ( name )
stream . name = name
stream . accept ( rsock , wsock )
return stream
def __init__ ( self , name ) :
self . _log = logging . getLogger ( name )
self . _log = logging . getLogger ( name )
# #453: prevent accidental log initialization in a child creating a
# #453: prevent accidental log initialization in a child creating a
# feedback loop.
# feedback loop.
self . _log . propagate = False
self . _log . propagate = False
self . _log . handlers = logging . getLogger ( ) . handlers [ : ]
self . _log . handlers = logging . getLogger ( ) . handlers [ : ]
self . receive_side = Side ( self , self . _rsock . fileno ( ) )
self . transmit_side = Side ( self , dest_fd , cloexec = False , blocking = True )
self . _broker . start_receive ( self )
def __repr__ ( self ) :
return ' <IoLogger %s > ' % ( self . _name , )
def on_shutdown ( self , broker ) :
def on_shutdown ( self , broker ) :
""" Shut down the write end of the logging socket. """
""" Shut down the write end of the logging socket. """
_v and LOG . debug ( ' %r : shutting down ' , self )
_v and LOG . debug ( ' %r : shutting down ' , self )
if not IS_WSL :
if not IS_WSL :
# #333: WSL generates invalid readiness indication on shutdown()
# #333: WSL generates invalid readiness indication on shutdown().
self . _wsock . shutdown ( socket . SHUT_WR )
# This modifies the *kernel object* inherited by children, causing
self . _wsock . close ( )
# EPIPE on subsequent writes to any dupped FD in any process. The
self . transmit_side . close ( )
# read side can then drain completely of prior buffered data.
self . stream . transmit_side . fp . shutdown ( socket . SHUT_WR )
def on_receive ( self , broker ) :
self . stream . transmit_side . close ( )
_vv and IOLOG . debug ( ' %r .on_receive() ' , self )
buf = self . receive_side . read ( )
if not buf :
return self . on_disconnect ( broker )
self . _trailer = iter_split (
def on_line_received ( self , line ) :
buf = self . _trailer + buf . decode ( ' latin1 ' ) ,
self . _log . info ( ' %s ' , line . decode ( ' utf-8 ' , ' replace ' ) )
delim = ' \n ' ,
func = lambda s : self . _log . info ( ' %s ' , s )
)
class Router ( object ) :
class Router ( object ) :
@ -3008,11 +2971,11 @@ class Router(object):
self , msg . src_id , in_stream , expect , msg )
self , msg . src_id , in_stream , expect , msg )
return
return
if in_stream . auth_id is not None :
if in_stream . protocol. auth_id is not None :
msg . auth_id = in_stream . auth_id
msg . auth_id = in_stream . protocol. auth_id
# Maintain a set of IDs the source ever communicated with.
# Maintain a set of IDs the source ever communicated with.
in_stream . egress_ids. add ( msg . dst_id )
in_stream . protocol. egress_ids. add ( msg . dst_id )
if msg . dst_id == mitogen . context_id :
if msg . dst_id == mitogen . context_id :
return self . _invoke ( msg , in_stream )
return self . _invoke ( msg , in_stream )
@ -3027,12 +2990,13 @@ class Router(object):
return
return
if in_stream and self . unidirectional and not \
if in_stream and self . unidirectional and not \
( in_stream . is_privileged or out_stream . is_privileged ) :
( in_stream . protocol . is_privileged or
out_stream . protocol . is_privileged ) :
self . _maybe_send_dead ( msg , self . unidirectional_msg ,
self . _maybe_send_dead ( msg , self . unidirectional_msg ,
in_stream . remote_id, out_stream . remote_id )
in_stream . protocol. remote_id, out_stream . protocol . remote_id )
return
return
out_stream . _send( msg )
out_stream . protocol. _send( msg )
def route ( self , msg ) :
def route ( self , msg ) :
"""
"""
@ -3074,11 +3038,11 @@ class Broker(object):
def __init__ ( self , poller_class = None , activate_compat = True ) :
def __init__ ( self , poller_class = None , activate_compat = True ) :
self . _alive = True
self . _alive = True
self . _exitted = False
self . _exitted = False
self . _waker = Waker ( self )
self . _waker = Waker . build_stream ( self )
#: Arrange for `func(\*args, \**kwargs)` to be executed on the broker
#: Arrange for `func(\*args, \**kwargs)` to be executed on the broker
#: thread, or immediately if the current thread is the broker thread.
#: thread, or immediately if the current thread is the broker thread.
#: Safe to call from any thread.
#: Safe to call from any thread.
self . defer = self . _waker . defer
self . defer = self . _waker . protocol. defer
self . poller = self . poller_class ( )
self . poller = self . poller_class ( )
self . poller . start_receive (
self . poller . start_receive (
self . _waker . receive_side . fd ,
self . _waker . receive_side . fd ,
@ -3112,7 +3076,7 @@ class Broker(object):
"""
"""
_vv and IOLOG . debug ( ' %r .start_receive( %r ) ' , self , stream )
_vv and IOLOG . debug ( ' %r .start_receive( %r ) ' , self , stream )
side = stream . receive_side
side = stream . receive_side
assert side and side . fd is not None
assert side and not side . closed
self . defer ( self . poller . start_receive ,
self . defer ( self . poller . start_receive ,
side . fd , ( side , stream . on_receive ) )
side . fd , ( side , stream . on_receive ) )
@ -3133,7 +3097,7 @@ class Broker(object):
"""
"""
_vv and IOLOG . debug ( ' %r ._start_transmit( %r ) ' , self , stream )
_vv and IOLOG . debug ( ' %r ._start_transmit( %r ) ' , self , stream )
side = stream . transmit_side
side = stream . transmit_side
assert side and side . fd is not None
assert side and not side . closed
self . poller . start_transmit ( side . fd , ( side , stream . on_transmit ) )
self . poller . start_transmit ( side . fd , ( side , stream . on_transmit ) )
def _stop_transmit ( self , stream ) :
def _stop_transmit ( self , stream ) :
@ -3246,7 +3210,7 @@ class Broker(object):
: meth : ` shutdown ` is called .
: meth : ` shutdown ` is called .
"""
"""
# For Python 2.4, no way to retrieve ident except on thread.
# For Python 2.4, no way to retrieve ident except on thread.
self . _waker . broker_ident = thread . get_ident ( )
self . _waker . protocol. broker_ident = thread . get_ident ( )
try :
try :
while self . _alive :
while self . _alive :
self . _loop_once ( )
self . _loop_once ( )
@ -3486,18 +3450,16 @@ class ExternalContext(object):
else :
else :
self . parent = Context ( self . router , parent_id , ' parent ' )
self . parent = Context ( self . router , parent_id , ' parent ' )
in_fd = self . config . get ( ' in_fd ' , 100 )
in_fp = os . fdopen ( os . dup ( self . config . get ( ' in_fd ' , 100 ) ) , ' rb ' , 0 )
out_fd = self . config . get ( ' out_fd ' , 1 )
out_fp = os . fdopen ( os . dup ( self . config . get ( ' out_fd ' , 1 ) ) , ' wb ' , 0 )
self . stream = Stream ( self . router , parent_id )
self . stream = MitogenProtocol . build_stream ( self . router , parent_id )
self . stream . accept ( in_fp , out_fp )
self . stream . name = ' parent '
self . stream . name = ' parent '
self . stream . accept ( in_fd , out_fd )
self . stream . receive_side . keep_alive = False
self . stream . receive_side . keep_alive = False
listen ( self . stream , ' disconnect ' , self . _on_parent_disconnect )
listen ( self . stream , ' disconnect ' , self . _on_parent_disconnect )
listen ( self . broker , ' exit ' , self . _on_broker_exit )
listen ( self . broker , ' exit ' , self . _on_broker_exit )
os . close ( in_fd )
def _reap_first_stage ( self ) :
def _reap_first_stage ( self ) :
try :
try :
os . wait ( ) # Reap first stage.
os . wait ( ) # Reap first stage.
@ -3584,7 +3546,7 @@ class ExternalContext(object):
try :
try :
if os . isatty ( 2 ) :
if os . isatty ( 2 ) :
self . reserve_tty_fp = os . fdopen ( os . dup ( 2 ) , ' r+b ' , 0 )
self . reserve_tty_fp = os . fdopen ( os . dup ( 2 ) , ' r+b ' , 0 )
set_cloexec ( self . reserve_tty_fp )
set_cloexec ( self . reserve_tty_fp . fileno ( ) )
except OSError :
except OSError :
pass
pass
@ -3600,8 +3562,12 @@ class ExternalContext(object):
sys . stdout . close ( )
sys . stdout . close ( )
self . _nullify_stdio ( )
self . _nullify_stdio ( )
self . stdout_log = IoLogger ( self . broker , ' stdout ' , 1 )
self . loggers = [ ]
self . stderr_log = IoLogger ( self . broker , ' stderr ' , 2 )
for name , fd in ( ( ' stdout ' , 1 ) , ( ' stderr ' , 2 ) ) :
log = IoLoggerProtocol . build_stream ( name , fd )
self . broker . start_receive ( log )
self . loggers . append ( log )
# Reopen with line buffering.
# Reopen with line buffering.
sys . stdout = os . fdopen ( 1 , ' w ' , 1 )
sys . stdout = os . fdopen ( 1 , ' w ' , 1 )
@ -3621,11 +3587,11 @@ class ExternalContext(object):
self . dispatcher = Dispatcher ( self )
self . dispatcher = Dispatcher ( self )
self . router . register ( self . parent , self . stream )
self . router . register ( self . parent , self . stream )
self . router . _setup_logging ( )
self . router . _setup_logging ( )
self . log_handler . uncork ( )
sys . executable = os . environ . pop ( ' ARGV0 ' , sys . executable )
sys . executable = os . environ . pop ( ' ARGV0 ' , sys . executable )
_v and LOG . debug ( ' Connected to context %s ; my ID is %r ' ,
_v and LOG . debug ( ' Parent is context %r ( %s ); my ID is %r ' ,
self . parent , mitogen . context_id )
self . parent . context_id , self . parent . name ,
mitogen . context_id )
_v and LOG . debug ( ' pid: %r ppid: %r uid: %r / %r , gid: %r / %r host: %r ' ,
_v and LOG . debug ( ' pid: %r ppid: %r uid: %r / %r , gid: %r / %r host: %r ' ,
os . getpid ( ) , os . getppid ( ) , os . geteuid ( ) ,
os . getpid ( ) , os . getppid ( ) , os . geteuid ( ) ,
os . getuid ( ) , os . getegid ( ) , os . getgid ( ) ,
os . getuid ( ) , os . getegid ( ) , os . getgid ( ) ,
@ -3633,6 +3599,9 @@ class ExternalContext(object):
_v and LOG . debug ( ' Recovered sys.executable: %r ' , sys . executable )
_v and LOG . debug ( ' Recovered sys.executable: %r ' , sys . executable )
self . broker . _py24_25_compat ( )
self . broker . _py24_25_compat ( )
if self . config . get ( ' send_ec2 ' , True ) :
self . stream . transmit_side . write ( b ( ' MITO002 \n ' ) )
self . log_handler . uncork ( )
self . dispatcher . run ( )
self . dispatcher . run ( )
_v and LOG . debug ( ' ExternalContext.main() normal exit ' )
_v and LOG . debug ( ' ExternalContext.main() normal exit ' )
except KeyboardInterrupt :
except KeyboardInterrupt :