Merge remote-tracking branch 'origin/dmw'

* origin/dmw:
  issue #459: initial get_stats() implementation
  tests: fallout from #447.
  core: use ModuleNotFoundError in imporer if it is available; closes #448.
  core: throw error on duplicate add_handler(); closes #447.
  service: unregister receiver at shutdown; closes #445.
  issue #326: update Changelog.
issue510
David Wilson 7 years ago
commit 40e9706339

@ -205,6 +205,11 @@ Fixes
``meta: reset_connection``, resources could become undesirably shared in
subsequent children.
* `#426 <https://github.com/dw/mitogen/issues/426>`_: an oversight while
porting to Python 3 meant no automated 2->3 tests were running. A significant
number of 2->3 bugs were fixed, mostly in the form of Unicode/bytes
mismatches.
* `#362 <https://github.com/dw/mitogen/issues/362>`_,
`#435 <https://github.com/dw/mitogen/issues/435>`_: the previous fix for slow
Python 2.x subprocess creation on Red Hat caused newly spawned children to
@ -293,6 +298,22 @@ Core Library
* `#444 <https://github.com/dw/mitogen/issues/444>`_: messages regarding
unforwardable extension module are no longer logged as errors.
* `#445 <https://github.com/dw/mitogen/issues/445>`_: service pools unregister
the :data:`mitogen.core.CALL_SERVICE` handle at shutdown, ensuring any
outstanding messages are either processed by the pool as it shuts down, or
have dead messages sent in reply to them, preventing peer contexts from
hanging due to a forgotten buffered message.
* `#447 <https://github.com/dw/mitogen/issues/447>`_: duplicate attempts to
invoke :meth:`mitogen.core.Router.add_handler` cause an error to be raised,
ensuring accidental re-registration of service pools are reported correctly.
* `#448 <https://github.com/dw/mitogen/issues/448>`_: the import hook
implementation now raises :class:`ModuleNotFoundError` instead of
:class:`ImportError` in Python 3.6 and above, to cope with an uncoming
version of the :class:`subprocess` module requiring this new subclass in the
middle of a minor Python release series.
* `#453 <https://github.com/dw/mitogen/issues/453>`_: the loggers used in
children for standard IO redirection have propagation disabled, preventing
accidental reconfiguration of the :mod:`logging` package in a child from
@ -303,6 +324,10 @@ Core Library
shut down, preventing new messages being enqueued that will never be sent,
and subsequently producing a program hang.
* `#459 <https://github.com/dw/mitogen/issues/459>`_: the beginnings of a
:meth:`mitogen.master.Router.get_stats` call has been added. The initial
statistics cover the module loader only.
* `#462 <https://github.com/dw/mitogen/issues/462>`_: Mitogen could fail to
open a PTY on broken Linux systems due to a bad interaction between the glibc
:func:`grantpt` function and an incorrectly mounted ``/dev/pts`` filesystem.

