From 9916adc0a3791adb4c1d17d3c2295502898005db Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 19 Jan 2019 07:37:28 +0000 Subject: [PATCH 1/6] issue #326: update Changelog. --- docs/changelog.rst | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/changelog.rst b/docs/changelog.rst index 6e208210..28bf2420 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -205,6 +205,11 @@ Fixes ``meta: reset_connection``, resources could become undesirably shared in subsequent children. +* `#426 `_: an oversight while + porting to Python 3 meant no automated 2->3 tests were running. A significant + number of 2->3 bugs were fixed, mostly in the form of Unicode/bytes + mismatches. + * `#362 `_, `#435 `_: the previous fix for slow Python 2.x subprocess creation on Red Hat caused newly spawned children to From dc92e529bcda8ae89aa51244e3cd0ebcc46e10f2 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 19 Jan 2019 08:15:08 +0000 Subject: [PATCH 2/6] service: unregister receiver at shutdown; closes #445. --- docs/changelog.rst | 6 ++++++ mitogen/service.py | 1 + tests/service_test.py | 13 +++++++++++++ 3 files changed, 20 insertions(+) diff --git a/docs/changelog.rst b/docs/changelog.rst index 28bf2420..f50833f5 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -298,6 +298,12 @@ Core Library * `#444 `_: messages regarding unforwardable extension module are no longer logged as errors. +* `#445 `_: service pools unregister + the :data:`mitogen.core.CALL_SERVICE` handle at shutdown, ensuring any + outstanding messages are either processed by the pool as it shuts down, or + have dead messages sent in reply to them, preventing peer contexts from + hanging due to a forgotten buffered message. + * `#453 `_: the loggers used in children for standard IO redirection have propagation disabled, preventing accidental reconfiguration of the :mod:`logging` package in a child from diff --git a/mitogen/service.py b/mitogen/service.py index 57f761b1..dcd8165a 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -479,6 +479,7 @@ class Pool(object): def stop(self, join=True): self.closed = True + self._receiver.close() self._select.close() if join: self.join() diff --git a/tests/service_test.py b/tests/service_test.py index 8e2cdac3..0ff16032 100644 --- a/tests/service_test.py +++ b/tests/service_test.py @@ -89,5 +89,18 @@ class PermissionTest(testlib.RouterMixin, testlib.TestCase): self.assertTrue(msg in exc.args[0]) +class CloseTest(testlib.RouterMixin, testlib.TestCase): + klass = mitogen.service.Pool + + def test_receiver_closed(self): + pool = self.klass(router=self.router, services=[]) + pool.stop() + self.assertEquals(None, pool._receiver.handle) + + e = self.assertRaises(mitogen.core.ChannelError, + lambda: self.router.myself().call_service(MyService, 'foobar')) + self.assertEquals(e.args[0], self.router.invalid_handle_msg) + + if __name__ == '__main__': unittest2.main() From de719fa249cb0532516f78b95d4c7cee334bcc3a Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 19 Jan 2019 08:24:24 +0000 Subject: [PATCH 3/6] core: throw error on duplicate add_handler(); closes #447. --- docs/changelog.rst | 4 ++++ mitogen/core.py | 11 ++++++++++- mitogen/parent.py | 2 ++ tests/router_test.py | 26 +++++++++++++++++++++++++- 4 files changed, 41 insertions(+), 2 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index f50833f5..1dd0a693 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -304,6 +304,10 @@ Core Library have dead messages sent in reply to them, preventing peer contexts from hanging due to a forgotten buffered message. +* `#447 `_: duplicate attempts to + invoke :meth:`mitogen.core.Router.add_handler` cause an error to be raised, + ensuring accidental re-registration of service pools are reported correctly. + * `#453 `_: the loggers used in children for standard IO redirection have propagation disabled, preventing accidental reconfiguration of the :mod:`logging` package in a child from diff --git a/mitogen/core.py b/mitogen/core.py index bfc322df..446bf1b8 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -2321,7 +2321,8 @@ class Router(object): self._handles_by_respondent[respondent].discard(handle) def add_handler(self, fn, handle=None, persist=True, - policy=None, respondent=None): + policy=None, respondent=None, + overwrite=False): """ Invoke `fn(msg)` on the :class:`Broker` thread for each Message sent to `handle` from this context. Unregister after one invocation if @@ -2367,12 +2368,19 @@ class Router(object): nonzero, a :class:`mitogen.core.CallError` is delivered to the sender indicating refusal occurred. + :param bool overwrite: + If :data:`True`, allow existing handles to be silently overwritten. + :return: `handle`, or if `handle` was :data:`None`, the newly allocated handle. + :raises Error: + Attemp to register handle that was already registered. """ handle = handle or next(self._last_handle) _vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) + if handle in self._handle_map and not overwrite: + raise Error(self.duplicate_handle_msg) self._handle_map[handle] = persist, fn, policy, respondent if respondent: @@ -2384,6 +2392,7 @@ class Router(object): return handle + duplicate_handle_msg = 'cannot register a handle that is already exists' refused_msg = 'refused by policy' invalid_handle_msg = 'invalid handle' too_large_msg = 'message too large (max %d bytes)' diff --git a/mitogen/parent.py b/mitogen/parent.py index 726ab1c5..eb17a1b7 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1646,12 +1646,14 @@ class RouteMonitor(object): handle=mitogen.core.ADD_ROUTE, persist=True, policy=is_immediate_child, + overwrite=True, ) self.router.add_handler( fn=self._on_del_route, handle=mitogen.core.DEL_ROUTE, persist=True, policy=is_immediate_child, + overwrite=True, ) #: Mapping of Stream instance to integer context IDs reachable via the #: stream; used to cleanup routes during disconnection. diff --git a/tests/router_test.py b/tests/router_test.py index ebbf20ff..839692df 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -180,12 +180,36 @@ class CrashTest(testlib.BrokerMixin, testlib.TestCase): class AddHandlerTest(testlib.TestCase): klass = mitogen.master.Router - def test_invoked_at_shutdown(self): + def test_dead_message_sent_at_shutdown(self): router = self.klass() queue = Queue.Queue() handle = router.add_handler(queue.put) router.broker.shutdown() self.assertTrue(queue.get(timeout=5).is_dead) + router.broker.join() + + def test_cannot_double_register(self): + router = self.klass() + try: + router.add_handler((lambda: None), handle=1234) + e = self.assertRaises(mitogen.core.Error, + lambda: router.add_handler((lambda: None), handle=1234)) + self.assertEquals(router.duplicate_handle_msg, e.args[0]) + router.del_handler(1234) + finally: + router.broker.shutdown() + router.broker.join() + + def test_can_reregister(self): + router = self.klass() + try: + router.add_handler((lambda: None), handle=1234) + router.del_handler(1234) + router.add_handler((lambda: None), handle=1234) + router.del_handler(1234) + finally: + router.broker.shutdown() + router.broker.join() class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase): From ab8d6afbae0adf50625470cec9367d73c110886d Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 19 Jan 2019 08:31:07 +0000 Subject: [PATCH 4/6] core: use ModuleNotFoundError in imporer if it is available; closes #448. --- docs/changelog.rst | 6 ++++++ mitogen/core.py | 17 +++++++++++------ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 1dd0a693..7538912b 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -308,6 +308,12 @@ Core Library invoke :meth:`mitogen.core.Router.add_handler` cause an error to be raised, ensuring accidental re-registration of service pools are reported correctly. +* `#448 `_: the import hook + implementation now raises :class:`ModuleNotFoundError` instead of + :class:`ImportError` in Python 3.6 and above, to cope with an uncoming + version of the :class:`subprocess` module requiring this new subclass in the + middle of a minor Python release series. + * `#453 `_: the loggers used in children for standard IO redirection have propagation disabled, preventing accidental reconfiguration of the :mod:`logging` package in a child from diff --git a/mitogen/core.py b/mitogen/core.py index 446bf1b8..0a087a6d 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -67,6 +67,11 @@ try: except ImportError: from io import BytesIO +try: + ModuleNotFoundError +except NameError: + ModuleNotFoundError = ImportError + # TODO: usage of 'import' after setting __name__, but before fixing up # sys.modules generates a warning. This happens when profiling = True. warnings.filterwarnings('ignore', @@ -992,7 +997,7 @@ class Importer(object): # built-in module. That means it exists on a special linked list deep # within the bowels of the interpreter. We must special case it. if fullname == '__main__': - raise ImportError() + raise ModuleNotFoundError() parent, _, modname = fullname.rpartition('.') if parent: @@ -1057,7 +1062,7 @@ class Importer(object): def _refuse_imports(self, fullname): if is_blacklisted_import(self, fullname): - raise ImportError(self.blacklisted_msg % (fullname,)) + raise ModuleNotFoundError(self.blacklisted_msg % (fullname,)) f = sys._getframe(2) requestee = f.f_globals['__name__'] @@ -1069,7 +1074,7 @@ class Importer(object): # breaks any app that is not expecting its __main__ to suddenly be # sucked over a network and injected into a remote process, like # py.test. - raise ImportError(self.pkg_resources_msg) + raise ModuleNotFoundError(self.pkg_resources_msg) if fullname == 'pbr': # It claims to use pkg_resources to read version information, which @@ -1129,7 +1134,7 @@ class Importer(object): ret = self._cache[fullname] if ret[2] is None: - raise ImportError(self.absent_msg % (fullname,)) + raise ModuleNotFoundError(self.absent_msg % (fullname,)) pkg_present = ret[1] mod = sys.modules.setdefault(fullname, imp.new_module(fullname)) @@ -1162,14 +1167,14 @@ class Importer(object): # reveals the module can't be loaded, and so load_module() # throws ImportError, on Python 3.x it is still possible for # the loader to be called to fetch metadata. - raise ImportError(self.absent_msg % (fullname,)) + raise ModuleNotFoundError(self.absent_msg % (fullname,)) return u'master:' + self._cache[fullname][2] def get_source(self, fullname): if fullname in self._cache: compressed = self._cache[fullname][3] if compressed is None: - raise ImportError(self.absent_msg % (fullname,)) + raise ModuleNotFoundError(self.absent_msg % (fullname,)) source = zlib.decompress(self._cache[fullname][3]) if PY3: From 1d97493fcdd86846073e45bf49c85b09ffef599e Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 19 Jan 2019 11:32:35 +0000 Subject: [PATCH 5/6] tests: fallout from #447. --- mitogen/core.py | 3 ++- mitogen/service.py | 6 ++++-- tests/responder_test.py | 3 ++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index 0a087a6d..dad472d7 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -791,7 +791,7 @@ class Receiver(object): raise_channelerror = True def __init__(self, router, handle=None, persist=True, - respondent=None, policy=None): + respondent=None, policy=None, overwrite=False): self.router = router #: The handle. self.handle = handle # Avoid __repr__ crash in add_handler() @@ -802,6 +802,7 @@ class Receiver(object): policy=policy, persist=persist, respondent=respondent, + overwrite=overwrite, ) def __repr__(self): diff --git a/mitogen/service.py b/mitogen/service.py index dcd8165a..5f6f4048 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -68,7 +68,8 @@ def get_or_create_pool(size=None, router=None): _pool_lock.acquire() try: if _pool_pid != os.getpid(): - _pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE) + _pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE, + overwrite=True) _pool_pid = os.getpid() return _pool finally: @@ -432,12 +433,13 @@ class Pool(object): """ activator_class = Activator - def __init__(self, router, services, size=1): + def __init__(self, router, services, size=1, overwrite=False): self.router = router self._activator = self.activator_class() self._receiver = mitogen.core.Receiver( router=router, handle=mitogen.core.CALL_SERVICE, + overwrite=overwrite, ) self._select = mitogen.select.Select(oneshot=False) diff --git a/tests/responder_test.py b/tests/responder_test.py index 888302c0..5a7a636e 100644 --- a/tests/responder_test.py +++ b/tests/responder_test.py @@ -17,7 +17,8 @@ class NeutralizeMainTest(testlib.RouterMixin, testlib.TestCase): klass = mitogen.master.ModuleResponder def call(self, *args, **kwargs): - return self.klass(self.router).neutralize_main(*args, **kwargs) + router = mock.Mock() + return self.klass(router).neutralize_main(*args, **kwargs) def test_missing_exec_guard(self): path = testlib.data_path('main_with_no_exec_guard.py') From f2f41809aee3e9aecc5924abc2a2415a9bfb57a5 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 19 Jan 2019 15:55:52 +0000 Subject: [PATCH 6/6] issue #459: initial get_stats() implementation --- docs/changelog.rst | 4 ++ mitogen/master.py | 94 +++++++++++++++++++++++++++++++++++++---- tests/responder_test.py | 29 +++++++++++++ 3 files changed, 119 insertions(+), 8 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 7538912b..715d6383 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -324,6 +324,10 @@ Core Library shut down, preventing new messages being enqueued that will never be sent, and subsequently producing a program hang. +* `#459 `_: the beginnings of a + :meth:`mitogen.master.Router.get_stats` call has been added. The initial + statistics cover the module loader only. + * `#462 `_: Mitogen could fail to open a PTY on broken Linux systems due to a bad interaction between the glibc :func:`grantpt` function and an incorrectly mounted ``/dev/pts`` filesystem. diff --git a/mitogen/master.py b/mitogen/master.py index 1fa1ef83..25e36c8d 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -43,6 +43,7 @@ import pkgutil import re import string import sys +import time import threading import types import zlib @@ -583,6 +584,18 @@ class ModuleResponder(object): self._cache = {} # fullname -> pickled self.blacklist = [] self.whitelist = [''] + + #: Number of GET_MODULE messages received. + self.get_module_count = 0 + #: Total time spent in uncached GET_MODULE. + self.get_module_secs = 0.0 + #: Number of successful LOAD_MODULE messages sent. + self.good_load_module_count = 0 + #: Total bytes in successful LOAD_MODULE payloads. + self.good_load_module_size = 0 + #: Number of negative LOAD_MODULE messages sent. + self.bad_load_module_count = 0 + router.add_handler( fn=self._on_get_module, handle=mitogen.core.GET_MODULE, @@ -671,19 +684,25 @@ class ModuleResponder(object): def _send_load_module(self, stream, fullname): if fullname not in stream.sent_modules: LOG.debug('_send_load_module(%r, %r)', stream, fullname) - self._router._async_route( - mitogen.core.Message.pickled( - self._build_tuple(fullname), - dst_id=stream.remote_id, - handle=mitogen.core.LOAD_MODULE, - ) + tup = self._build_tuple(fullname) + msg = mitogen.core.Message.pickled( + tup, + dst_id=stream.remote_id, + handle=mitogen.core.LOAD_MODULE, ) + self._router._async_route(msg) stream.sent_modules.add(fullname) + if tup[2] is not None: + self.good_load_module_count += 1 + self.good_load_module_size += len(msg.data) + else: + self.bad_load_module_count += 1 def _send_module_load_failed(self, stream, fullname): + self.bad_load_module_count += 1 stream.send( mitogen.core.Message.pickled( - (fullname, None, None, None, ()), + self._make_negative_response(fullname), dst_id=stream.remote_id, handle=mitogen.core.LOAD_MODULE, ) @@ -717,13 +736,18 @@ class ModuleResponder(object): return LOG.debug('%r._on_get_module(%r)', self, msg.data) + self.get_module_count += 1 stream = self._router.stream_by_id(msg.src_id) fullname = msg.data.decode() if fullname in stream.sent_modules: LOG.warning('_on_get_module(): dup request for %r from %r', fullname, stream) - self._send_module_and_related(stream, fullname) + t0 = time.time() + try: + self._send_module_and_related(stream, fullname) + finally: + self.get_module_secs += time.time() - t0 def _send_forward_module(self, stream, context, fullname): if stream.remote_id != context.context_id: @@ -841,6 +865,60 @@ class Router(mitogen.parent.Router): persist=True, ) + def _on_broker_exit(self): + super(Router, self)._on_broker_exit() + dct = self.get_stats() + dct['self'] = self + dct['get_module_ms'] = 1000 * dct['get_module_secs'] + dct['good_load_module_size_kb'] = dct['good_load_module_size'] / 1024.0 + dct['good_load_module_size_avg'] = ( + ( + dct['good_load_module_size'] / + (float(dct['good_load_module_count']) or 1.0) + ) / 1024.0 + ) + + LOG.debug( + '%(self)r: stats:\n' + ' GET_MODULE requests: %(get_module_count)d\n' + ' GET_MODULE runtime: %(get_module_ms)d ms\n' + ' LOAD_MODULE responses: %(good_load_module_count)d\n' + ' Negative LOAD_MODULE responses: ' + '%(bad_load_module_count)d\n' + ' LOAD_MODULE total bytes sent: ' + '%(good_load_module_size_kb).02f kb\n' + ' LOAD_MODULE avg bytes sent: ' + '%(good_load_module_size_avg).02f kb' + % dct + ) + + def get_stats(self): + """ + Return performance data for the module responder. + + :returns: + + Dict containing keys: + + * `get_module_count`: Integer count of + :data:`mitogen.core.GET_MODULE` messages received. + * `get_module_secs`: Floating point total seconds spent servicing + :data:`mitogen.core.GET_MODULE` requests. + * `good_load_module_count`: Integer count of successful + :data:`mitogen.core.LOAD_MODULE` messages sent. + * `good_load_module_size`: Integer total bytes sent in + :data:`mitogen.core.LOAD_MODULE` message payloads. + * `bad_load_module_count`: Integer count of negative + :data:`mitogen.core.LOAD_MODULE` messages sent. + """ + return { + 'get_module_count': self.responder.get_module_count, + 'get_module_secs': self.responder.get_module_secs, + 'good_load_module_count': self.responder.good_load_module_count, + 'good_load_module_size': self.responder.good_load_module_size, + 'bad_load_module_count': self.responder.bad_load_module_count, + } + def enable_debug(self): """ Cause this context and any descendant child contexts to write debug diff --git a/tests/responder_test.py b/tests/responder_test.py index 5a7a636e..dd790880 100644 --- a/tests/responder_test.py +++ b/tests/responder_test.py @@ -74,6 +74,9 @@ class GoodModulesTest(testlib.RouterMixin, testlib.TestCase): # package machinery damage. context = self.router.local() self.assertEquals(256, context.call(plain_old_module.pow, 2, 8)) + self.assertEquals(1, self.router.responder.get_module_count) + self.assertEquals(1, self.router.responder.good_load_module_count) + self.assertEquals(359, self.router.responder.good_load_module_size) def test_simple_pkg(self): # Ensure success of a simple package containing two submodules, one of @@ -81,6 +84,10 @@ class GoodModulesTest(testlib.RouterMixin, testlib.TestCase): context = self.router.local() self.assertEquals(3, context.call(simple_pkg.a.subtract_one_add_two, 2)) + self.assertEquals(2, self.router.responder.get_module_count) + self.assertEquals(3, self.router.responder.good_load_module_count) + self.assertEquals(0, self.router.responder.bad_load_module_count) + self.assertEquals(537, self.router.responder.good_load_module_size) def test_self_contained_program(self): # Ensure a program composed of a single script can be imported @@ -110,6 +117,11 @@ class BrokenModulesTest(testlib.TestCase): responder._on_get_module(msg) self.assertEquals(1, len(router._async_route.mock_calls)) + self.assertEquals(1, responder.get_module_count) + self.assertEquals(0, responder.good_load_module_count) + self.assertEquals(0, responder.good_load_module_size) + self.assertEquals(1, responder.bad_load_module_count) + call = router._async_route.mock_calls[0] msg, = call[1] self.assertEquals(mitogen.core.LOAD_MODULE, msg.handle) @@ -139,12 +151,29 @@ class BrokenModulesTest(testlib.TestCase): responder._on_get_module(msg) self.assertEquals(1, len(router._async_route.mock_calls)) + self.assertEquals(1, responder.get_module_count) + self.assertEquals(0, responder.good_load_module_count) + self.assertEquals(0, responder.good_load_module_size) + self.assertEquals(1, responder.bad_load_module_count) + call = router._async_route.mock_calls[0] msg, = call[1] self.assertEquals(mitogen.core.LOAD_MODULE, msg.handle) self.assertIsInstance(msg.unpickle(), tuple) +class ForwardTest(testlib.RouterMixin, testlib.TestCase): + def test_stats(self): + # Forwarding stats broken because forwarding is broken. See #469. + c1 = self.router.local() + c2 = self.router.fork(via=c1) + + self.assertEquals(256, c2.call(plain_old_module.pow, 2, 8)) + self.assertEquals(3, self.router.responder.get_module_count) + self.assertEquals(5, self.router.responder.good_load_module_count) + self.assertEquals(28148, self.router.responder.good_load_module_size) + + class BlacklistTest(testlib.TestCase): @unittest2.skip('implement me') def test_whitelist_no_blacklist(self):