core: Receiver.close() now wakes all threads; closes #446.

issue510
David Wilson 6 years ago
parent f2f41809ae
commit 388649df97

@ -304,6 +304,12 @@ Core Library
have dead messages sent in reply to them, preventing peer contexts from have dead messages sent in reply to them, preventing peer contexts from
hanging due to a forgotten buffered message. hanging due to a forgotten buffered message.
* `#446 <https://github.com/dw/mitogen/issues/446>`_: given thread A calling
:meth:`mitogen.core.Receiver.close`, and thread B, C, and D sleeping in
:meth:`mitogen.core.Receiver.get`, previously only one sleeping thread would
be woken with :class:`mitogen.core.ChannelError` when the receiver was
closed. Now all threads are woken per the docstring.
* `#447 <https://github.com/dw/mitogen/issues/447>`_: duplicate attempts to * `#447 <https://github.com/dw/mitogen/issues/447>`_: duplicate attempts to
invoke :meth:`mitogen.core.Router.add_handler` cause an error to be raised, invoke :meth:`mitogen.core.Router.add_handler` cause an error to be raised,
ensuring accidental re-registration of service pools are reported correctly. ensuring accidental re-registration of service pools are reported correctly.

@ -848,7 +848,7 @@ class Receiver(object):
if self.handle: if self.handle:
self.router.del_handler(self.handle) self.router.del_handler(self.handle)
self.handle = None self.handle = None
self._latch.put(Message.dead(self.closed_msg)) self._latch.close()
def empty(self): def empty(self):
""" """
@ -879,7 +879,10 @@ class Receiver(object):
received, and `data` is its unpickled data part. received, and `data` is its unpickled data part.
""" """
_vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
try:
msg = self._latch.get(timeout=timeout, block=block) msg = self._latch.get(timeout=timeout, block=block)
except LatchError:
raise ChannelError(self.closed_msg)
if msg.is_dead and throw_dead: if msg.is_dead and throw_dead:
msg._throw_dead() msg._throw_dead()
return msg return msg

@ -1,4 +1,6 @@
import sys
import threading
import unittest2 import unittest2
import mitogen.core import mitogen.core
@ -36,5 +38,43 @@ class IterationTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(10, ret.get().unpickle()) self.assertEquals(10, ret.get().unpickle())
class CloseTest(testlib.RouterMixin, testlib.TestCase):
def wait(self, latch, wait_recv):
try:
latch.put(wait_recv.get())
except Exception:
latch.put(sys.exc_info()[1])
def test_closes_one(self):
latch = mitogen.core.Latch()
wait_recv = mitogen.core.Receiver(self.router)
t = threading.Thread(target=lambda: self.wait(latch, wait_recv))
t.start()
wait_recv.close()
def throw():
raise latch.get()
t.join()
e = self.assertRaises(mitogen.core.ChannelError, throw)
self.assertEquals(e.args[0], mitogen.core.Receiver.closed_msg)
def test_closes_all(self):
latch = mitogen.core.Latch()
wait_recv = mitogen.core.Receiver(self.router)
ts = [
threading.Thread(target=lambda: self.wait(latch, wait_recv))
for x in range(5)
]
for t in ts:
t.start()
wait_recv.close()
def throw():
raise latch.get()
for x in range(5):
e = self.assertRaises(mitogen.core.ChannelError, throw)
self.assertEquals(e.args[0], mitogen.core.Receiver.closed_msg)
for t in ts:
t.join()
if __name__ == '__main__': if __name__ == '__main__':
unittest2.main() unittest2.main()

Loading…
Cancel
Save