@ -67,6 +67,11 @@ try:
except ImportError:
from io import BytesIO
try:
ModuleNotFoundError
except NameError:
ModuleNotFoundError = ImportError
# TODO: usage of 'import' after setting __name__, but before fixing up
# sys.modules generates a warning. This happens when profiling = True.
warnings.filterwarnings('ignore',
@ -786,7 +791,7 @@ class Receiver(object):
raise_channelerror = True
def __init__(self, router, handle=None, persist=True,
respondent=None, policy=None):
respondent=None, policy=None, overwrite=False):
self.router = router
#: The handle.
self.handle = handle # Avoid __repr__ crash in add_handler()
@ -797,6 +802,7 @@ class Receiver(object):
policy=policy,
persist=persist,
respondent=respondent,
overwrite=overwrite,
)
def __repr__(self):
@ -992,7 +998,7 @@ class Importer(object):
# built-in module. That means it exists on a special linked list deep
# within the bowels of the interpreter. We must special case it.
if fullname == '__main__':
raise ImportError()
raise ModuleNotFoundError()
parent, _, modname = fullname.rpartition('.')
if parent:
@ -1057,7 +1063,7 @@ class Importer(object):
def _refuse_imports(self, fullname):
if is_blacklisted_import(self, fullname):
raise ImportError(self.blacklisted_msg % (fullname,))
raise ModuleNotFoundError(self.blacklisted_msg % (fullname,))
f = sys._getframe(2)
requestee = f.f_globals['__name__']
@ -1069,7 +1075,7 @@ class Importer(object):
# breaks any app that is not expecting its __main__ to suddenly be
# sucked over a network and injected into a remote process, like
# py.test.
raise ImportError(self.pkg_resources_msg)
raise ModuleNotFoundError(self.pkg_resources_msg)
if fullname == 'pbr':
# It claims to use pkg_resources to read version information, which
@ -1129,7 +1135,7 @@ class Importer(object):
ret = self._cache[fullname]
if ret[2] is None:
raise ImportError(self.absent_msg % (fullname,))
raise ModuleNotFoundError(self.absent_msg % (fullname,))
pkg_present = ret[1]
mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
@ -1162,14 +1168,14 @@ class Importer(object):
# reveals the module can't be loaded, and so load_module()
# throws ImportError, on Python 3.x it is still possible for
# the loader to be called to fetch metadata.
raise ImportError(self.absent_msg % (fullname,))
raise ModuleNotFoundError(self.absent_msg % (fullname,))
return u'master:' + self._cache[fullname][2]
def get_source(self, fullname):
if fullname in self._cache:
compressed = self._cache[fullname][3]
if compressed is None:
raise ImportError(self.absent_msg % (fullname,))
raise ModuleNotFoundError(self.absent_msg % (fullname,))
source = zlib.decompress(self._cache[fullname][3])
if PY3:
@ -2321,7 +2327,8 @@ class Router(object):
self._handles_by_respondent[respondent].discard(handle)
def add_handler(self, fn, handle=None, persist=True,
policy=None, respondent=None):
policy=None, respondent=None,
overwrite=False):
"""
Invoke `fn(msg)` on the :class:`Broker` thread for each Message sent to
`handle` from this context. Unregister after one invocation if
@ -2367,12 +2374,19 @@ class Router(object):
nonzero, a :class:`mitogen.core.CallError` is delivered to the
sender indicating refusal occurred.
:param bool overwrite:
If :data:`True`, allow existing handles to be silently overwritten.
:return:
`handle`, or if `handle` was :data:`None`, the newly allocated
handle.
:raises Error:
Attemp to register handle that was already registered.
"""
handle = handle or next(self._last_handle)
_vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist)
if handle in self._handle_map and not overwrite:
raise Error(self.duplicate_handle_msg)
self._handle_map[handle] = persist, fn, policy, respondent
if respondent:
@ -2384,6 +2398,7 @@ class Router(object):
return handle
duplicate_handle_msg = 'cannot register a handle that is already exists'
refused_msg = 'refused by policy'
invalid_handle_msg = 'invalid handle'
too_large_msg = 'message too large (max %d bytes)'

