issue #20: tests and fixes for mitogen.master.Select().

wip-fakessh-exit-status
David Wilson 7 years ago
parent e3d967ebeb
commit f869e088f8

@ -97,13 +97,17 @@ contexts.
for context, workfunc in get_new_work():
select.add(context.call_async(workfunc))
:py:class:`Select` may be nested:
:py:class:`Select` may be arbitrarily nested:
.. code-block:: python
subselects = [
mitogen.master.Select(get_some_work()),
mitogen.master.Select(get_some_work())
mitogen.master.Select(get_some_work()),
mitogen.master.Select([
mitogen.master.Select(get_some_work()),
mitogen.master.Select(get_some_work())
])
]
with mitogen.master.Select(selects, oneshot=False) as select:
@ -134,9 +138,17 @@ contexts.
.. py:method:: empty ()
Return ``True`` if no items appear to be queued on this receiver. Like
:py:class:`Queue.Queue`, this function's return value cannot be relied
upon.
Return ``True`` if no items appear to be queued on this receiver.
As with :py:class:`Queue.Queue`, this function may return ``False``
even though a subsequent call to :py:meth:`get` will succeed, since a
message may be posted at any moment between the call to
:py:meth:`empty` and :py:meth:`get`.
:py:meth:`empty` may additionally return ``True`` when :py:meth:`get`
would block if another thread has drained a receiver added to this
select. This can be avoided by only consuming each receiver from a
single thread.
.. py:method:: __iter__ (self)

@ -1,3 +1,4 @@
import Queue
import commands
import dis
import errno
@ -231,18 +232,22 @@ def scan_code_imports(co, LOAD_CONST=dis.opname.index('LOAD_CONST'),
co.co_consts[arg2] or ())
class SelectError(mitogen.core.Error):
pass
class Select(object):
def __init__(self, receivers=(), oneshot=True):
self._receivers = []
self._oneshot = oneshot
self._queue = Queue.Queue()
self._notify = []
self.notify = []
for recv in receivers:
self.add(recv)
def _put(self, value):
self._queue.put(value)
for func in self._notify:
for func in self.notify:
func(self)
def __bool__(self):
@ -257,19 +262,38 @@ class Select(object):
def __iter__(self):
while self._receivers:
recv, msg = self.get()
if self._oneshot:
self.remove(recv)
yield recv, msg
loop_msg = 'Adding this Select instance would create a Select cycle'
def _check_no_loop(self, recv):
if recv is self:
raise SelectError(self.loop_msg)
for recv_ in self._receivers:
if recv_ == recv:
raise SelectError(self.loop_msg)
if isinstance(recv_, Select):
recv_._check_no_loop(recv)
def add(self, recv):
if isinstance(recv, Select):
recv._check_no_loop(self)
self._receivers.append(recv)
recv.notify.append(self._put)
# Avoid race by polling once after installation.
if not recv.empty():
self._put(recv)
not_present_msg = 'Instance is not a member of this Select'
def remove(self, recv):
recv.notify.remove(self._put)
try:
self._receivers.remove(recv)
recv.notify.remove(self._put)
except (IndexError, ValueError):
raise SelectError(self.not_present_msg)
def close(self):
for recv in self._receivers[:]:
@ -278,11 +302,19 @@ class Select(object):
def empty(self):
return self._queue.empty()
empty_msg = 'Cannot got(), Select instance is empty'
def get(self, timeout=None):
if not self._receivers:
raise SelectError(self.empty_msg)
while True:
recv = mitogen.core._queue_interruptible_get(queue, timeout)
recv = mitogen.core._queue_interruptible_get(self._queue, timeout)
try:
return recv, recv.get(block=False)
msg = recv.get(False)
if self._oneshot:
self.remove(recv)
return recv, msg
except mitogen.core.TimeoutError:
# A receiver may have been queued with no result if another
# thread drained it before we woke up, or because another

