issue #76: record egress context IDs

Used in a subsequent change to broadcast DEL_ROUTE to potentially
interested children.
issue260
David Wilson 6 years ago
parent d7d40f1123
commit babe3eec31

@ -1085,6 +1085,9 @@ class Stream(BasicStream):
self._output_buf = collections.deque()
self._input_buf_len = 0
self._output_buf_len = 0
#: Routing records the dst_id of every message arriving from this
#: stream. Any arriving DEL_ROUTE is rebroadcast for any such ID.
self.egress_ids = set()
def construct(self):
pass
@ -1838,6 +1841,9 @@ class Router(object):
if in_stream.auth_id is not None:
msg.auth_id = in_stream.auth_id
# Maintain a set of IDs the source ever communicated with.
in_stream.egress_ids.add(msg.dst_id)
if msg.dst_id == mitogen.context_id:
return self._invoke(msg, in_stream)

@ -314,5 +314,16 @@ class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase):
self.assertTrue('policy refused message: ' in logs.stop())
class EgressIdsTest(testlib.RouterMixin, testlib.TestCase):
def test_egress_ids_populated(self):
# Ensure Stream.egress_ids is populated on message reception.
c1 = self.router.fork()
stream = self.router.stream_by_id(c1.context_id)
self.assertEquals(set(), stream.egress_ids)
c1.call(time.sleep, 0)
self.assertEquals(set([mitogen.context_id]), stream.egress_ids)
if __name__ == '__main__':
unittest2.main()

Loading…
Cancel
Save