@ -638,10 +638,10 @@ def _upgrade_broker(broker):
# This function is deadly! The act of calling start_receive() generates log
# messages which must be silenced as the upgrade progresses, otherwise the
# poller state will change as it is copied, resulting in write fds that are
# lost. (Due to LogHandler->Router->Stream-> Broker->Poller, where Stream
# only calls start_transmit() when transitioning from empty to non-empty
# buffer. If the start_transmit() is lost, writes from the child hang
# permanently).
# lost. (Due to LogHandler->Router->Stream-> Protocol-> Broker->Poller, where
# Stream only calls start_transmit() when transitioning from empty to
# non-empty buffer. If the start_transmit() is lost, writes from the child
# hang permanently).
root = logging . getLogger ( )
old_level = root . level
root . setLevel ( logging . CRITICAL )
@ -810,7 +810,8 @@ class CallSpec(object):
class PollPoller ( mitogen . core . Poller ) :
"""
Poller based on the POSIX poll ( 2 ) interface . Not available on some versions
of OS X , otherwise it is the preferred poller for small FD counts .
of OS X , otherwise it is the preferred poller for small FD counts , as there
is no setup / teardown / configuration system call overhead .
"""
SUPPORTED = hasattr ( select , ' poll ' )
_repr = ' PollPoller() '
@ -1106,8 +1107,8 @@ class BootstrapProtocol(RegexProtocol):
"""
Respond to stdout of a child during bootstrap . Wait for EC0_MARKER to be
written by the first stage to indicate it can receive the bootstrap , then
await EC1_MARKER to indicate success , and
: class : ` mitogen . core . MitogenProtocol ` can be enabled .
await EC1_MARKER to indicate success , and : class : ` MitogenProtocol ` can be
enabled .
"""
#: Sentinel value emitted by the first stage to indicate it is ready to
#: receive the compressed bootstrap. For :mod:`mitogen.ssh` this must have
@ -1161,6 +1162,26 @@ class LogProtocol(LineLoggingProtocolMixin, mitogen.core.DelimitedProtocol):
LOG . info ( u ' %s : %s ' , self . stream . name , line . decode ( ' utf-8 ' , ' replace ' ) )
class MitogenProtocol ( mitogen . core . MitogenProtocol ) :
"""
Extend core . MitogenProtocol to cause SHUTDOWN to be sent to the child
during graceful shutdown .
"""
def on_shutdown ( self , broker ) :
"""
Respond to the broker ' s request for the stream to shut down by sending
SHUTDOWN to the child .
"""
LOG . debug ( ' %r : requesting child shutdown ' , self )
self . _send (
mitogen . core . Message (
src_id = mitogen . context_id ,
dst_id = self . remote_id ,
handle = mitogen . core . SHUTDOWN ,
)
)
class Options ( object ) :
name = None
@ -1407,7 +1428,7 @@ class Connection(object):
if not self . exception :
self . _router . register ( self . context , self . stdio_stream )
self . stdio_stream . set_protocol (
mitogen. core . MitogenProtocol(
MitogenProtocol(
router = self . _router ,
remote_id = self . context . context_id ,
)
@ -1428,19 +1449,6 @@ class Connection(object):
stream . on_disconnect ( self . _router . broker )
self . _complete_connection ( )
def on_stream_shutdown ( self ) :
"""
Request the slave gracefully shut itself down .
"""
LOG . debug ( ' %r : requesting child shutdown ' , self )
self . stdio_stream . protocol . _send (
mitogen . core . Message (
src_id = mitogen . context_id ,
dst_id = self . stdio_stream . protocol . remote_id ,
handle = mitogen . core . SHUTDOWN ,
)
)
eof_error_msg = ' EOF on stream; last 100 lines received: \n '
def on_stdio_disconnect ( self ) :
@ -1511,7 +1519,6 @@ class Connection(object):
stream . name = self . options . name or self . _get_name ( )
stream . accept ( self . proc . stdout , self . proc . stdin )
mitogen . core . listen ( stream , ' shutdown ' , self . on_stream_shutdown )
mitogen . core . listen ( stream , ' disconnect ' , self . on_stdio_disconnect )
self . _router . broker . start_receive ( stream )
return stream