From f869e088f89cb19b85e559faf4e8e262cd777517 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Wed, 27 Sep 2017 13:47:14 +0530 Subject: [PATCH] issue #20: tests and fixes for mitogen.master.Select(). --- docs/api.rst | 22 +++- mitogen/master.py | 46 ++++++-- tests/select_test.py | 255 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 311 insertions(+), 12 deletions(-) create mode 100644 tests/select_test.py diff --git a/docs/api.rst b/docs/api.rst index 7bf54c75..7b0071d9 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -97,13 +97,17 @@ contexts. for context, workfunc in get_new_work(): select.add(context.call_async(workfunc)) - :py:class:`Select` may be nested: + :py:class:`Select` may be arbitrarily nested: .. code-block:: python subselects = [ mitogen.master.Select(get_some_work()), - mitogen.master.Select(get_some_work()) + mitogen.master.Select(get_some_work()), + mitogen.master.Select([ + mitogen.master.Select(get_some_work()), + mitogen.master.Select(get_some_work()) + ]) ] with mitogen.master.Select(selects, oneshot=False) as select: @@ -134,9 +138,17 @@ contexts. .. py:method:: empty () - Return ``True`` if no items appear to be queued on this receiver. Like - :py:class:`Queue.Queue`, this function's return value cannot be relied - upon. + Return ``True`` if no items appear to be queued on this receiver. + + As with :py:class:`Queue.Queue`, this function may return ``False`` + even though a subsequent call to :py:meth:`get` will succeed, since a + message may be posted at any moment between the call to + :py:meth:`empty` and :py:meth:`get`. + + :py:meth:`empty` may additionally return ``True`` when :py:meth:`get` + would block if another thread has drained a receiver added to this + select. This can be avoided by only consuming each receiver from a + single thread. .. py:method:: __iter__ (self) diff --git a/mitogen/master.py b/mitogen/master.py index ec43fd22..bbf0d687 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -1,3 +1,4 @@ +import Queue import commands import dis import errno @@ -231,18 +232,22 @@ def scan_code_imports(co, LOAD_CONST=dis.opname.index('LOAD_CONST'), co.co_consts[arg2] or ()) +class SelectError(mitogen.core.Error): + pass + + class Select(object): def __init__(self, receivers=(), oneshot=True): self._receivers = [] self._oneshot = oneshot self._queue = Queue.Queue() - self._notify = [] + self.notify = [] for recv in receivers: self.add(recv) def _put(self, value): self._queue.put(value) - for func in self._notify: + for func in self.notify: func(self) def __bool__(self): @@ -257,19 +262,38 @@ class Select(object): def __iter__(self): while self._receivers: recv, msg = self.get() - if self._oneshot: - self.remove(recv) yield recv, msg + loop_msg = 'Adding this Select instance would create a Select cycle' + + def _check_no_loop(self, recv): + if recv is self: + raise SelectError(self.loop_msg) + + for recv_ in self._receivers: + if recv_ == recv: + raise SelectError(self.loop_msg) + if isinstance(recv_, Select): + recv_._check_no_loop(recv) + def add(self, recv): + if isinstance(recv, Select): + recv._check_no_loop(self) + self._receivers.append(recv) recv.notify.append(self._put) # Avoid race by polling once after installation. if not recv.empty(): self._put(recv) + not_present_msg = 'Instance is not a member of this Select' + def remove(self, recv): - recv.notify.remove(self._put) + try: + self._receivers.remove(recv) + recv.notify.remove(self._put) + except (IndexError, ValueError): + raise SelectError(self.not_present_msg) def close(self): for recv in self._receivers[:]: @@ -278,11 +302,19 @@ class Select(object): def empty(self): return self._queue.empty() + empty_msg = 'Cannot got(), Select instance is empty' + def get(self, timeout=None): + if not self._receivers: + raise SelectError(self.empty_msg) + while True: - recv = mitogen.core._queue_interruptible_get(queue, timeout) + recv = mitogen.core._queue_interruptible_get(self._queue, timeout) try: - return recv, recv.get(block=False) + msg = recv.get(False) + if self._oneshot: + self.remove(recv) + return recv, msg except mitogen.core.TimeoutError: # A receiver may have been queued with no result if another # thread drained it before we woke up, or because another diff --git a/tests/select_test.py b/tests/select_test.py new file mode 100644 index 00000000..17705386 --- /dev/null +++ b/tests/select_test.py @@ -0,0 +1,255 @@ + +import unittest +import mitogen.master + +import testlib + + +class AddTest(testlib.RouterMixin, testlib.TestCase): + klass = mitogen.master.Select + + def test_receiver(self): + recv = mitogen.core.Receiver(self.router) + select = self.klass() + select.add(recv) + self.assertEquals(1, len(select._receivers)) + self.assertEquals(recv, select._receivers[0]) + self.assertEquals(1, len(recv.notify)) + self.assertEquals(select._put, recv.notify[0]) + + def test_channel(self): + context = self.router.local() + chan = mitogen.core.Channel(self.router, context, 1234) + select = self.klass() + select.add(chan) + self.assertEquals(1, len(select._receivers)) + self.assertEquals(chan, select._receivers[0]) + self.assertEquals(1, len(chan.notify)) + self.assertEquals(select._put, chan.notify[0]) + + def test_subselect_empty(self): + select = self.klass() + subselect = self.klass() + select.add(subselect) + self.assertEquals(1, len(select._receivers)) + self.assertEquals(subselect, select._receivers[0]) + self.assertEquals(1, len(subselect.notify)) + self.assertEquals(select._put, subselect.notify[0]) + + def test_subselect_nonempty(self): + recv = mitogen.core.Receiver(self.router) + select = self.klass() + subselect = self.klass() + subselect.add(recv) + + select.add(subselect) + self.assertEquals(1, len(select._receivers)) + self.assertEquals(subselect, select._receivers[0]) + self.assertEquals(1, len(subselect.notify)) + self.assertEquals(select._put, subselect.notify[0]) + + def test_subselect_loop_direct(self): + select = self.klass() + exc = self.assertRaises(mitogen.master.SelectError, + lambda: select.add(select)) + self.assertEquals(str(exc), self.klass.loop_msg) + + def test_subselect_loop_indirect(self): + s0 = self.klass() + s1 = self.klass() + s2 = self.klass() + + s0.add(s1) + s1.add(s2) + exc = self.assertRaises(mitogen.master.SelectError, + lambda: s2.add(s0)) + self.assertEquals(str(exc), self.klass.loop_msg) + + +class RemoveTest(testlib.RouterMixin, testlib.TestCase): + klass = mitogen.master.Select + + def test_empty(self): + select = self.klass() + recv = mitogen.core.Receiver(self.router) + exc = self.assertRaises(mitogen.master.SelectError, + lambda: select.remove(recv)) + self.assertEquals(str(exc), self.klass.not_present_msg) + + def test_absent(self): + select = self.klass() + recv = mitogen.core.Receiver(self.router) + recv2 = mitogen.core.Receiver(self.router) + select.add(recv2) + exc = self.assertRaises(mitogen.master.SelectError, + lambda: select.remove(recv)) + self.assertEquals(str(exc), self.klass.not_present_msg) + + def test_present(self): + select = self.klass() + recv = mitogen.core.Receiver(self.router) + select.add(recv) + select.remove(recv) + self.assertEquals(0, len(select._receivers)) + self.assertEquals(0, len(recv.notify)) + + +class CloseTest(testlib.RouterMixin, testlib.TestCase): + klass = mitogen.master.Select + + def test_empty(self): + select = self.klass() + select.close() # No effect. + + def test_one_receiver(self): + select = self.klass() + recv = mitogen.core.Receiver(self.router) + select.add(recv) + + self.assertEquals(1, len(select._receivers)) + self.assertEquals(1, len(recv.notify)) + self.assertEquals(select._put, recv.notify[0]) + + select.close() + self.assertEquals(0, len(select._receivers)) + self.assertEquals(0, len(recv.notify)) + + def test_one_subselect(self): + select = self.klass() + subselect = self.klass() + select.add(subselect) + + recv = mitogen.core.Receiver(self.router) + subselect.add(recv) + + self.assertEquals(1, len(select._receivers)) + self.assertEquals(1, len(recv.notify)) + self.assertEquals(subselect._put, recv.notify[0]) + + select.close() + + self.assertEquals(0, len(select._receivers)) + self.assertEquals(1, len(recv.notify)) + + subselect.close() + self.assertEquals(0, len(recv.notify)) + + +class EmptyTest(testlib.RouterMixin, testlib.TestCase): + klass = mitogen.master.Select + + def test_no_receivers(self): + select = self.klass() + self.assertTrue(select.empty()) + + def test_empty_receiver(self): + recv = mitogen.core.Receiver(self.router) + select = self.klass([recv]) + self.assertTrue(select.empty()) + + def test_nonempty_before_add(self): + recv = mitogen.core.Receiver(self.router) + recv._on_receive(mitogen.core.Message.pickled('123')) + select = self.klass([recv]) + self.assertFalse(select.empty()) + + def test_nonempty_after_add(self): + recv = mitogen.core.Receiver(self.router) + select = self.klass([recv]) + recv._on_receive(mitogen.core.Message.pickled('123')) + self.assertFalse(select.empty()) + + +class IterTest(testlib.RouterMixin, testlib.TestCase): + klass = mitogen.master.Select + + def test_empty(self): + select = self.klass() + self.assertEquals([], list(select)) + + def test_nonempty(self): + recv = mitogen.core.Receiver(self.router) + select = self.klass([recv]) + msg = mitogen.core.Message.pickled('123') + recv._on_receive(msg) + self.assertEquals([(recv, (msg, '123'))], list(select)) + + +class OneShotTest(testlib.RouterMixin, testlib.TestCase): + klass = mitogen.master.Select + + def test_true_removed_after_get(self): + recv = mitogen.core.Receiver(self.router) + select = self.klass([recv]) + msg = mitogen.core.Message.pickled('123') + recv._on_receive(msg) + recv, (msg_, data) = select.get() + self.assertEquals(msg, msg_) + self.assertEquals(0, len(select._receivers)) + self.assertEquals(0, len(recv.notify)) + + def test_false_persists_after_get(self): + recv = mitogen.core.Receiver(self.router) + select = self.klass([recv], oneshot=False) + msg = mitogen.core.Message.pickled('123') + recv._on_receive(msg) + self.assertEquals((recv, (msg, '123')), select.get()) + self.assertEquals(1, len(select._receivers)) + self.assertEquals(recv, select._receivers[0]) + self.assertEquals(1, len(recv.notify)) + self.assertEquals(select._put, recv.notify[0]) + + +class GetTest(testlib.RouterMixin, testlib.TestCase): + klass = mitogen.master.Select + + def test_no_receivers(self): + select = self.klass() + exc = self.assertRaises(mitogen.master.SelectError, + lambda: select.get()) + self.assertEquals(str(exc), self.klass.empty_msg) + + def test_timeout_no_receivers(self): + select = self.klass() + exc = self.assertRaises(mitogen.master.SelectError, + lambda: select.get(timeout=1.0)) + self.assertEquals(str(exc), self.klass.empty_msg) + + def test_zero_timeout(self): + recv = mitogen.core.Receiver(self.router) + select = self.klass([recv]) + self.assertRaises(mitogen.core.TimeoutError, + lambda: select.get(timeout=0.0)) + + def test_timeout(self): + recv = mitogen.core.Receiver(self.router) + select = self.klass([recv]) + self.assertRaises(mitogen.core.TimeoutError, + lambda: select.get(timeout=0.1)) + + def test_nonempty_before_add(self): + recv = mitogen.core.Receiver(self.router) + recv._on_receive(mitogen.core.Message.pickled('123')) + select = self.klass([recv]) + recv, (msg, data) = select.get() + self.assertEquals('123', data) + + def test_nonempty_after_add(self): + recv = mitogen.core.Receiver(self.router) + select = self.klass([recv]) + recv._on_receive(mitogen.core.Message.pickled('123')) + recv, (msg, data) = select.get() + self.assertEquals('123', data) + + def test_drained_by_other_thread(self): + recv = mitogen.core.Receiver(self.router) + recv._on_receive(mitogen.core.Message.pickled('123')) + select = self.klass([recv]) + msg, data = recv.get() + self.assertEquals('123', data) + self.assertRaises(mitogen.core.TimeoutError, + lambda: select.get(timeout=0.0)) + + +if __name__ == '__main__': + unittest.main()