|
|
@ -1937,18 +1937,28 @@ class MitogenProtocol(Protocol):
|
|
|
|
:class:`Protocol` implementing mitogen's :ref:`stream protocol
|
|
|
|
:class:`Protocol` implementing mitogen's :ref:`stream protocol
|
|
|
|
<stream-protocol>`.
|
|
|
|
<stream-protocol>`.
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
#: If not :data:`None`, :class:`Router` stamps this into
|
|
|
|
|
|
|
|
#: :attr:`Message.auth_id` of every message received on this stream.
|
|
|
|
|
|
|
|
auth_id = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#: If not :data:`False`, indicates the stream has :attr:`auth_id` set and
|
|
|
|
#: If not :data:`False`, indicates the stream has :attr:`auth_id` set and
|
|
|
|
#: its value is the same as :data:`mitogen.context_id` or appears in
|
|
|
|
#: its value is the same as :data:`mitogen.context_id` or appears in
|
|
|
|
#: :data:`mitogen.parent_ids`.
|
|
|
|
#: :data:`mitogen.parent_ids`.
|
|
|
|
is_privileged = False
|
|
|
|
is_privileged = False
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, router, remote_id):
|
|
|
|
def __init__(self, router, remote_id, auth_id=None,
|
|
|
|
|
|
|
|
local_id=None, parent_ids=None):
|
|
|
|
self._router = router
|
|
|
|
self._router = router
|
|
|
|
self.remote_id = remote_id
|
|
|
|
self.remote_id = remote_id
|
|
|
|
|
|
|
|
#: If not :data:`None`, :class:`Router` stamps this into
|
|
|
|
|
|
|
|
#: :attr:`Message.auth_id` of every message received on this stream.
|
|
|
|
|
|
|
|
self.auth_id = auth_id
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if parent_ids is None:
|
|
|
|
|
|
|
|
parent_ids = mitogen.parent_ids
|
|
|
|
|
|
|
|
if local_id is None:
|
|
|
|
|
|
|
|
local_id = mitogen.context_id
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.is_privileged = (
|
|
|
|
|
|
|
|
(remote_id in parent_ids) or
|
|
|
|
|
|
|
|
auth_id in ([local_id] + parent_ids)
|
|
|
|
|
|
|
|
)
|
|
|
|
self.sent_modules = set(['mitogen', 'mitogen.core'])
|
|
|
|
self.sent_modules = set(['mitogen', 'mitogen.core'])
|
|
|
|
self._input_buf = collections.deque()
|
|
|
|
self._input_buf = collections.deque()
|
|
|
|
self._input_buf_len = 0
|
|
|
|
self._input_buf_len = 0
|
|
|
@ -2800,8 +2810,8 @@ class Router(object):
|
|
|
|
broker_exit_msg = 'Broker has exitted'
|
|
|
|
broker_exit_msg = 'Broker has exitted'
|
|
|
|
no_route_msg = 'no route to %r, my ID is %r'
|
|
|
|
no_route_msg = 'no route to %r, my ID is %r'
|
|
|
|
unidirectional_msg = (
|
|
|
|
unidirectional_msg = (
|
|
|
|
'routing mode prevents forward of message from context %d via '
|
|
|
|
'routing mode prevents forward of message from context %d to '
|
|
|
|
'context %d'
|
|
|
|
'context %d via context %d'
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, broker):
|
|
|
|
def __init__(self, broker):
|
|
|
@ -3152,7 +3162,9 @@ class Router(object):
|
|
|
|
(in_stream.protocol.is_privileged or
|
|
|
|
(in_stream.protocol.is_privileged or
|
|
|
|
out_stream.protocol.is_privileged):
|
|
|
|
out_stream.protocol.is_privileged):
|
|
|
|
self._maybe_send_dead(msg, self.unidirectional_msg,
|
|
|
|
self._maybe_send_dead(msg, self.unidirectional_msg,
|
|
|
|
in_stream.protocol.remote_id, out_stream.protocol.remote_id)
|
|
|
|
in_stream.protocol.remote_id,
|
|
|
|
|
|
|
|
out_stream.protocol.remote_id,
|
|
|
|
|
|
|
|
mitogen.context_id)
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
out_stream.protocol._send(msg)
|
|
|
|
out_stream.protocol._send(msg)
|
|
|
@ -3641,7 +3653,12 @@ class ExternalContext(object):
|
|
|
|
os.close(in_fd)
|
|
|
|
os.close(in_fd)
|
|
|
|
|
|
|
|
|
|
|
|
out_fp = os.fdopen(os.dup(self.config.get('out_fd', 1)), 'wb', 0)
|
|
|
|
out_fp = os.fdopen(os.dup(self.config.get('out_fd', 1)), 'wb', 0)
|
|
|
|
self.stream = MitogenProtocol.build_stream(self.router, parent_id)
|
|
|
|
self.stream = MitogenProtocol.build_stream(
|
|
|
|
|
|
|
|
self.router,
|
|
|
|
|
|
|
|
parent_id,
|
|
|
|
|
|
|
|
local_id=self.config['context_id'],
|
|
|
|
|
|
|
|
parent_ids=self.config['parent_ids']
|
|
|
|
|
|
|
|
)
|
|
|
|
self.stream.accept(in_fp, out_fp)
|
|
|
|
self.stream.accept(in_fp, out_fp)
|
|
|
|
self.stream.name = 'parent'
|
|
|
|
self.stream.name = 'parent'
|
|
|
|
self.stream.receive_side.keep_alive = False
|
|
|
|
self.stream.receive_side.keep_alive = False
|
|
|
|