@ -43,6 +43,7 @@ import pkgutil
import re
import string
import sys
import time
import threading
import types
import zlib
@ -583,6 +584,18 @@ class ModuleResponder(object):
self._cache = {} # fullname -> pickled
self.blacklist = []
self.whitelist = ['']
#: Number of GET_MODULE messages received.
self.get_module_count = 0
#: Total time spent in uncached GET_MODULE.
self.get_module_secs = 0.0
#: Number of successful LOAD_MODULE messages sent.
self.good_load_module_count = 0
#: Total bytes in successful LOAD_MODULE payloads.
self.good_load_module_size = 0
#: Number of negative LOAD_MODULE messages sent.
self.bad_load_module_count = 0
router.add_handler(
fn=self._on_get_module,
handle=mitogen.core.GET_MODULE,
@ -671,19 +684,25 @@ class ModuleResponder(object):
def _send_load_module(self, stream, fullname):
if fullname not in stream.sent_modules:
LOG.debug('_send_load_module(%r, %r)', stream, fullname)
self._router._async_route(
mitogen.core.Message.pickled(
self._build_tuple(fullname),
dst_id=stream.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
tup = self._build_tuple(fullname)
msg = mitogen.core.Message.pickled(
tup,
dst_id=stream.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
self._router._async_route(msg)
stream.sent_modules.add(fullname)
if tup[2] is not None:
self.good_load_module_count += 1
self.good_load_module_size += len(msg.data)
else:
self.bad_load_module_count += 1
def _send_module_load_failed(self, stream, fullname):
self.bad_load_module_count += 1
stream.send(
mitogen.core.Message.pickled(
(fullname, None, None, None, ()),
self._make_negative_response(fullname),
dst_id=stream.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
@ -717,13 +736,18 @@ class ModuleResponder(object):
return
LOG.debug('%r._on_get_module(%r)', self, msg.data)
self.get_module_count += 1
stream = self._router.stream_by_id(msg.src_id)
fullname = msg.data.decode()
if fullname in stream.sent_modules:
LOG.warning('_on_get_module(): dup request for %r from %r',
fullname, stream)
self._send_module_and_related(stream, fullname)
t0 = time.time()
try:
self._send_module_and_related(stream, fullname)
finally:
self.get_module_secs += time.time() - t0
def _send_forward_module(self, stream, context, fullname):
if stream.remote_id != context.context_id:
@ -841,6 +865,60 @@ class Router(mitogen.parent.Router):
persist=True,
)
def _on_broker_exit(self):
super(Router, self)._on_broker_exit()
dct = self.get_stats()
dct['self'] = self
dct['get_module_ms'] = 1000 * dct['get_module_secs']
dct['good_load_module_size_kb'] = dct['good_load_module_size'] / 1024.0
dct['good_load_module_size_avg'] = (
(
dct['good_load_module_size'] /
(float(dct['good_load_module_count']) or 1.0)
) / 1024.0
)
LOG.debug(
'%(self)r: stats:\n'
' GET_MODULE requests: %(get_module_count)d\n'
' GET_MODULE runtime: %(get_module_ms)d ms\n'
' LOAD_MODULE responses: %(good_load_module_count)d\n'
' Negative LOAD_MODULE responses: '
'%(bad_load_module_count)d\n'
' LOAD_MODULE total bytes sent: '
'%(good_load_module_size_kb).02f kb\n'
' LOAD_MODULE avg bytes sent: '
'%(good_load_module_size_avg).02f kb'
% dct
)
def get_stats(self):
"""
Return performance data for the module responder.
:returns:
Dict containing keys:
* `get_module_count`: Integer count of
:data:`mitogen.core.GET_MODULE` messages received.
* `get_module_secs`: Floating point total seconds spent servicing
:data:`mitogen.core.GET_MODULE` requests.
* `good_load_module_count`: Integer count of successful
:data:`mitogen.core.LOAD_MODULE` messages sent.
* `good_load_module_size`: Integer total bytes sent in
:data:`mitogen.core.LOAD_MODULE` message payloads.
* `bad_load_module_count`: Integer count of negative
:data:`mitogen.core.LOAD_MODULE` messages sent.
"""
return {
'get_module_count': self.responder.get_module_count,
'get_module_secs': self.responder.get_module_secs,
'good_load_module_count': self.responder.good_load_module_count,
'good_load_module_size': self.responder.good_load_module_size,
'bad_load_module_count': self.responder.bad_load_module_count,
}
def enable_debug(self):
"""
Cause this context and any descendant child contexts to write debug

@ -1646,12 +1646,14 @@ class RouteMonitor(object):
handle=mitogen.core.ADD_ROUTE,
persist=True,
policy=is_immediate_child,
overwrite=True,
)
self.router.add_handler(
fn=self._on_del_route,
handle=mitogen.core.DEL_ROUTE,
persist=True,
policy=is_immediate_child,
overwrite=True,
)
#: Mapping of Stream instance to integer context IDs reachable via the
#: stream; used to cleanup routes during disconnection.

@ -68,7 +68,8 @@ def get_or_create_pool(size=None, router=None):
_pool_lock.acquire()
try:
if _pool_pid != os.getpid():
_pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE)
_pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE,
overwrite=True)
_pool_pid = os.getpid()
return _pool
finally:
@ -432,12 +433,13 @@ class Pool(object):
"""
activator_class = Activator
def __init__(self, router, services, size=1):
def __init__(self, router, services, size=1, overwrite=False):
self.router = router
self._activator = self.activator_class()
self._receiver = mitogen.core.Receiver(
router=router,
handle=mitogen.core.CALL_SERVICE,
overwrite=overwrite,
)
self._select = mitogen.select.Select(oneshot=False)
@ -479,6 +481,7 @@ class Pool(object):
def stop(self, join=True):
self.closed = True
self._receiver.close()
self._select.close()
if join:
self.join()

@ -17,7 +17,8 @@ class NeutralizeMainTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.ModuleResponder
def call(self, *args, **kwargs):
return self.klass(self.router).neutralize_main(*args, **kwargs)
router = mock.Mock()
return self.klass(router).neutralize_main(*args, **kwargs)
def test_missing_exec_guard(self):
path = testlib.data_path('main_with_no_exec_guard.py')
@ -73,6 +74,9 @@ class GoodModulesTest(testlib.RouterMixin, testlib.TestCase):
# package machinery damage.
context = self.router.local()
self.assertEquals(256, context.call(plain_old_module.pow, 2, 8))
self.assertEquals(1, self.router.responder.get_module_count)
self.assertEquals(1, self.router.responder.good_load_module_count)
self.assertEquals(359, self.router.responder.good_load_module_size)
def test_simple_pkg(self):
# Ensure success of a simple package containing two submodules, one of
@ -80,6 +84,10 @@ class GoodModulesTest(testlib.RouterMixin, testlib.TestCase):
context = self.router.local()
self.assertEquals(3,
context.call(simple_pkg.a.subtract_one_add_two, 2))
self.assertEquals(2, self.router.responder.get_module_count)
self.assertEquals(3, self.router.responder.good_load_module_count)
self.assertEquals(0, self.router.responder.bad_load_module_count)
self.assertEquals(537, self.router.responder.good_load_module_size)
def test_self_contained_program(self):
# Ensure a program composed of a single script can be imported
@ -109,6 +117,11 @@ class BrokenModulesTest(testlib.TestCase):
responder._on_get_module(msg)
self.assertEquals(1, len(router._async_route.mock_calls))
self.assertEquals(1, responder.get_module_count)
self.assertEquals(0, responder.good_load_module_count)
self.assertEquals(0, responder.good_load_module_size)
self.assertEquals(1, responder.bad_load_module_count)
call = router._async_route.mock_calls[0]
msg, = call[1]
self.assertEquals(mitogen.core.LOAD_MODULE, msg.handle)
@ -138,12 +151,29 @@ class BrokenModulesTest(testlib.TestCase):
responder._on_get_module(msg)
self.assertEquals(1, len(router._async_route.mock_calls))
self.assertEquals(1, responder.get_module_count)
self.assertEquals(0, responder.good_load_module_count)
self.assertEquals(0, responder.good_load_module_size)
self.assertEquals(1, responder.bad_load_module_count)
call = router._async_route.mock_calls[0]
msg, = call[1]
self.assertEquals(mitogen.core.LOAD_MODULE, msg.handle)
self.assertIsInstance(msg.unpickle(), tuple)
class ForwardTest(testlib.RouterMixin, testlib.TestCase):
def test_stats(self):
# Forwarding stats broken because forwarding is broken. See #469.
c1 = self.router.local()
c2 = self.router.fork(via=c1)
self.assertEquals(256, c2.call(plain_old_module.pow, 2, 8))
self.assertEquals(3, self.router.responder.get_module_count)
self.assertEquals(5, self.router.responder.good_load_module_count)
self.assertEquals(28148, self.router.responder.good_load_module_size)
class BlacklistTest(testlib.TestCase):
@unittest2.skip('implement me')
def test_whitelist_no_blacklist(self):

@ -180,12 +180,36 @@ class CrashTest(testlib.BrokerMixin, testlib.TestCase):
class AddHandlerTest(testlib.TestCase):
klass = mitogen.master.Router
def test_invoked_at_shutdown(self):
def test_dead_message_sent_at_shutdown(self):
router = self.klass()
queue = Queue.Queue()
handle = router.add_handler(queue.put)
router.broker.shutdown()
self.assertTrue(queue.get(timeout=5).is_dead)
router.broker.join()
def test_cannot_double_register(self):
router = self.klass()
try:
router.add_handler((lambda: None), handle=1234)
e = self.assertRaises(mitogen.core.Error,
lambda: router.add_handler((lambda: None), handle=1234))
self.assertEquals(router.duplicate_handle_msg, e.args[0])
router.del_handler(1234)
finally:
router.broker.shutdown()
router.broker.join()
def test_can_reregister(self):
router = self.klass()
try:
router.add_handler((lambda: None), handle=1234)
router.del_handler(1234)
router.add_handler((lambda: None), handle=1234)
router.del_handler(1234)
finally:
router.broker.shutdown()
router.broker.join()
class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase):

@ -89,5 +89,18 @@ class PermissionTest(testlib.RouterMixin, testlib.TestCase):
self.assertTrue(msg in exc.args[0])
class CloseTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.service.Pool
def test_receiver_closed(self):
pool = self.klass(router=self.router, services=[])
pool.stop()
self.assertEquals(None, pool._receiver.handle)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: self.router.myself().call_service(MyService, 'foobar'))
self.assertEquals(e.args[0], self.router.invalid_handle_msg)
if __name__ == '__main__':
unittest2.main()

Loading…
Cancel
Save