Merge remote-tracking branch 'origin/dmw'

* origin/dmw:
  core: allow Router.shutdown() to succeed after exit.
  issue #446: update Receiver.__iter__ to match
  tests: fix responder_test after removing unused imports
  parent: remove unused imports
  issue #456: loosen Waker.defer() shutdown test a little
  tests: stray socket file left during unix_test.
  tests: quieten a bunch of spam printed during run
  tests: fix /etc/environment test on vanilla
  issue #459: one line stats output during shutdown
  tests: good_load_module_size check can't be exact
issue510
David Wilson 6 years ago
commit 1f368d3bc0

@ -894,8 +894,9 @@ class Receiver(object):
until :class:`ChannelError` is raised.
"""
while True:
msg = self.get(throw_dead=False)
if msg.is_dead:
try:
msg = self.get()
except ChannelError:
return
yield msg
@ -2122,8 +2123,8 @@ class Waker(BasicStream):
broker_shutdown_msg = (
"An attempt was made to enqueue a message with a Broker that has "
"already begun shutting down. It is likely your program called "
"Broker.shutdown() too early."
"already exitted. It is likely your program called Broker.shutdown() "
"too early."
)
def defer(self, func, *args, **kwargs):
@ -2138,7 +2139,7 @@ class Waker(BasicStream):
if threading.currentThread().ident == self.broker_ident:
_vv and IOLOG.debug('%r.defer() [immediate]', self)
return func(*args, **kwargs)
if not self._broker._alive:
if self._broker._exitted:
raise Error(self.broker_shutdown_msg)
_vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd)
@ -2564,6 +2565,7 @@ class Broker(object):
def __init__(self, poller_class=None):
self._alive = True
self._exitted = False
self._waker = Waker(self)
#: Arrange for `func(\*args, \**kwargs)` to be executed on the broker
#: thread, or immediately if the current thread is the broker thread.
@ -2724,6 +2726,7 @@ class Broker(object):
except Exception:
LOG.exception('_broker_main() crashed')
self._exitted = True
self._broker_exit()
fire(self, 'exit')
@ -2735,7 +2738,8 @@ class Broker(object):
_v and LOG.debug('%r.shutdown()', self)
def _shutdown():
self._alive = False
self.defer(_shutdown)
if self._alive and not self._exitted:
self.defer(_shutdown)
def join(self):
"""

@ -714,8 +714,8 @@ class ModuleResponder(object):
if tup[2] and is_stdlib_path(tup[2]):
# Prevent loading of 2.x<->3.x stdlib modules! This costs one
# RTT per hit, so a client-side solution is also required.
LOG.warning('%r: refusing to serve stdlib module %r',
self, fullname)
LOG.debug('%r: refusing to serve stdlib module %r',
self, fullname)
self._send_module_load_failed(stream, fullname)
return
@ -879,16 +879,13 @@ class Router(mitogen.parent.Router):
)
LOG.debug(
'%(self)r: stats:\n'
' GET_MODULE requests: %(get_module_count)d\n'
' GET_MODULE runtime: %(get_module_ms)d ms\n'
' LOAD_MODULE responses: %(good_load_module_count)d\n'
' Negative LOAD_MODULE responses: '
'%(bad_load_module_count)d\n'
' LOAD_MODULE total bytes sent: '
'%(good_load_module_size_kb).02f kb\n'
' LOAD_MODULE avg bytes sent: '
'%(good_load_module_size_avg).02f kb'
'%(self)r: stats: '
'%(get_module_count)d module requests in '
'%(get_module_ms)d ms, '
'%(good_load_module_count)d sent, '
'%(bad_load_module_count)d negative responses. '
'Sent %(good_load_module_size_kb).01f kb total, '
'%(good_load_module_size_avg).01f kb avg.'
% dct
)

@ -53,16 +53,6 @@ import zlib
# Absolute imports for <2.5.
select = __import__('select')
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
try:
from functools import lru_cache
except ImportError:
from mitogen.compat.functools import lru_cache
import mitogen.core
from mitogen.core import b
from mitogen.core import LOG

@ -542,7 +542,7 @@ class Pool(object):
msg = self._select.get()
except (mitogen.core.ChannelError, mitogen.core.LatchError):
e = sys.exc_info()[1]
LOG.info('%r: channel or latch closed, exitting: %s', self, e)
LOG.debug('%r: channel or latch closed, exitting: %s', self, e)
return
func = self._func_by_recv[msg.receiver]

@ -17,6 +17,9 @@
MAGIC_ETC_ENV=555
become: true
- include_tasks: _reset_conn.yml
when: not is_mitogen
- shell: echo $MAGIC_ETC_ENV
register: echo
@ -28,6 +31,9 @@
state: absent
become: true
- include_tasks: _reset_conn.yml
when: not is_mitogen
- shell: echo $MAGIC_ETC_ENV
register: echo

@ -29,9 +29,9 @@ import os.path
try:
import six as _system_six
print('six_brokenpkg: using system six:', _system_six)
#print('six_brokenpkg: using system six:', _system_six)
except ImportError:
print('six_brokenpkg: no system six available')
#print('six_brokenpkg: no system six available')
_system_six = None
if _system_six:

@ -37,6 +37,27 @@ class IterationTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(list(range(5)), list(m.unpickle() for m in recv))
self.assertEquals(10, ret.get().unpickle())
def iter_and_put(self, recv, latch):
try:
for msg in recv:
latch.put(msg)
except Exception:
latch.put(sys.exc_info()[1])
def test_close_stops_iteration(self):
recv = mitogen.core.Receiver(self.router)
latch = mitogen.core.Latch()
t = threading.Thread(
target=self.iter_and_put,
args=(recv, latch),
)
t.start()
t.join(0.1)
recv.close()
t.join()
self.assertTrue(latch.empty())
class CloseTest(testlib.RouterMixin, testlib.TestCase):
def wait(self, latch, wait_recv):

@ -67,16 +67,16 @@ class NeutralizeMainTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(bits[-3:], ['def', 'main():', 'pass'])
class GoodModulesTest(testlib.RouterMixin, testlib.TestCase):
def test_plain_old_module(self):
# The simplest case: a top-level module with no interesting imports or
# package machinery damage.
context = self.router.local()
self.assertEquals(256, context.call(plain_old_module.pow, 2, 8))
self.assertEquals(1, self.router.responder.get_module_count)
self.assertEquals(1, self.router.responder.good_load_module_count)
self.assertEquals(359, self.router.responder.good_load_module_size)
self.assertLess(300, self.router.responder.good_load_module_size)
def test_simple_pkg(self):
# Ensure success of a simple package containing two submodules, one of
@ -87,7 +87,7 @@ class GoodModulesTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(2, self.router.responder.get_module_count)
self.assertEquals(3, self.router.responder.good_load_module_count)
self.assertEquals(0, self.router.responder.bad_load_module_count)
self.assertEquals(537, self.router.responder.good_load_module_size)
self.assertLess(450, self.router.responder.good_load_module_size)
def test_self_contained_program(self):
# Ensure a program composed of a single script can be imported
@ -170,8 +170,8 @@ class ForwardTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(256, c2.call(plain_old_module.pow, 2, 8))
self.assertEquals(3, self.router.responder.get_module_count)
self.assertEquals(5, self.router.responder.good_load_module_count)
self.assertEquals(28148, self.router.responder.good_load_module_size)
self.assertEquals(3, self.router.responder.good_load_module_count)
self.assertLess(23000, self.router.responder.good_load_module_size)
class BlacklistTest(testlib.TestCase):

@ -54,8 +54,13 @@ class ActivationTest(testlib.RouterMixin, testlib.TestCase):
l1 = self.router.fork()
l2 = self.router.fork()
l1.call_service(MyService, 'get_id') # force framework activation
exc = self.assertRaises(mitogen.core.CallError,
lambda: l2.call(call_service_in, l1, MyService2.name(), 'get_id'))
capture = testlib.LogCapturer()
capture.start()
try:
exc = self.assertRaises(mitogen.core.CallError,
lambda: l2.call(call_service_in, l1, MyService2.name(), 'get_id'))
finally:
capture.stop()
msg = mitogen.service.Activator.not_active_msg % (MyService2.name(),)
self.assertTrue(msg in exc.args[0])
@ -80,8 +85,13 @@ class PermissionTest(testlib.RouterMixin, testlib.TestCase):
l1 = self.router.fork()
l1.call_service(MyService, 'get_id')
l2 = self.router.fork()
exc = self.assertRaises(mitogen.core.CallError, lambda:
l2.call(call_service_in, l1, MyService.name(), 'privileged_op'))
capture = testlib.LogCapturer()
capture.start()
try:
exc = self.assertRaises(mitogen.core.CallError, lambda:
l2.call(call_service_in, l1, MyService.name(), 'privileged_op'))
finally:
capture.stop()
msg = mitogen.service.Invoker.unauthorized_msg % (
u'privileged_op',
MyService.name(),

@ -13,6 +13,7 @@ import testlib
def roundtrip(*args):
return args
class TwoThreeCompatTest(testlib.RouterMixin, testlib.TestCase):
if mitogen.core.PY3:
python_path = 'python2'

@ -62,8 +62,17 @@ class ListenerTest(testlib.RouterMixin, testlib.TestCase):
def test_constructor_basic(self):
listener = self.klass(router=self.router)
self.assertFalse(mitogen.unix.is_path_dead(listener.path))
os.unlink(listener.path)
capture = testlib.LogCapturer()
capture.start()
try:
self.assertFalse(mitogen.unix.is_path_dead(listener.path))
os.unlink(listener.path)
# ensure we catch 0 byte read error log message
self.broker.shutdown()
self.broker.join()
self.broker_shutdown = True
finally:
capture.stop()
class ClientTest(testlib.TestCase):
@ -89,6 +98,7 @@ class ClientTest(testlib.TestCase):
self.assertEquals(0, resp['auth_id'])
router.broker.shutdown()
router.broker.join()
os.unlink(path)
def _test_simple_server(self, path):
router = mitogen.master.Router()

Loading…
Cancel
Save