@ -1001,7 +1001,7 @@ class DiagLogStream(mitogen.core.BasicStream):
self . buf = ' '
self . buf = ' '
def __repr__ ( self ) :
def __repr__ ( self ) :
return ' mitogen.parent.DiagLogStream(fd= %r , %r ) ' % (
return " mitogen.parent.DiagLogStream(fd= %r , ' %s ' ) " % (
self . receive_side . fd ,
self . receive_side . fd ,
self . stream . name ,
self . stream . name ,
)
)
@ -1017,11 +1017,11 @@ class DiagLogStream(mitogen.core.BasicStream):
return self . on_disconnect ( broker )
return self . on_disconnect ( broker )
self . buf + = buf . decode ( ' utf-8 ' , ' replace ' )
self . buf + = buf . decode ( ' utf-8 ' , ' replace ' )
while ' \n ' in self . buf :
while u ' \n ' in self . buf :
lines = self . buf . split ( ' \n ' )
lines = self . buf . split ( ' \n ' )
self . buf = lines [ - 1 ]
self . buf = lines [ - 1 ]
for line in lines [ : - 1 ] :
for line in lines [ : - 1 ] :
LOG . debug ( ' % r: %r ' , self , line . rstrip ( ) )
LOG . debug ( ' % s: %s ' , self . stream . name , line . rstrip ( ) )
class Stream ( mitogen . core . Stream ) :
class Stream ( mitogen . core . Stream ) :
@ -1288,10 +1288,18 @@ class Stream(mitogen.core.Stream):
if self . eof_error_hint :
if self . eof_error_hint :
e . args = ( ' %s \n \n %s ' % ( e . args [ 0 ] , self . eof_error_hint ) , )
e . args = ( ' %s \n \n %s ' % ( e . args [ 0 ] , self . eof_error_hint ) , )
def _get_name ( self ) :
"""
Called by : meth : ` connect ` after : attr : ` pid ` is known . Subclasses can
override it to specify a default stream name , or set
: attr : ` name_prefix ` to generate a default format .
"""
return u ' %s . %s ' % ( self . name_prefix , self . pid )
def connect ( self ) :
def connect ( self ) :
LOG . debug ( ' %r .connect() ' , self )
LOG . debug ( ' %r .connect() ' , self )
self . pid , fd , diag_fd = self . start_child ( )
self . pid , fd , diag_fd = self . start_child ( )
self . name = u ' %s . %s ' % ( self . name_prefix , self . pid )
self . name = self . _get_name ( )
self . receive_side = mitogen . core . Side ( self , fd )
self . receive_side = mitogen . core . Side ( self , fd )
self . transmit_side = mitogen . core . Side ( self , os . dup ( fd ) )
self . transmit_side = mitogen . core . Side ( self , os . dup ( fd ) )
if diag_fd is not None :
if diag_fd is not None :
@ -1299,8 +1307,8 @@ class Stream(mitogen.core.Stream):
else :
else :
self . diag_stream = None
self . diag_stream = None
LOG . debug ( ' %r .connect(): stdin=%r , stdout= %r , diag= %r ' ,
LOG . debug ( ' %r .connect(): pid:%r stdin: %r , stdout: %r , diag: %r ' ,
self , self . receive_side. fd , self . transmit_side . fd ,
self , self . pid, self . receive_side. fd , self . transmit_side . fd ,
self . diag_stream and self . diag_stream . receive_side . fd )
self . diag_stream and self . diag_stream . receive_side . fd )
try :
try :
@ -1680,6 +1688,10 @@ class RouteMonitor(object):
child is beging upgraded in preparation to become a parent of children of
child is beging upgraded in preparation to become a parent of children of
its own .
its own .
By virtue of only being active while responding to messages from a handler ,
RouteMonitor lives entirely on the broker thread , so its data requires no
locking .
: param Router router :
: param Router router :
Router to install handlers on .
Router to install handlers on .
: param Context parent :
: param Context parent :
@ -1689,6 +1701,9 @@ class RouteMonitor(object):
def __init__ ( self , router , parent = None ) :
def __init__ ( self , router , parent = None ) :
self . router = router
self . router = router
self . parent = parent
self . parent = parent
#: Mapping of Stream instance to integer context IDs reachable via the
#: stream; used to cleanup routes during disconnection.
self . _routes_by_stream = { }
self . router . add_handler (
self . router . add_handler (
fn = self . _on_add_route ,
fn = self . _on_add_route ,
handle = mitogen . core . ADD_ROUTE ,
handle = mitogen . core . ADD_ROUTE ,
@ -1703,9 +1718,6 @@ class RouteMonitor(object):
policy = is_immediate_child ,
policy = is_immediate_child ,
overwrite = True ,
overwrite = True ,
)
)
#: Mapping of Stream instance to integer context IDs reachable via the
#: stream; used to cleanup routes during disconnection.
self . _routes_by_stream = { }
def __repr__ ( self ) :
def __repr__ ( self ) :
return ' RouteMonitor() '
return ' RouteMonitor() '
@ -1767,8 +1779,11 @@ class RouteMonitor(object):
: param int target_id :
: param int target_id :
ID of the connecting or disconnecting context .
ID of the connecting or disconnecting context .
"""
"""
for stream in itervalues ( self . router . _stream_by_id ) :
for stream in self . router . get_streams ( ) :
if target_id in stream . egress_ids :
if target_id in stream . egress_ids and (
( self . parent is None ) or
( self . parent . context_id != stream . remote_id )
) :
self . _send_one ( stream , mitogen . core . DEL_ROUTE , target_id , None )
self . _send_one ( stream , mitogen . core . DEL_ROUTE , target_id , None )
def notice_stream ( self , stream ) :
def notice_stream ( self , stream ) :
@ -1797,9 +1812,15 @@ class RouteMonitor(object):
def _on_stream_disconnect ( self , stream ) :
def _on_stream_disconnect ( self , stream ) :
"""
"""
Respond to disconnection of a local stream by
Respond to disconnection of a local stream by propagating DEL_ROUTE for
any contexts we know were attached to it .
"""
"""
routes = self . _routes_by_stream . pop ( stream )
# During a stream crash it is possible for disconnect signal to fire
# twice, in which case ignore the second instance.
routes = self . _routes_by_stream . pop ( stream , None )
if routes is None :
return
LOG . debug ( ' %r : %r is gone; propagating DEL_ROUTE for %r ' ,
LOG . debug ( ' %r : %r is gone; propagating DEL_ROUTE for %r ' ,
self , stream , routes )
self , stream , routes )
for target_id in routes :
for target_id in routes :
@ -1910,6 +1931,16 @@ class Router(mitogen.core.Router):
stream . detached = True
stream . detached = True
msg . reply ( None )
msg . reply ( None )
def get_streams ( self ) :
"""
Return a snapshot of all streams in existence at time of call .
"""
self . _write_lock . acquire ( )
try :
return itervalues ( self . _stream_by_id )
finally :
self . _write_lock . release ( )
def add_route ( self , target_id , stream ) :
def add_route ( self , target_id , stream ) :
"""
"""
Arrange for messages whose ` dst_id ` is ` target_id ` to be forwarded on
Arrange for messages whose ` dst_id ` is ` target_id ` to be forwarded on
@ -1921,11 +1952,12 @@ class Router(mitogen.core.Router):
LOG . debug ( ' %r .add_route( %r , %r ) ' , self , target_id , stream )
LOG . debug ( ' %r .add_route( %r , %r ) ' , self , target_id , stream )
assert isinstance ( target_id , int )
assert isinstance ( target_id , int )
assert isinstance ( stream , Stream )
assert isinstance ( stream , Stream )
self . _write_lock . acquire ( )
try :
try :
self . _stream_by_id [ target_id ] = stream
self . _stream_by_id [ target_id ] = stream
except KeyError :
finally :
LOG . error ( ' %r : cant add route to %r via %r : no such stream ' ,
self . _write_lock . release ( )
self , target_id , stream )
def del_route ( self , target_id ) :
def del_route ( self , target_id ) :
LOG . debug ( ' %r .del_route( %r ) ' , self , target_id )
LOG . debug ( ' %r .del_route( %r ) ' , self , target_id )
@ -1934,7 +1966,11 @@ class Router(mitogen.core.Router):
# 'disconnect' event on the appropriate Context instance. In that case,
# 'disconnect' event on the appropriate Context instance. In that case,
# we won't a matching _stream_by_id entry for the disappearing route,
# we won't a matching _stream_by_id entry for the disappearing route,
# so don't raise an error for a missing key here.
# so don't raise an error for a missing key here.
self . _stream_by_id . pop ( target_id , None )
self . _write_lock . acquire ( )
try :
self . _stream_by_id . pop ( target_id , None )
finally :
self . _write_lock . release ( )
def get_module_blacklist ( self ) :
def get_module_blacklist ( self ) :
if mitogen . context_id == 0 :
if mitogen . context_id == 0 :
@ -1993,7 +2029,11 @@ class Router(mitogen.core.Router):
name = u ' %s . %s ' % ( via_context . name , resp [ ' name ' ] )
name = u ' %s . %s ' % ( via_context . name , resp [ ' name ' ] )
context = self . context_class ( self , resp [ ' id ' ] , name = name )
context = self . context_class ( self , resp [ ' id ' ] , name = name )
context . via = via_context
context . via = via_context
self . _context_by_id [ context . context_id ] = context
self . _write_lock . acquire ( )
try :
self . _context_by_id [ context . context_id ] = context
finally :
self . _write_lock . release ( )
return context
return context
def doas ( self , * * kwargs ) :
def doas ( self , * * kwargs ) :