Merge remote-tracking branch 'origin/dmw'

* origin/dmw:
  unix: add Listener.__repr__.
  issue #446: basic tests for Receiver._on_receive().
  core: Latch.empty() improvements
  core: Receiver.close() now wakes all threads; closes #446.
issue510
David Wilson 6 years ago
commit 87171b0763

@ -304,6 +304,12 @@ Core Library
have dead messages sent in reply to them, preventing peer contexts from
hanging due to a forgotten buffered message.
* `#446 <https://github.com/dw/mitogen/issues/446>`_: given thread A calling
:meth:`mitogen.core.Receiver.close`, and thread B, C, and D sleeping in
:meth:`mitogen.core.Receiver.get`, previously only one sleeping thread would
be woken with :class:`mitogen.core.ChannelError` when the receiver was
closed. Now all threads are woken per the docstring.
* `#447 <https://github.com/dw/mitogen/issues/447>`_: duplicate attempts to
invoke :meth:`mitogen.core.Router.add_handler` cause an error to be raised,
ensuring accidental re-registration of service pools are reported correctly.

@ -742,7 +742,8 @@ class Sender(object):
self.context.send(
Message.dead(
reason=self.explicit_close_msg,
handle=self.dst_handle)
handle=self.dst_handle
)
)
def __repr__(self):
@ -848,7 +849,7 @@ class Receiver(object):
if self.handle:
self.router.del_handler(self.handle)
self.handle = None
self._latch.put(Message.dead(self.closed_msg))
self._latch.close()
def empty(self):
"""
@ -879,7 +880,10 @@ class Receiver(object):
received, and `data` is its unpickled data part.
"""
_vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
msg = self._latch.get(timeout=timeout, block=block)
try:
msg = self._latch.get(timeout=timeout, block=block)
except LatchError:
raise ChannelError(self.closed_msg)
if msg.is_dead and throw_dead:
msg._throw_dead()
return msg
@ -1881,8 +1885,17 @@ class Latch(object):
though a subsequent call to :meth:`get` will block, since another
waiting thread may be woken at any moment between :meth:`empty` and
:meth:`get`.
:raises LatchError:
The latch has already been marked closed.
"""
return len(self._queue) == 0
self._lock.acquire()
try:
if self.closed:
raise LatchError()
return len(self._queue) == 0
finally:
self._lock.release()
def _get_socketpair(self):
"""

@ -66,6 +66,13 @@ def make_socket_path():
class Listener(mitogen.core.BasicStream):
keep_alive = True
def __repr__(self):
return '%s.%s(%r)' % (
__name__,
self.__class__.__name__,
self.path,
)
def __init__(self, router, path=None, backlog=100):
self._router = router
self.path = path or make_socket_path()

@ -21,6 +21,13 @@ class EmptyTest(testlib.TestCase):
latch.put(None)
self.assertTrue(not latch.empty())
def test_closed_is_empty(self):
latch = self.klass()
latch.put(None)
latch.close()
self.assertRaises(mitogen.core.LatchError,
lambda: latch.empty())
class GetTest(testlib.TestCase):
klass = mitogen.core.Latch

@ -1,4 +1,6 @@
import sys
import threading
import unittest2
import mitogen.core
@ -36,5 +38,92 @@ class IterationTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(10, ret.get().unpickle())
class CloseTest(testlib.RouterMixin, testlib.TestCase):
def wait(self, latch, wait_recv):
try:
latch.put(wait_recv.get())
except Exception:
latch.put(sys.exc_info()[1])
def test_closes_one(self):
latch = mitogen.core.Latch()
wait_recv = mitogen.core.Receiver(self.router)
t = threading.Thread(target=lambda: self.wait(latch, wait_recv))
t.start()
wait_recv.close()
def throw():
raise latch.get()
t.join()
e = self.assertRaises(mitogen.core.ChannelError, throw)
self.assertEquals(e.args[0], mitogen.core.Receiver.closed_msg)
def test_closes_all(self):
latch = mitogen.core.Latch()
wait_recv = mitogen.core.Receiver(self.router)
ts = [
threading.Thread(target=lambda: self.wait(latch, wait_recv))
for x in range(5)
]
for t in ts:
t.start()
wait_recv.close()
def throw():
raise latch.get()
for x in range(5):
e = self.assertRaises(mitogen.core.ChannelError, throw)
self.assertEquals(e.args[0], mitogen.core.Receiver.closed_msg)
for t in ts:
t.join()
class OnReceiveTest(testlib.RouterMixin, testlib.TestCase):
# Verify behaviour of _on_receive dead message handling. A dead message
# should unregister the receiver and wake all threads.
def wait(self, latch, wait_recv):
try:
latch.put(wait_recv.get())
except Exception:
latch.put(sys.exc_info()[1])
def test_sender_closes_one_thread(self):
latch = mitogen.core.Latch()
wait_recv = mitogen.core.Receiver(self.router)
t = threading.Thread(target=lambda: self.wait(latch, wait_recv))
t.start()
sender = wait_recv.to_sender()
sender.close()
def throw():
raise latch.get()
t.join()
e = self.assertRaises(mitogen.core.ChannelError, throw)
self.assertEquals(e.args[0], sender.explicit_close_msg)
@unittest2.skip(reason=(
'Unclear if a asingle dead message received from remote should '
'cause all threads to wake up.'
))
def test_sender_closes_all_threads(self):
latch = mitogen.core.Latch()
wait_recv = mitogen.core.Receiver(self.router)
ts = [
threading.Thread(target=lambda: self.wait(latch, wait_recv))
for x in range(5)
]
for t in ts:
t.start()
sender = wait_recv.to_sender()
sender.close()
def throw():
raise latch.get()
for x in range(5):
e = self.assertRaises(mitogen.core.ChannelError, throw)
self.assertEquals(e.args[0], mitogen.core.Receiver.closed_msg)
for t in ts:
t.join()
# TODO: what happens to a Select subscribed to the receiver in this case?
if __name__ == '__main__':
unittest2.main()

Loading…
Cancel
Save