select: make Select.add() handle multiple buffered items.

Previously given something like:

    l = mitogen.core.Latch()
    l.put(1)
    l.put(2)

    s = mitogen.select.Select([l], oneshot=False)
    assert 1 == s.get(block=False)
    assert 2 == s.get(block=False)

The second call would throw TimeoutError, because Select.add() only
queued the receiver/latch once if it was non-empty, rather than once for
each item as should happen.
pull/612/head
David Wilson 5 years ago
parent 49a6446af8
commit ecc570cbda

@ -224,8 +224,15 @@ class Select(object):
raise Error(self.owned_msg)
recv.notify = self._put
# Avoid race by polling once after installation.
if not recv.empty():
# After installing the notify function, _put() will potentially begin
# receiving calls from other threads immediately, but not for items
# they already had buffered. For those we call _put(), possibly
# duplicating the effect of other _put() being made concurrently, such
# that the Select ends up with more items in its buffer than exist in
# the underlying receivers. We handle the possibility of receivers
# marked notified yet empty inside Select.get(), so this should be
# robust.
for _ in range(recv.size()):
self._put(recv)
not_present_msg = 'Instance is not a member of this Select'
@ -335,5 +342,6 @@ class Select(object):
# A receiver may have been queued with no result if another
# thread drained it before we woke up, or because another
# thread drained it between add() calling recv.empty() and
# self._put(). In this case just sleep again.
# self._put(), or because Select.add() caused duplicate _put()
# calls. In this case simply retry.
continue

@ -358,6 +358,18 @@ class GetReceiverTest(testlib.RouterMixin, testlib.TestCase):
msg = select.get()
self.assertEquals('123', msg.unpickle())
def test_nonempty_multiple_items_before_add(self):
recv = mitogen.core.Receiver(self.router)
recv._on_receive(mitogen.core.Message.pickled('123'))
recv._on_receive(mitogen.core.Message.pickled('234'))
select = self.klass([recv], oneshot=False)
msg = select.get()
self.assertEquals('123', msg.unpickle())
msg = select.get()
self.assertEquals('234', msg.unpickle())
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(block=False))
def test_nonempty_after_add(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
@ -415,6 +427,16 @@ class GetLatchTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass([latch])
self.assertEquals(123, select.get())
def test_nonempty_multiple_items_before_add(self):
latch = mitogen.core.Latch()
latch.put(123)
latch.put(234)
select = self.klass([latch], oneshot=False)
self.assertEquals(123, select.get())
self.assertEquals(234, select.get())
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(block=False))
def test_nonempty_after_add(self):
latch = mitogen.core.Latch()
select = self.klass([latch])

Loading…
Cancel
Save