diff --git a/mitogen/select.py b/mitogen/select.py index f03fdae1..f880fcc3 100644 --- a/mitogen/select.py +++ b/mitogen/select.py @@ -224,8 +224,15 @@ class Select(object): raise Error(self.owned_msg) recv.notify = self._put - # Avoid race by polling once after installation. - if not recv.empty(): + # After installing the notify function, _put() will potentially begin + # receiving calls from other threads immediately, but not for items + # they already had buffered. For those we call _put(), possibly + # duplicating the effect of other _put() being made concurrently, such + # that the Select ends up with more items in its buffer than exist in + # the underlying receivers. We handle the possibility of receivers + # marked notified yet empty inside Select.get(), so this should be + # robust. + for _ in range(recv.size()): self._put(recv) not_present_msg = 'Instance is not a member of this Select' @@ -335,5 +342,6 @@ class Select(object): # A receiver may have been queued with no result if another # thread drained it before we woke up, or because another # thread drained it between add() calling recv.empty() and - # self._put(). In this case just sleep again. + # self._put(), or because Select.add() caused duplicate _put() + # calls. In this case simply retry. continue diff --git a/tests/select_test.py b/tests/select_test.py index f08c9f3a..56e7e6cd 100644 --- a/tests/select_test.py +++ b/tests/select_test.py @@ -358,6 +358,18 @@ class GetReceiverTest(testlib.RouterMixin, testlib.TestCase): msg = select.get() self.assertEquals('123', msg.unpickle()) + def test_nonempty_multiple_items_before_add(self): + recv = mitogen.core.Receiver(self.router) + recv._on_receive(mitogen.core.Message.pickled('123')) + recv._on_receive(mitogen.core.Message.pickled('234')) + select = self.klass([recv], oneshot=False) + msg = select.get() + self.assertEquals('123', msg.unpickle()) + msg = select.get() + self.assertEquals('234', msg.unpickle()) + self.assertRaises(mitogen.core.TimeoutError, + lambda: select.get(block=False)) + def test_nonempty_after_add(self): recv = mitogen.core.Receiver(self.router) select = self.klass([recv]) @@ -415,6 +427,16 @@ class GetLatchTest(testlib.RouterMixin, testlib.TestCase): select = self.klass([latch]) self.assertEquals(123, select.get()) + def test_nonempty_multiple_items_before_add(self): + latch = mitogen.core.Latch() + latch.put(123) + latch.put(234) + select = self.klass([latch], oneshot=False) + self.assertEquals(123, select.get()) + self.assertEquals(234, select.get()) + self.assertRaises(mitogen.core.TimeoutError, + lambda: select.get(block=False)) + def test_nonempty_after_add(self): latch = mitogen.core.Latch() select = self.klass([latch])