diff --git a/docs/changelog.rst b/docs/changelog.rst index f50833f5..1dd0a693 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -304,6 +304,10 @@ Core Library have dead messages sent in reply to them, preventing peer contexts from hanging due to a forgotten buffered message. +* `#447 `_: duplicate attempts to + invoke :meth:`mitogen.core.Router.add_handler` cause an error to be raised, + ensuring accidental re-registration of service pools are reported correctly. + * `#453 `_: the loggers used in children for standard IO redirection have propagation disabled, preventing accidental reconfiguration of the :mod:`logging` package in a child from diff --git a/mitogen/core.py b/mitogen/core.py index bfc322df..446bf1b8 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -2321,7 +2321,8 @@ class Router(object): self._handles_by_respondent[respondent].discard(handle) def add_handler(self, fn, handle=None, persist=True, - policy=None, respondent=None): + policy=None, respondent=None, + overwrite=False): """ Invoke `fn(msg)` on the :class:`Broker` thread for each Message sent to `handle` from this context. Unregister after one invocation if @@ -2367,12 +2368,19 @@ class Router(object): nonzero, a :class:`mitogen.core.CallError` is delivered to the sender indicating refusal occurred. + :param bool overwrite: + If :data:`True`, allow existing handles to be silently overwritten. + :return: `handle`, or if `handle` was :data:`None`, the newly allocated handle. + :raises Error: + Attemp to register handle that was already registered. """ handle = handle or next(self._last_handle) _vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) + if handle in self._handle_map and not overwrite: + raise Error(self.duplicate_handle_msg) self._handle_map[handle] = persist, fn, policy, respondent if respondent: @@ -2384,6 +2392,7 @@ class Router(object): return handle + duplicate_handle_msg = 'cannot register a handle that is already exists' refused_msg = 'refused by policy' invalid_handle_msg = 'invalid handle' too_large_msg = 'message too large (max %d bytes)' diff --git a/mitogen/parent.py b/mitogen/parent.py index 726ab1c5..eb17a1b7 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1646,12 +1646,14 @@ class RouteMonitor(object): handle=mitogen.core.ADD_ROUTE, persist=True, policy=is_immediate_child, + overwrite=True, ) self.router.add_handler( fn=self._on_del_route, handle=mitogen.core.DEL_ROUTE, persist=True, policy=is_immediate_child, + overwrite=True, ) #: Mapping of Stream instance to integer context IDs reachable via the #: stream; used to cleanup routes during disconnection. diff --git a/tests/router_test.py b/tests/router_test.py index ebbf20ff..839692df 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -180,12 +180,36 @@ class CrashTest(testlib.BrokerMixin, testlib.TestCase): class AddHandlerTest(testlib.TestCase): klass = mitogen.master.Router - def test_invoked_at_shutdown(self): + def test_dead_message_sent_at_shutdown(self): router = self.klass() queue = Queue.Queue() handle = router.add_handler(queue.put) router.broker.shutdown() self.assertTrue(queue.get(timeout=5).is_dead) + router.broker.join() + + def test_cannot_double_register(self): + router = self.klass() + try: + router.add_handler((lambda: None), handle=1234) + e = self.assertRaises(mitogen.core.Error, + lambda: router.add_handler((lambda: None), handle=1234)) + self.assertEquals(router.duplicate_handle_msg, e.args[0]) + router.del_handler(1234) + finally: + router.broker.shutdown() + router.broker.join() + + def test_can_reregister(self): + router = self.klass() + try: + router.add_handler((lambda: None), handle=1234) + router.del_handler(1234) + router.add_handler((lambda: None), handle=1234) + router.del_handler(1234) + finally: + router.broker.shutdown() + router.broker.join() class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase):