From babe3eec319bb72cbb36cf0a5f8a2ce6aa320509 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 23 Oct 2018 23:07:49 +0100 Subject: [PATCH] issue #76: record egress context IDs Used in a subsequent change to broadcast DEL_ROUTE to potentially interested children. --- mitogen/core.py | 6 ++++++ tests/router_test.py | 11 +++++++++++ 2 files changed, 17 insertions(+) diff --git a/mitogen/core.py b/mitogen/core.py index d6134694..9c4063fa 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1085,6 +1085,9 @@ class Stream(BasicStream): self._output_buf = collections.deque() self._input_buf_len = 0 self._output_buf_len = 0 + #: Routing records the dst_id of every message arriving from this + #: stream. Any arriving DEL_ROUTE is rebroadcast for any such ID. + self.egress_ids = set() def construct(self): pass @@ -1838,6 +1841,9 @@ class Router(object): if in_stream.auth_id is not None: msg.auth_id = in_stream.auth_id + # Maintain a set of IDs the source ever communicated with. + in_stream.egress_ids.add(msg.dst_id) + if msg.dst_id == mitogen.context_id: return self._invoke(msg, in_stream) diff --git a/tests/router_test.py b/tests/router_test.py index 68474e00..d0e4f539 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -314,5 +314,16 @@ class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase): self.assertTrue('policy refused message: ' in logs.stop()) +class EgressIdsTest(testlib.RouterMixin, testlib.TestCase): + def test_egress_ids_populated(self): + # Ensure Stream.egress_ids is populated on message reception. + c1 = self.router.fork() + stream = self.router.stream_by_id(c1.context_id) + self.assertEquals(set(), stream.egress_ids) + + c1.call(time.sleep, 0) + self.assertEquals(set([mitogen.context_id]), stream.egress_ids) + + if __name__ == '__main__': unittest2.main()