@ -162,11 +162,27 @@ class Message(object):
reply_to = None
data = None
router = None
def __init__ ( self , * * kwargs ) :
self . src_id = mitogen . context_id
vars ( self ) . update ( kwargs )
_find_global = None
def _unpickle_context ( self , context_id , name ) :
return _unpickle_context ( self . router , context_id , name )
def _find_global ( self , module , func ) :
""" Return the class implementing `module_name.class_name` or raise
` StreamError ` if the module is not whitelisted . """
if module == __name__ :
if func == ' _unpickle_call_error ' :
return _unpickle_call_error
elif func == ' _unpickle_dead ' :
return _unpickle_dead
elif func == ' _unpickle_context ' :
return self . _unpickle_context
raise StreamError ( ' cannot unpickle %r / %r ' , module , func )
@classmethod
def pickled ( cls , obj , * * kwargs ) :
@ -182,7 +198,6 @@ class Message(object):
IOLOG . debug ( ' %r .unpickle() ' , self )
fp = cStringIO . StringIO ( self . data )
unpickler = cPickle . Unpickler ( fp )
if self . _find_global :
unpickler . find_global = self . _find_global
try :
return unpickler . load ( )
@ -521,7 +536,6 @@ class Stream(BasicStream):
"""
_input_buf = ' '
_output_buf = ' '
message_class = Message
def __init__ ( self , router , remote_id , key , * * kwargs ) :
self . _router = router
@ -556,7 +570,10 @@ class Stream(BasicStream):
if len ( self . _input_buf ) < self . HEADER_LEN :
return False
msg = self . message_class ( )
msg = Message ( )
# To support unpickling Contexts.
msg . router = self . _router
( msg . dst_id , msg . src_id ,
msg . handle , msg . reply_to , msg_len ) = struct . unpack (
self . HEADER_FMT ,
@ -628,6 +645,9 @@ class Context(object):
self . name = name
self . key = key or ( ' %016x ' % random . getrandbits ( 128 ) )
def __reduce__ ( self ) :
return _unpickle_context , ( self . context_id , self . name )
def on_disconnect ( self , broker ) :
LOG . debug ( ' Parent stream is gone, dying. ' )
fire ( self , ' disconnect ' )
@ -672,6 +692,13 @@ class Context(object):
return ' Context( %s , %r ) ' % ( self . context_id , self . name )
def _unpickle_context ( router , context_id , name ) :
assert isinstance ( router , Router )
assert isinstance ( context_id , ( int , long ) ) and context_id > 0
assert type ( name ) is str and len ( name ) < 100
return Context ( router , context_id , name )
class Waker ( BasicStream ) :
"""
: py : class : ` BasicStream ` subclass implementing the