diff --git a/mitogen/core.py b/mitogen/core.py index 25732896..6bca1b1a 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -2051,6 +2051,8 @@ class Latch(object): """ poller_class = Poller + notify = None + # The _cls_ prefixes here are to make it crystal clear in the code which # state mutation isn't covered by :attr:`_lock`. @@ -2264,6 +2266,8 @@ class Latch(object): _vv and IOLOG.debug('%r.put() -> waking wfd=%r', self, wsock.fileno()) self._wake(wsock, cookie) + elif self.notify: + self.notify(self) finally: self._lock.release() diff --git a/mitogen/select.py b/mitogen/select.py index fd2cbe9a..651dcb5f 100644 --- a/mitogen/select.py +++ b/mitogen/select.py @@ -35,12 +35,25 @@ class Error(mitogen.core.Error): pass +class Event(object): + """ + Represents one selected event. + """ + #: The first Receiver or Latch the event traversed. + source = None + + #: The :class:`mitogen.core.Message` delivered to a receiver, or the object + #: posted to a latch. + data = None + + class Select(object): """ Support scatter/gather asynchronous calls and waiting on multiple - receivers, channels, and sub-Selects. Accepts a sequence of - :class:`mitogen.core.Receiver` or :class:`mitogen.select.Select` instances - and returns the first value posted to any receiver or select. + receivers, channels, latches, and sub-Selects. Accepts a sequence of + :class:`mitogen.core.Receiver`, :class:`mitogen.select.Select` or + :class:`mitogen.core.Latch` instances and returns the first value posted to + any receiver or select. If `oneshot` is :data:`True`, then remove each receiver as it yields a result; since :meth:`__iter__` terminates once the final receiver is @@ -84,6 +97,19 @@ class Select(object): for msg in mitogen.select.Select(selects): print(msg.unpickle()) + + :class:`Select` may be used to mix inter-thread and inter-process IO: + + latch = mitogen.core.Latch() + start_thread(latch) + recv = remote_host.call_async(os.getuid) + + sel = Select([latch, recv]) + event = sel.get_event() + if event.source is latch: + # woken by a local thread + else: + # woken by function call result """ notify = None @@ -145,14 +171,29 @@ class Select(object): def __exit__(self, e_type, e_val, e_tb): self.close() - def __iter__(self): + def iter_data(self): """ - Yield the result of :meth:`get` until no receivers remain in the - select, either because `oneshot` is :data:`True`, or each receiver was + Yield :attr:`Event.data` until no receivers remain in the select, + either because `oneshot` is :data:`True`, or each receiver was explicitly removed via :meth:`remove`. + + :meth:`__iter__` is an alias for :meth:`iter_data`, allowing loops + like:: + + for msg in Select([recv1, recv2]): + print msg.unpickle() """ while self._receivers: - yield self.get() + yield self.get_event().data + + __iter__ = iter_data + + def iter_events(self): + """ + Yield :class:`Event` instances until no receivers remain in the select. + """ + while self._receivers: + yield self.get_event() loop_msg = 'Adding this Select instance would create a Select cycle' @@ -170,8 +211,8 @@ class Select(object): def add(self, recv): """ - Add the :class:`mitogen.core.Receiver` or :class:`Select` `recv` to the - select. + Add a :class:`mitogen.core.Receiver`, :class:`Select` or + :class:`mitogen.core.Latch` to the select. :raises mitogen.select.Error: An attempt was made to add a :class:`Select` to which this select @@ -193,10 +234,9 @@ class Select(object): def remove(self, recv): """ - Remove the :class:`mitogen.core.Receiver` or :class:`Select` `recv` - from the select. Note that if the receiver has notified prior to - :meth:`remove`, it will still be returned by a subsequent :meth:`get`. - This may change in a future version. + Remove an object from from the select. Note that if the receiver has + notified prior to :meth:`remove`, it will still be returned by a + subsequent :meth:`get`. This may change in a future version. """ try: if recv.notify != self._put: @@ -239,6 +279,13 @@ class Select(object): empty_msg = 'Cannot get(), Select instance is empty' def get(self, timeout=None, block=True): + """ + Call `get_event(timeout, block)` returning :attr:`Event.data` of the + first available event. + """ + return self.get_event(timeout, block).data + + def get_event(self, timeout=None, block=True): """ Fetch the next available value from any receiver, or raise :class:`mitogen.core.TimeoutError` if no value is available within @@ -263,14 +310,21 @@ class Select(object): if not self._receivers: raise Error(self.empty_msg) + event = Event() while True: recv = self._latch.get(timeout=timeout, block=block) try: - msg = recv.get(block=False) + if isinstance(recv, Select): + event = recv.get_event(block=False) + else: + event.source = recv + event.data = recv.get(block=False) if self._oneshot: self.remove(recv) - msg.receiver = recv - return msg + if isinstance(recv, mitogen.core.Receiver): + # Remove in 0.3.x. + event.data.receiver = recv + return event except mitogen.core.TimeoutError: # A receiver may have been queued with no result if another # thread drained it before we woke up, or because another diff --git a/tests/select_test.py b/tests/select_test.py index 7e6256c8..f08c9f3a 100644 --- a/tests/select_test.py +++ b/tests/select_test.py @@ -9,6 +9,18 @@ import testlib class BoolTest(testlib.RouterMixin, testlib.TestCase): klass = mitogen.select.Select + def test_latch(self): + latch = mitogen.core.Latch() # oneshot + select = self.klass() + self.assertFalse(select) + select.add(latch) + self.assertTrue(select) + + latch.put(123) + self.assertTrue(select) + self.assertEquals(123, select.get()) + self.assertFalse(select) + def test_receiver(self): recv = mitogen.core.Receiver(self.router) # oneshot select = self.klass() @@ -25,6 +37,14 @@ class BoolTest(testlib.RouterMixin, testlib.TestCase): class AddTest(testlib.RouterMixin, testlib.TestCase): klass = mitogen.select.Select + def test_latch(self): + latch = mitogen.core.Latch() + select = self.klass() + select.add(latch) + self.assertEquals(1, len(select._receivers)) + self.assertEquals(latch, select._receivers[0]) + self.assertEquals(select._put, latch.notify) + def test_receiver(self): recv = mitogen.core.Receiver(self.router) select = self.klass() @@ -98,14 +118,14 @@ class AddTest(testlib.RouterMixin, testlib.TestCase): class RemoveTest(testlib.RouterMixin, testlib.TestCase): klass = mitogen.select.Select - def test_empty(self): + def test_receiver_empty(self): select = self.klass() recv = mitogen.core.Receiver(self.router) exc = self.assertRaises(mitogen.select.Error, lambda: select.remove(recv)) self.assertEquals(str(exc), self.klass.not_present_msg) - def test_absent(self): + def test_receiver_absent(self): select = self.klass() recv = mitogen.core.Receiver(self.router) recv2 = mitogen.core.Receiver(self.router) @@ -114,7 +134,7 @@ class RemoveTest(testlib.RouterMixin, testlib.TestCase): lambda: select.remove(recv)) self.assertEquals(str(exc), self.klass.not_present_msg) - def test_present(self): + def test_receiver_present(self): select = self.klass() recv = mitogen.core.Receiver(self.router) select.add(recv) @@ -122,6 +142,30 @@ class RemoveTest(testlib.RouterMixin, testlib.TestCase): self.assertEquals(0, len(select._receivers)) self.assertEquals(None, recv.notify) + def test_latch_empty(self): + select = self.klass() + latch = mitogen.core.Latch() + exc = self.assertRaises(mitogen.select.Error, + lambda: select.remove(latch)) + self.assertEquals(str(exc), self.klass.not_present_msg) + + def test_latch_absent(self): + select = self.klass() + latch = mitogen.core.Latch() + latch2 = mitogen.core.Latch() + select.add(latch2) + exc = self.assertRaises(mitogen.select.Error, + lambda: select.remove(latch)) + self.assertEquals(str(exc), self.klass.not_present_msg) + + def test_latch_present(self): + select = self.klass() + latch = mitogen.core.Latch() + select.add(latch) + select.remove(latch) + self.assertEquals(0, len(select._receivers)) + self.assertEquals(None, latch.notify) + class CloseTest(testlib.RouterMixin, testlib.TestCase): klass = mitogen.select.Select @@ -130,6 +174,18 @@ class CloseTest(testlib.RouterMixin, testlib.TestCase): select = self.klass() select.close() # No effect. + def test_one_latch(self): + select = self.klass() + latch = mitogen.core.Latch() + select.add(latch) + + self.assertEquals(1, len(select._receivers)) + self.assertEquals(select._put, latch.notify) + + select.close() + self.assertEquals(0, len(select._receivers)) + self.assertEquals(None, latch.notify) + def test_one_receiver(self): select = self.klass() recv = mitogen.core.Receiver(self.router) @@ -174,18 +230,35 @@ class EmptyTest(testlib.RouterMixin, testlib.TestCase): select = self.klass([recv]) self.assertTrue(select.empty()) - def test_nonempty_before_add(self): + def test_nonempty_receiver_before_add(self): recv = mitogen.core.Receiver(self.router) recv._on_receive(mitogen.core.Message.pickled('123')) select = self.klass([recv]) self.assertFalse(select.empty()) - def test_nonempty_after_add(self): + def test_nonempty__receiver_after_add(self): recv = mitogen.core.Receiver(self.router) select = self.klass([recv]) recv._on_receive(mitogen.core.Message.pickled('123')) self.assertFalse(select.empty()) + def test_empty_latch(self): + latch = mitogen.core.Latch() + select = self.klass([latch]) + self.assertTrue(select.empty()) + + def test_nonempty_latch_before_add(self): + latch = mitogen.core.Latch() + latch.put(123) + select = self.klass([latch]) + self.assertFalse(select.empty()) + + def test_nonempty__latch_after_add(self): + latch = mitogen.core.Latch() + select = self.klass([latch]) + latch.put(123) + self.assertFalse(select.empty()) + class IterTest(testlib.RouterMixin, testlib.TestCase): klass = mitogen.select.Select @@ -194,18 +267,24 @@ class IterTest(testlib.RouterMixin, testlib.TestCase): select = self.klass() self.assertEquals([], list(select)) - def test_nonempty(self): + def test_nonempty_receiver(self): recv = mitogen.core.Receiver(self.router) select = self.klass([recv]) msg = mitogen.core.Message.pickled('123') recv._on_receive(msg) self.assertEquals([msg], list(select)) + def test_nonempty_latch(self): + latch = mitogen.core.Latch() + select = self.klass([latch]) + latch.put(123) + self.assertEquals([123], list(select)) + class OneShotTest(testlib.RouterMixin, testlib.TestCase): klass = mitogen.select.Select - def test_true_removed_after_get(self): + def test_true_receiver_removed_after_get(self): recv = mitogen.core.Receiver(self.router) select = self.klass([recv]) msg = mitogen.core.Message.pickled('123') @@ -215,7 +294,7 @@ class OneShotTest(testlib.RouterMixin, testlib.TestCase): self.assertEquals(0, len(select._receivers)) self.assertEquals(None, recv.notify) - def test_false_persists_after_get(self): + def test_false_receiver_persists_after_get(self): recv = mitogen.core.Receiver(self.router) select = self.klass([recv], oneshot=False) msg = mitogen.core.Message.pickled('123') @@ -226,8 +305,26 @@ class OneShotTest(testlib.RouterMixin, testlib.TestCase): self.assertEquals(recv, select._receivers[0]) self.assertEquals(select._put, recv.notify) + def test_true_latch_removed_after_get(self): + latch = mitogen.core.Latch() + select = self.klass([latch]) + latch.put(123) + self.assertEquals(123, select.get()) + self.assertEquals(0, len(select._receivers)) + self.assertEquals(None, latch.notify) -class GetTest(testlib.RouterMixin, testlib.TestCase): + def test_false_latch_persists_after_get(self): + latch = mitogen.core.Latch() + select = self.klass([latch], oneshot=False) + latch.put(123) + + self.assertEquals(123, select.get()) + self.assertEquals(1, len(select._receivers)) + self.assertEquals(latch, select._receivers[0]) + self.assertEquals(select._put, latch.notify) + + +class GetReceiverTest(testlib.RouterMixin, testlib.TestCase): klass = mitogen.select.Select def test_no_receivers(self): @@ -285,5 +382,79 @@ class GetTest(testlib.RouterMixin, testlib.TestCase): lambda: select.get(timeout=0.0)) +class GetLatchTest(testlib.RouterMixin, testlib.TestCase): + klass = mitogen.select.Select + + def test_no_latches(self): + select = self.klass() + exc = self.assertRaises(mitogen.select.Error, + lambda: select.get()) + self.assertEquals(str(exc), self.klass.empty_msg) + + def test_timeout_no_receivers(self): + select = self.klass() + exc = self.assertRaises(mitogen.select.Error, + lambda: select.get(timeout=1.0)) + self.assertEquals(str(exc), self.klass.empty_msg) + + def test_zero_timeout(self): + latch = mitogen.core.Latch() + select = self.klass([latch]) + self.assertRaises(mitogen.core.TimeoutError, + lambda: select.get(timeout=0.0)) + + def test_timeout(self): + latch = mitogen.core.Latch() + select = self.klass([latch]) + self.assertRaises(mitogen.core.TimeoutError, + lambda: select.get(timeout=0.1)) + + def test_nonempty_before_add(self): + latch = mitogen.core.Latch() + latch.put(123) + select = self.klass([latch]) + self.assertEquals(123, select.get()) + + def test_nonempty_after_add(self): + latch = mitogen.core.Latch() + select = self.klass([latch]) + latch.put(123) + self.assertEquals(123, latch.get()) + + def test_drained_by_other_thread(self): + latch = mitogen.core.Latch() + latch.put(123) + select = self.klass([latch]) + self.assertEquals(123, latch.get()) + self.assertRaises(mitogen.core.TimeoutError, + lambda: select.get(timeout=0.0)) + + +class GetEventTest(testlib.RouterMixin, testlib.TestCase): + klass = mitogen.select.Select + + def test_empty(self): + select = self.klass() + exc = self.assertRaises(mitogen.select.Error, + lambda: select.get()) + self.assertEquals(str(exc), self.klass.empty_msg) + + def test_latch(self): + latch = mitogen.core.Latch() + latch.put(123) + select = self.klass([latch]) + event = select.get_event() + self.assertEquals(latch, event.source) + self.assertEquals(123, event.data) + + def test_receiver(self): + recv = mitogen.core.Receiver(self.router) + recv._on_receive(mitogen.core.Message.pickled('123')) + select = self.klass([recv]) + event = select.get_event() + self.assertEquals(recv, event.source) + self.assertEquals('123', event.data.unpickle()) + + if __name__ == '__main__': unittest2.main()