@ -0,0 +1,255 @@
import unittest
import mitogen.master
import testlib
class AddTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select
def test_receiver(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass()
select.add(recv)
self.assertEquals(1, len(select._receivers))
self.assertEquals(recv, select._receivers[0])
self.assertEquals(1, len(recv.notify))
self.assertEquals(select._put, recv.notify[0])
def test_channel(self):
context = self.router.local()
chan = mitogen.core.Channel(self.router, context, 1234)
select = self.klass()
select.add(chan)
self.assertEquals(1, len(select._receivers))
self.assertEquals(chan, select._receivers[0])
self.assertEquals(1, len(chan.notify))
self.assertEquals(select._put, chan.notify[0])
def test_subselect_empty(self):
select = self.klass()
subselect = self.klass()
select.add(subselect)
self.assertEquals(1, len(select._receivers))
self.assertEquals(subselect, select._receivers[0])
self.assertEquals(1, len(subselect.notify))
self.assertEquals(select._put, subselect.notify[0])
def test_subselect_nonempty(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass()
subselect = self.klass()
subselect.add(recv)
select.add(subselect)
self.assertEquals(1, len(select._receivers))
self.assertEquals(subselect, select._receivers[0])
self.assertEquals(1, len(subselect.notify))
self.assertEquals(select._put, subselect.notify[0])
def test_subselect_loop_direct(self):
select = self.klass()
exc = self.assertRaises(mitogen.master.SelectError,
lambda: select.add(select))
self.assertEquals(str(exc), self.klass.loop_msg)
def test_subselect_loop_indirect(self):
s0 = self.klass()
s1 = self.klass()
s2 = self.klass()
s0.add(s1)
s1.add(s2)
exc = self.assertRaises(mitogen.master.SelectError,
lambda: s2.add(s0))
self.assertEquals(str(exc), self.klass.loop_msg)
class RemoveTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select
def test_empty(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
exc = self.assertRaises(mitogen.master.SelectError,
lambda: select.remove(recv))
self.assertEquals(str(exc), self.klass.not_present_msg)
def test_absent(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
recv2 = mitogen.core.Receiver(self.router)
select.add(recv2)
exc = self.assertRaises(mitogen.master.SelectError,
lambda: select.remove(recv))
self.assertEquals(str(exc), self.klass.not_present_msg)
def test_present(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
select.add(recv)
select.remove(recv)
self.assertEquals(0, len(select._receivers))
self.assertEquals(0, len(recv.notify))
class CloseTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select
def test_empty(self):
select = self.klass()
select.close() # No effect.
def test_one_receiver(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
select.add(recv)
self.assertEquals(1, len(select._receivers))
self.assertEquals(1, len(recv.notify))
self.assertEquals(select._put, recv.notify[0])
select.close()
self.assertEquals(0, len(select._receivers))
self.assertEquals(0, len(recv.notify))
def test_one_subselect(self):
select = self.klass()
subselect = self.klass()
select.add(subselect)
recv = mitogen.core.Receiver(self.router)
subselect.add(recv)
self.assertEquals(1, len(select._receivers))
self.assertEquals(1, len(recv.notify))
self.assertEquals(subselect._put, recv.notify[0])
select.close()
self.assertEquals(0, len(select._receivers))
self.assertEquals(1, len(recv.notify))
subselect.close()
self.assertEquals(0, len(recv.notify))
class EmptyTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select
def test_no_receivers(self):
select = self.klass()
self.assertTrue(select.empty())
def test_empty_receiver(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
self.assertTrue(select.empty())
def test_nonempty_before_add(self):
recv = mitogen.core.Receiver(self.router)
recv._on_receive(mitogen.core.Message.pickled('123'))
select = self.klass([recv])
self.assertFalse(select.empty())
def test_nonempty_after_add(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
recv._on_receive(mitogen.core.Message.pickled('123'))
self.assertFalse(select.empty())
class IterTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select
def test_empty(self):
select = self.klass()
self.assertEquals([], list(select))
def test_nonempty(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
msg = mitogen.core.Message.pickled('123')
recv._on_receive(msg)
self.assertEquals([(recv, (msg, '123'))], list(select))
class OneShotTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select
def test_true_removed_after_get(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
msg = mitogen.core.Message.pickled('123')
recv._on_receive(msg)
recv, (msg_, data) = select.get()
self.assertEquals(msg, msg_)
self.assertEquals(0, len(select._receivers))
self.assertEquals(0, len(recv.notify))
def test_false_persists_after_get(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv], oneshot=False)
msg = mitogen.core.Message.pickled('123')
recv._on_receive(msg)
self.assertEquals((recv, (msg, '123')), select.get())
self.assertEquals(1, len(select._receivers))
self.assertEquals(recv, select._receivers[0])
self.assertEquals(1, len(recv.notify))
self.assertEquals(select._put, recv.notify[0])
class GetTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select
def test_no_receivers(self):
select = self.klass()
exc = self.assertRaises(mitogen.master.SelectError,
lambda: select.get())
self.assertEquals(str(exc), self.klass.empty_msg)
def test_timeout_no_receivers(self):
select = self.klass()
exc = self.assertRaises(mitogen.master.SelectError,
lambda: select.get(timeout=1.0))
self.assertEquals(str(exc), self.klass.empty_msg)
def test_zero_timeout(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(timeout=0.0))
def test_timeout(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(timeout=0.1))
def test_nonempty_before_add(self):
recv = mitogen.core.Receiver(self.router)
recv._on_receive(mitogen.core.Message.pickled('123'))
select = self.klass([recv])
recv, (msg, data) = select.get()
self.assertEquals('123', data)
def test_nonempty_after_add(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
recv._on_receive(mitogen.core.Message.pickled('123'))
recv, (msg, data) = select.get()
self.assertEquals('123', data)
def test_drained_by_other_thread(self):
recv = mitogen.core.Receiver(self.router)
recv._on_receive(mitogen.core.Message.pickled('123'))
select = self.klass([recv])
msg, data = recv.get()
self.assertEquals('123', data)
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(timeout=0.0))
if __name__ == '__main__':
unittest.main()
Loading…
Cancel
Save