issue #535: core/select: support selecting from Latches.

pull/564/head
David Wilson 5 years ago
parent 1397c0eec3
commit b3f592acee

@ -2051,6 +2051,8 @@ class Latch(object):
"""
poller_class = Poller
notify = None
# The _cls_ prefixes here are to make it crystal clear in the code which
# state mutation isn't covered by :attr:`_lock`.
@ -2264,6 +2266,8 @@ class Latch(object):
_vv and IOLOG.debug('%r.put() -> waking wfd=%r',
self, wsock.fileno())
self._wake(wsock, cookie)
elif self.notify:
self.notify(self)
finally:
self._lock.release()

@ -35,12 +35,25 @@ class Error(mitogen.core.Error):
pass
class Event(object):
"""
Represents one selected event.
"""
#: The first Receiver or Latch the event traversed.
source = None
#: The :class:`mitogen.core.Message` delivered to a receiver, or the object
#: posted to a latch.
data = None
class Select(object):
"""
Support scatter/gather asynchronous calls and waiting on multiple
receivers, channels, and sub-Selects. Accepts a sequence of
:class:`mitogen.core.Receiver` or :class:`mitogen.select.Select` instances
and returns the first value posted to any receiver or select.
receivers, channels, latches, and sub-Selects. Accepts a sequence of
:class:`mitogen.core.Receiver`, :class:`mitogen.select.Select` or
:class:`mitogen.core.Latch` instances and returns the first value posted to
any receiver or select.
If `oneshot` is :data:`True`, then remove each receiver as it yields a
result; since :meth:`__iter__` terminates once the final receiver is
@ -84,6 +97,19 @@ class Select(object):
for msg in mitogen.select.Select(selects):
print(msg.unpickle())
:class:`Select` may be used to mix inter-thread and inter-process IO:
latch = mitogen.core.Latch()
start_thread(latch)
recv = remote_host.call_async(os.getuid)
sel = Select([latch, recv])
event = sel.get_event()
if event.source is latch:
# woken by a local thread
else:
# woken by function call result
"""
notify = None
@ -145,14 +171,29 @@ class Select(object):
def __exit__(self, e_type, e_val, e_tb):
self.close()
def __iter__(self):
def iter_data(self):
"""
Yield the result of :meth:`get` until no receivers remain in the
select, either because `oneshot` is :data:`True`, or each receiver was
Yield :attr:`Event.data` until no receivers remain in the select,
either because `oneshot` is :data:`True`, or each receiver was
explicitly removed via :meth:`remove`.
:meth:`__iter__` is an alias for :meth:`iter_data`, allowing loops
like::
for msg in Select([recv1, recv2]):
print msg.unpickle()
"""
while self._receivers:
yield self.get()
yield self.get_event().data
__iter__ = iter_data
def iter_events(self):
"""
Yield :class:`Event` instances until no receivers remain in the select.
"""
while self._receivers:
yield self.get_event()
loop_msg = 'Adding this Select instance would create a Select cycle'
@ -170,8 +211,8 @@ class Select(object):
def add(self, recv):
"""
Add the :class:`mitogen.core.Receiver` or :class:`Select` `recv` to the
select.
Add a :class:`mitogen.core.Receiver`, :class:`Select` or
:class:`mitogen.core.Latch` to the select.
:raises mitogen.select.Error:
An attempt was made to add a :class:`Select` to which this select
@ -193,10 +234,9 @@ class Select(object):
def remove(self, recv):
"""
Remove the :class:`mitogen.core.Receiver` or :class:`Select` `recv`
from the select. Note that if the receiver has notified prior to
:meth:`remove`, it will still be returned by a subsequent :meth:`get`.
This may change in a future version.
Remove an object from from the select. Note that if the receiver has
notified prior to :meth:`remove`, it will still be returned by a
subsequent :meth:`get`. This may change in a future version.
"""
try:
if recv.notify != self._put:
@ -239,6 +279,13 @@ class Select(object):
empty_msg = 'Cannot get(), Select instance is empty'
def get(self, timeout=None, block=True):
"""
Call `get_event(timeout, block)` returning :attr:`Event.data` of the
first available event.
"""
return self.get_event(timeout, block).data
def get_event(self, timeout=None, block=True):
"""
Fetch the next available value from any receiver, or raise
:class:`mitogen.core.TimeoutError` if no value is available within
@ -263,14 +310,21 @@ class Select(object):
if not self._receivers:
raise Error(self.empty_msg)
event = Event()
while True:
recv = self._latch.get(timeout=timeout, block=block)
try:
msg = recv.get(block=False)
if isinstance(recv, Select):
event = recv.get_event(block=False)
else:
event.source = recv
event.data = recv.get(block=False)
if self._oneshot:
self.remove(recv)
msg.receiver = recv
return msg
if isinstance(recv, mitogen.core.Receiver):
# Remove in 0.3.x.
event.data.receiver = recv
return event
except mitogen.core.TimeoutError:
# A receiver may have been queued with no result if another
# thread drained it before we woke up, or because another

@ -9,6 +9,18 @@ import testlib
class BoolTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_latch(self):
latch = mitogen.core.Latch() # oneshot
select = self.klass()
self.assertFalse(select)
select.add(latch)
self.assertTrue(select)
latch.put(123)
self.assertTrue(select)
self.assertEquals(123, select.get())
self.assertFalse(select)
def test_receiver(self):
recv = mitogen.core.Receiver(self.router) # oneshot
select = self.klass()
@ -25,6 +37,14 @@ class BoolTest(testlib.RouterMixin, testlib.TestCase):
class AddTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_latch(self):
latch = mitogen.core.Latch()
select = self.klass()
select.add(latch)
self.assertEquals(1, len(select._receivers))
self.assertEquals(latch, select._receivers[0])
self.assertEquals(select._put, latch.notify)
def test_receiver(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass()
@ -98,14 +118,14 @@ class AddTest(testlib.RouterMixin, testlib.TestCase):
class RemoveTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_empty(self):
def test_receiver_empty(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
exc = self.assertRaises(mitogen.select.Error,
lambda: select.remove(recv))
self.assertEquals(str(exc), self.klass.not_present_msg)
def test_absent(self):
def test_receiver_absent(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
recv2 = mitogen.core.Receiver(self.router)
@ -114,7 +134,7 @@ class RemoveTest(testlib.RouterMixin, testlib.TestCase):
lambda: select.remove(recv))
self.assertEquals(str(exc), self.klass.not_present_msg)
def test_present(self):
def test_receiver_present(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
select.add(recv)
@ -122,6 +142,30 @@ class RemoveTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(0, len(select._receivers))
self.assertEquals(None, recv.notify)
def test_latch_empty(self):
select = self.klass()
latch = mitogen.core.Latch()
exc = self.assertRaises(mitogen.select.Error,
lambda: select.remove(latch))
self.assertEquals(str(exc), self.klass.not_present_msg)
def test_latch_absent(self):
select = self.klass()
latch = mitogen.core.Latch()
latch2 = mitogen.core.Latch()
select.add(latch2)
exc = self.assertRaises(mitogen.select.Error,
lambda: select.remove(latch))
self.assertEquals(str(exc), self.klass.not_present_msg)
def test_latch_present(self):
select = self.klass()
latch = mitogen.core.Latch()
select.add(latch)
select.remove(latch)
self.assertEquals(0, len(select._receivers))
self.assertEquals(None, latch.notify)
class CloseTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
@ -130,6 +174,18 @@ class CloseTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass()
select.close() # No effect.
def test_one_latch(self):
select = self.klass()
latch = mitogen.core.Latch()
select.add(latch)
self.assertEquals(1, len(select._receivers))
self.assertEquals(select._put, latch.notify)
select.close()
self.assertEquals(0, len(select._receivers))
self.assertEquals(None, latch.notify)
def test_one_receiver(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
@ -174,18 +230,35 @@ class EmptyTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass([recv])
self.assertTrue(select.empty())
def test_nonempty_before_add(self):
def test_nonempty_receiver_before_add(self):
recv = mitogen.core.Receiver(self.router)
recv._on_receive(mitogen.core.Message.pickled('123'))
select = self.klass([recv])
self.assertFalse(select.empty())
def test_nonempty_after_add(self):
def test_nonempty__receiver_after_add(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
recv._on_receive(mitogen.core.Message.pickled('123'))
self.assertFalse(select.empty())
def test_empty_latch(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
self.assertTrue(select.empty())
def test_nonempty_latch_before_add(self):
latch = mitogen.core.Latch()
latch.put(123)
select = self.klass([latch])
self.assertFalse(select.empty())
def test_nonempty__latch_after_add(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
latch.put(123)
self.assertFalse(select.empty())
class IterTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
@ -194,18 +267,24 @@ class IterTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass()
self.assertEquals([], list(select))
def test_nonempty(self):
def test_nonempty_receiver(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
msg = mitogen.core.Message.pickled('123')
recv._on_receive(msg)
self.assertEquals([msg], list(select))
def test_nonempty_latch(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
latch.put(123)
self.assertEquals([123], list(select))
class OneShotTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_true_removed_after_get(self):
def test_true_receiver_removed_after_get(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
msg = mitogen.core.Message.pickled('123')
@ -215,7 +294,7 @@ class OneShotTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(0, len(select._receivers))
self.assertEquals(None, recv.notify)
def test_false_persists_after_get(self):
def test_false_receiver_persists_after_get(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv], oneshot=False)
msg = mitogen.core.Message.pickled('123')
@ -226,8 +305,26 @@ class OneShotTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(recv, select._receivers[0])
self.assertEquals(select._put, recv.notify)
def test_true_latch_removed_after_get(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
latch.put(123)
self.assertEquals(123, select.get())
self.assertEquals(0, len(select._receivers))
self.assertEquals(None, latch.notify)
class GetTest(testlib.RouterMixin, testlib.TestCase):
def test_false_latch_persists_after_get(self):
latch = mitogen.core.Latch()
select = self.klass([latch], oneshot=False)
latch.put(123)
self.assertEquals(123, select.get())
self.assertEquals(1, len(select._receivers))
self.assertEquals(latch, select._receivers[0])
self.assertEquals(select._put, latch.notify)
class GetReceiverTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_no_receivers(self):
@ -285,5 +382,79 @@ class GetTest(testlib.RouterMixin, testlib.TestCase):
lambda: select.get(timeout=0.0))
class GetLatchTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_no_latches(self):
select = self.klass()
exc = self.assertRaises(mitogen.select.Error,
lambda: select.get())
self.assertEquals(str(exc), self.klass.empty_msg)
def test_timeout_no_receivers(self):
select = self.klass()
exc = self.assertRaises(mitogen.select.Error,
lambda: select.get(timeout=1.0))
self.assertEquals(str(exc), self.klass.empty_msg)
def test_zero_timeout(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(timeout=0.0))
def test_timeout(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(timeout=0.1))
def test_nonempty_before_add(self):
latch = mitogen.core.Latch()
latch.put(123)
select = self.klass([latch])
self.assertEquals(123, select.get())
def test_nonempty_after_add(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
latch.put(123)
self.assertEquals(123, latch.get())
def test_drained_by_other_thread(self):
latch = mitogen.core.Latch()
latch.put(123)
select = self.klass([latch])
self.assertEquals(123, latch.get())
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(timeout=0.0))
class GetEventTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_empty(self):
select = self.klass()
exc = self.assertRaises(mitogen.select.Error,
lambda: select.get())
self.assertEquals(str(exc), self.klass.empty_msg)
def test_latch(self):
latch = mitogen.core.Latch()
latch.put(123)
select = self.klass([latch])
event = select.get_event()
self.assertEquals(latch, event.source)
self.assertEquals(123, event.data)
def test_receiver(self):
recv = mitogen.core.Receiver(self.router)
recv._on_receive(mitogen.core.Message.pickled('123'))
select = self.klass([recv])
event = select.get_event()
self.assertEquals(recv, event.source)
self.assertEquals('123', event.data.unpickle())
if __name__ == '__main__':
unittest2.main()

Loading…
Cancel
Save