core: throw error on duplicate add_handler(); closes #447.

issue510
David Wilson 6 years ago
parent dc92e529bc
commit de719fa249

@ -304,6 +304,10 @@ Core Library
have dead messages sent in reply to them, preventing peer contexts from
hanging due to a forgotten buffered message.
* `#447 <https://github.com/dw/mitogen/issues/447>`_: duplicate attempts to
invoke :meth:`mitogen.core.Router.add_handler` cause an error to be raised,
ensuring accidental re-registration of service pools are reported correctly.
* `#453 <https://github.com/dw/mitogen/issues/453>`_: the loggers used in
children for standard IO redirection have propagation disabled, preventing
accidental reconfiguration of the :mod:`logging` package in a child from

@ -2321,7 +2321,8 @@ class Router(object):
self._handles_by_respondent[respondent].discard(handle)
def add_handler(self, fn, handle=None, persist=True,
policy=None, respondent=None):
policy=None, respondent=None,
overwrite=False):
"""
Invoke `fn(msg)` on the :class:`Broker` thread for each Message sent to
`handle` from this context. Unregister after one invocation if
@ -2367,12 +2368,19 @@ class Router(object):
nonzero, a :class:`mitogen.core.CallError` is delivered to the
sender indicating refusal occurred.
:param bool overwrite:
If :data:`True`, allow existing handles to be silently overwritten.
:return:
`handle`, or if `handle` was :data:`None`, the newly allocated
handle.
:raises Error:
Attemp to register handle that was already registered.
"""
handle = handle or next(self._last_handle)
_vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist)
if handle in self._handle_map and not overwrite:
raise Error(self.duplicate_handle_msg)
self._handle_map[handle] = persist, fn, policy, respondent
if respondent:
@ -2384,6 +2392,7 @@ class Router(object):
return handle
duplicate_handle_msg = 'cannot register a handle that is already exists'
refused_msg = 'refused by policy'
invalid_handle_msg = 'invalid handle'
too_large_msg = 'message too large (max %d bytes)'

@ -1646,12 +1646,14 @@ class RouteMonitor(object):
handle=mitogen.core.ADD_ROUTE,
persist=True,
policy=is_immediate_child,
overwrite=True,
)
self.router.add_handler(
fn=self._on_del_route,
handle=mitogen.core.DEL_ROUTE,
persist=True,
policy=is_immediate_child,
overwrite=True,
)
#: Mapping of Stream instance to integer context IDs reachable via the
#: stream; used to cleanup routes during disconnection.

@ -180,12 +180,36 @@ class CrashTest(testlib.BrokerMixin, testlib.TestCase):
class AddHandlerTest(testlib.TestCase):
klass = mitogen.master.Router
def test_invoked_at_shutdown(self):
def test_dead_message_sent_at_shutdown(self):
router = self.klass()
queue = Queue.Queue()
handle = router.add_handler(queue.put)
router.broker.shutdown()
self.assertTrue(queue.get(timeout=5).is_dead)
router.broker.join()
def test_cannot_double_register(self):
router = self.klass()
try:
router.add_handler((lambda: None), handle=1234)
e = self.assertRaises(mitogen.core.Error,
lambda: router.add_handler((lambda: None), handle=1234))
self.assertEquals(router.duplicate_handle_msg, e.args[0])
router.del_handler(1234)
finally:
router.broker.shutdown()
router.broker.join()
def test_can_reregister(self):
router = self.klass()
try:
router.add_handler((lambda: None), handle=1234)
router.del_handler(1234)
router.add_handler((lambda: None), handle=1234)
router.del_handler(1234)
finally:
router.broker.shutdown()
router.broker.join()
class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase):

Loading…
Cancel
Save