diff --git a/docs/changelog.rst b/docs/changelog.rst index c1fe401e..503c797c 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -37,14 +37,18 @@ Fixes ^^^^^ * `#334 `_: the SSH method - tilde-expands private key paths using Ansible's logic. Previously Mitogen - passed the path unmodified to SSH, which would expand it using - :func:`os.getpwent`. + tilde-expands private key paths using Ansible's logic. Previously Mitogen + passed the path unmodified to SSH, which would expand it using + :func:`os.getpwent`. - This differs from :func:`os.path.expanduser`, which prefers the ``HOME`` - environment variable if it is set, causing behaviour to diverge when Ansible - was invoked using sudo without appropriate flags to cause the ``HOME`` - environment variable to be reset to match the target account. + This differs from :func:`os.path.expanduser`, which prefers the ``HOME`` + environment variable if it is set, causing behaviour to diverge when Ansible + was invoked using sudo without appropriate flags to cause the ``HOME`` + environment variable to be reset to match the target account. + +* `#373 `_: the LXC and LXD methods + now print a useful hint when Python fails to start, as no useful error is + normally logged to the console by these tools. Core Library @@ -56,11 +60,10 @@ Core Library every stream that ever communicated with a disappearing peer, rather than simply toward parents. - Conversations between nodes in any level of the connection tree should - correctly receive ``DEL_ROUTE`` messages when a participant disconnects, - allowing receivers to be woken with :class:`mitogen.core.ChannelError` to - signal the connection has broken, even when one participant is not a parent - of the other. + Conversations between nodes in any level of the tree receive ``DEL_ROUTE`` + messages when a participant disconnects, allowing receivers to be woken with + :class:`mitogen.core.ChannelError` to signal the connection has broken, even + when one participant is not a parent of the other. * `#405 `_: if a message is rejected due to being too large, and it has a ``reply_to`` set, a dead message is @@ -68,9 +71,9 @@ Core Library maximum size crash rather than hang. * `#411 `_: the SSH method typed - "``y``" rather than the requisite "``yes``" when `check_host_keys="accept"` - was configured. This would lead to connection timeouts due to the hung - response. + "``y``" rather than the requisite "``yes``" when `check_host_keys="accept"` + was configured. This would lead to connection timeouts due to the hung + response. * `16ca111e `_: handle OpenSSH 7.5 permission denied prompts when ``~/.ssh/config`` rewrites are present. @@ -84,6 +87,7 @@ Thanks! Mitogen would not be possible without the support of users. A huge thanks for bug reports, features and fixes in this release contributed by +`Brian Candler `_, and `Guy Knights `_. diff --git a/mitogen/core.py b/mitogen/core.py index 642540a5..d3909773 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1286,17 +1286,20 @@ def _unpickle_context(router, context_id, name): class Poller(object): + #: Increments on every poll(). Used to version _rfds and _wfds. + _generation = 1 + def __init__(self): self._rfds = {} self._wfds = {} @property def readers(self): - return list(self._rfds.items()) + return list((fd, data) for fd, (data, gen) in self._rfds.items()) @property def writers(self): - return list(self._wfds.items()) + return list((fd, data) for fd, (data, gen) in self._wfds.items()) def __repr__(self): return '%s(%#x)' % (type(self).__name__, id(self)) @@ -1305,19 +1308,18 @@ class Poller(object): pass def start_receive(self, fd, data=None): - self._rfds[fd] = data or fd + self._rfds[fd] = (data or fd, self._generation) def stop_receive(self, fd): self._rfds.pop(fd, None) def start_transmit(self, fd, data=None): - self._wfds[fd] = data or fd + self._wfds[fd] = (data or fd, self._generation) def stop_transmit(self, fd): self._wfds.pop(fd, None) - def poll(self, timeout=None): - _vv and IOLOG.debug('%r.poll(%r)', self, timeout) + def _poll(self, timeout): (rfds, wfds, _), _ = io_op(select.select, self._rfds, self._wfds, @@ -1326,11 +1328,20 @@ class Poller(object): for fd in rfds: _vv and IOLOG.debug('%r: POLLIN for %r', self, fd) - yield self._rfds[fd] + data, gen = self._rfds.get(fd, (None, None)) + if gen and gen < self._generation: + yield data for fd in wfds: _vv and IOLOG.debug('%r: POLLOUT for %r', self, fd) - yield self._wfds[fd] + data, gen = self._wfds.get(fd, (None, None)) + if gen and gen < self._generation: + yield data + + def poll(self, timeout=None): + _vv and IOLOG.debug('%r.poll(%r)', self, timeout) + self._generation += 1 + return self._poll(timeout) class Latch(object): diff --git a/mitogen/lxc.py b/mitogen/lxc.py index 71b12221..cd85be4f 100644 --- a/mitogen/lxc.py +++ b/mitogen/lxc.py @@ -48,6 +48,12 @@ class Stream(mitogen.parent.Stream): container = None lxc_attach_path = 'lxc-attach' + eof_error_hint = ( + 'Note: many versions of LXC do not report program execution failure ' + 'meaningfully. Please check the host logs (/var/log) for more ' + 'information.' + ) + def construct(self, container, lxc_attach_path=None, **kwargs): super(Stream, self).construct(**kwargs) self.container = container diff --git a/mitogen/lxd.py b/mitogen/lxd.py index 9e6702f4..a28e1aaa 100644 --- a/mitogen/lxd.py +++ b/mitogen/lxd.py @@ -49,6 +49,12 @@ class Stream(mitogen.parent.Stream): lxc_path = 'lxc' python_path = 'python' + eof_error_hint = ( + 'Note: many versions of LXC do not report program execution failure ' + 'meaningfully. Please check the host logs (/var/log) for more ' + 'information.' + ) + def construct(self, container, lxc_path=None, **kwargs): super(Stream, self).construct(**kwargs) self.container = container diff --git a/mitogen/parent.py b/mitogen/parent.py index 9e878e3f..4549d877 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -407,6 +407,8 @@ def write_all(fd, s, deadline=None): :raises mitogen.core.TimeoutError: Bytestring could not be written entirely before deadline was exceeded. + :raises mitogen.parent.EofError: + Stream indicated EOF, suggesting the child process has exitted. :raises mitogen.core.StreamError: File descriptor was disconnected before write could complete. """ @@ -430,7 +432,7 @@ def write_all(fd, s, deadline=None): for fd in poller.poll(timeout): n, disconnected = mitogen.core.io_op(os.write, fd, window) if disconnected: - raise mitogen.core.StreamError('EOF on stream during write') + raise EofError('EOF on stream during write') written += n finally: @@ -449,6 +451,8 @@ def iter_read(fds, deadline=None): :raises mitogen.core.TimeoutError: Attempt to read beyond deadline. + :raises mitogen.parent.EofError: + All streams indicated EOF, suggesting the child process has exitted. :raises mitogen.core.StreamError: Attempt to read past end of file. """ @@ -478,10 +482,9 @@ def iter_read(fds, deadline=None): poller.close() if not poller.readers: - raise mitogen.core.StreamError( - u'EOF on stream; last 300 bytes received: %r' % - (b('').join(bits)[-300:].decode('latin1'),) - ) + raise EofError(u'EOF on stream; last 300 bytes received: %r' % + (b('').join(bits)[-300:].decode('latin1'),)) + raise mitogen.core.TimeoutError('read timed out') @@ -501,6 +504,8 @@ def discard_until(fd, s, deadline): :raises mitogen.core.TimeoutError: Attempt to read beyond deadline. + :raises mitogen.parent.EofError: + All streams indicated EOF, suggesting the child process has exitted. :raises mitogen.core.StreamError: Attempt to read past end of file. """ @@ -607,6 +612,14 @@ def wstatus_to_str(status): return 'unknown wait status (%d)' % (status,) +class EofError(mitogen.core.StreamError): + """ + Raised by :func:`iter_read` and :func:`write_all` when EOF is detected by + the child process. + """ + # inherits from StreamError to maintain compatibility. + + class Argv(object): """ Wrapper to defer argv formatting when debug logging is disabled. @@ -681,14 +694,6 @@ class KqueuePoller(mitogen.core.Poller): def close(self): self._kqueue.close() - @property - def readers(self): - return list(self._rfds.items()) - - @property - def writers(self): - return list(self._wfds.items()) - def _control(self, fd, filters, flags): mitogen.core._vv and IOLOG.debug( '%r._control(%r, %r, %r)', self, fd, filters, flags) @@ -707,7 +712,7 @@ class KqueuePoller(mitogen.core.Poller): self, fd, data) if fd not in self._rfds: self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD) - self._rfds[fd] = data or fd + self._rfds[fd] = (data or fd, self._generation) def stop_receive(self, fd): mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, fd) @@ -720,7 +725,7 @@ class KqueuePoller(mitogen.core.Poller): self, fd, data) if fd not in self._wfds: self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD) - self._wfds[fd] = data or fd + self._wfds[fd] = (data or fd, self._generation) def stop_transmit(self, fd): mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, fd) @@ -728,7 +733,7 @@ class KqueuePoller(mitogen.core.Poller): self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE) del self._wfds[fd] - def poll(self, timeout=None): + def _poll(self, timeout): changelist = self._changelist self._changelist = [] events, _ = mitogen.core.io_op(self._kqueue.control, @@ -738,13 +743,17 @@ class KqueuePoller(mitogen.core.Poller): if event.flags & select.KQ_EV_ERROR: LOG.debug('ignoring stale event for fd %r: errno=%d: %s', fd, event.data, errno.errorcode.get(event.data)) - elif event.filter == select.KQ_FILTER_READ and fd in self._rfds: + elif event.filter == select.KQ_FILTER_READ: + data, gen = self._rfds.get(fd, (None, None)) # Events can still be read for an already-discarded fd. - mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd) - yield self._rfds[fd] + if gen and gen < self._generation: + mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd) + yield data elif event.filter == select.KQ_FILTER_WRITE and fd in self._wfds: - mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd) - yield self._wfds[fd] + data, gen = self._wfds.get(fd, (None, None)) + if gen and gen < self._generation: + mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd) + yield data class EpollPoller(mitogen.core.Poller): @@ -759,14 +768,6 @@ class EpollPoller(mitogen.core.Poller): def close(self): self._epoll.close() - @property - def readers(self): - return list(self._rfds.items()) - - @property - def writers(self): - return list(self._wfds.items()) - def _control(self, fd): mitogen.core._vv and IOLOG.debug('%r._control(%r)', self, fd) mask = (((fd in self._rfds) and select.EPOLLIN) | @@ -784,7 +785,7 @@ class EpollPoller(mitogen.core.Poller): def start_receive(self, fd, data=None): mitogen.core._vv and IOLOG.debug('%r.start_receive(%r, %r)', self, fd, data) - self._rfds[fd] = data or fd + self._rfds[fd] = (data or fd, self._generation) self._control(fd) def stop_receive(self, fd): @@ -795,7 +796,7 @@ class EpollPoller(mitogen.core.Poller): def start_transmit(self, fd, data=None): mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r, %r)', self, fd, data) - self._wfds[fd] = data or fd + self._wfds[fd] = (data or fd, self._generation) self._control(fd) def stop_transmit(self, fd): @@ -806,20 +807,24 @@ class EpollPoller(mitogen.core.Poller): _inmask = (getattr(select, 'EPOLLIN', 0) | getattr(select, 'EPOLLHUP', 0)) - def poll(self, timeout=None): + def _poll(self, timeout): the_timeout = -1 if timeout is not None: the_timeout = timeout events, _ = mitogen.core.io_op(self._epoll.poll, the_timeout, 32) for fd, event in events: - if event & self._inmask and fd in self._rfds: - # Events can still be read for an already-discarded fd. - mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd) - yield self._rfds[fd] - if event & select.EPOLLOUT and fd in self._wfds: - mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd) - yield self._wfds[fd] + if event & self._inmask: + data, gen = self._rfds.get(fd, (None, None)) + if gen and gen < self._generation: + # Events can still be read for an already-discarded fd. + mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd) + yield data + if event & select.EPOLLOUT: + data, gen = self._wfds.get(fd, (None, None)) + if gen and gen < self._generation: + mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd) + yield data POLLER_BY_SYSNAME = { @@ -1113,6 +1118,16 @@ class Stream(mitogen.core.Stream): msg = 'Child start failed: %s. Command was: %s' % (e, Argv(args)) raise mitogen.core.StreamError(msg) + eof_error_hint = None + + def _adorn_eof_error(self, e): + """ + Used by subclasses to provide additional information in the case of a + failed connection. + """ + if self.eof_error_hint: + e.args = ('%s\n\n%s' % (e.args[0], self.eof_error_hint),) + def connect(self): LOG.debug('%r.connect()', self) self.pid, fd, extra_fd = self.start_child() @@ -1124,6 +1139,10 @@ class Stream(mitogen.core.Stream): try: self._connect_bootstrap(extra_fd) + except EofError: + e = sys.exc_info()[1] + self._adorn_eof_error(e) + raise except Exception: self._reap_child() raise diff --git a/tests/lxc_test.py b/tests/lxc_test.py index 3168aad2..5d8e14d8 100644 --- a/tests/lxc_test.py +++ b/tests/lxc_test.py @@ -1,6 +1,7 @@ import os import mitogen +import mitogen.lxc import unittest2 @@ -11,19 +12,29 @@ def has_subseq(seq, subseq): return any(seq[x:x+len(subseq)] == subseq for x in range(0, len(seq))) -class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): +class ConstructorTest(testlib.RouterMixin, testlib.TestCase): + lxc_attach_path = testlib.data_path('stubs/lxc-attach.py') + def test_okay(self): - lxc_attach_path = testlib.data_path('stubs/lxc-attach.py') context = self.router.lxc( container='container_name', - lxc_attach_path=lxc_attach_path, + lxc_attach_path=self.lxc_attach_path, ) argv = eval(context.call(os.getenv, 'ORIGINAL_ARGV')) - self.assertEquals(argv[0], lxc_attach_path) + self.assertEquals(argv[0], self.lxc_attach_path) self.assertTrue('--clear-env' in argv) self.assertTrue(has_subseq(argv, ['--name', 'container_name'])) + def test_eof(self): + e = self.assertRaises(mitogen.parent.EofError, + lambda: self.router.lxc( + container='container_name', + lxc_attach_path='true', + ) + ) + self.assertTrue(str(e).endswith(mitogen.lxc.Stream.eof_error_hint)) + if __name__ == '__main__': unittest2.main() diff --git a/tests/lxd_test.py b/tests/lxd_test.py index 41e9df15..cae1172c 100644 --- a/tests/lxd_test.py +++ b/tests/lxd_test.py @@ -1,13 +1,15 @@ import os import mitogen +import mitogen.lxd +import mitogen.parent import unittest2 import testlib -class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): +class ConstructorTest(testlib.RouterMixin, testlib.TestCase): def test_okay(self): lxc_path = testlib.data_path('stubs/lxc.py') context = self.router.lxd( @@ -21,6 +23,15 @@ class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): self.assertEquals(argv[2], '--mode=noninteractive') self.assertEquals(argv[3], 'container_name') + def test_eof(self): + e = self.assertRaises(mitogen.parent.EofError, + lambda: self.router.lxd( + container='container_name', + lxc_path='true', + ) + ) + self.assertTrue(str(e).endswith(mitogen.lxd.Stream.eof_error_hint)) + if __name__ == '__main__': unittest2.main() diff --git a/tests/poller_test.py b/tests/poller_test.py new file mode 100644 index 00000000..a6190821 --- /dev/null +++ b/tests/poller_test.py @@ -0,0 +1,396 @@ + +import errno +import os +import select +import socket +import sys +import time + +import unittest2 + +import mitogen.core +import mitogen.parent + +import testlib + + +class SockMixin(object): + def tearDown(self): + self.close_socks() + super(SockMixin, self).tearDown() + + def setUp(self): + super(SockMixin, self).setUp() + self._setup_socks() + + def _setup_socks(self): + # "left" and "right" side of two socket pairs. We use sockets instead + # of pipes since the same process can manipulate transmit/receive + # buffers on both sides (bidirectional IO), making it easier to test + # combinations of readability/writeability on the one side of a single + # file object. + self.l1_sock, self.r1_sock = socket.socketpair() + self.l1 = self.l1_sock.fileno() + self.r1 = self.r1_sock.fileno() + + self.l2_sock, self.r2_sock = socket.socketpair() + self.l2 = self.l2_sock.fileno() + self.r2 = self.r2_sock.fileno() + for fd in self.l1, self.r1, self.l2, self.r2: + mitogen.core.set_nonblock(fd) + + def fill(self, fd): + """Make `fd` unwriteable.""" + while True: + try: + os.write(fd, 'x'*4096) + except OSError: + e = sys.exc_info()[1] + if e.args[0] == errno.EAGAIN: + return + raise + + def drain(self, fd): + """Make `fd` unreadable.""" + while True: + try: + if not os.read(fd, 4096): + return + except OSError: + e = sys.exc_info()[1] + if e.args[0] == errno.EAGAIN: + return + raise + + def close_socks(self): + for sock in self.l1_sock, self.r1_sock, self.l2_sock, self.r2_sock: + sock.close() + + +class PollerMixin(object): + klass = None + + def setUp(self): + super(PollerMixin, self).setUp() + self.p = self.klass() + + def tearDown(self): + self.p.close() + super(PollerMixin, self).tearDown() + + +class ReceiveStateMixin(PollerMixin, SockMixin): + def test_start_receive_adds_reader(self): + self.p.start_receive(self.l1) + self.assertEquals([(self.l1, self.l1)], self.p.readers) + self.assertEquals([], self.p.writers) + + def test_start_receive_adds_reader_data(self): + data = object() + self.p.start_receive(self.l1, data=data) + self.assertEquals([(self.l1, data)], self.p.readers) + self.assertEquals([], self.p.writers) + + def test_stop_receive(self): + self.p.start_receive(self.l1) + self.p.stop_receive(self.l1) + self.assertEquals([], self.p.readers) + self.assertEquals([], self.p.writers) + + def test_stop_receive_dup(self): + self.p.start_receive(self.l1) + self.p.stop_receive(self.l1) + self.assertEquals([], self.p.readers) + self.assertEquals([], self.p.writers) + self.p.stop_receive(self.l1) + self.assertEquals([], self.p.readers) + self.assertEquals([], self.p.writers) + + def test_stop_receive_noexist(self): + p = self.klass() + p.stop_receive(123) # should not fail + self.assertEquals([], p.readers) + self.assertEquals([], self.p.writers) + + +class TransmitStateMixin(PollerMixin, SockMixin): + def test_start_transmit_adds_writer(self): + self.p.start_transmit(self.r1) + self.assertEquals([], self.p.readers) + self.assertEquals([(self.r1, self.r1)], self.p.writers) + + def test_start_transmit_adds_writer_data(self): + data = object() + self.p.start_transmit(self.r1, data=data) + self.assertEquals([], self.p.readers) + self.assertEquals([(self.r1, data)], self.p.writers) + + def test_stop_transmit(self): + self.p.start_transmit(self.r1) + self.p.stop_transmit(self.r1) + self.assertEquals([], self.p.readers) + self.assertEquals([], self.p.writers) + + def test_stop_transmit_dup(self): + self.p.start_transmit(self.r1) + self.p.stop_transmit(self.r1) + self.assertEquals([], self.p.readers) + self.assertEquals([], self.p.writers) + self.p.stop_transmit(self.r1) + self.assertEquals([], self.p.readers) + self.assertEquals([], self.p.writers) + + def test_stop_transmit_noexist(self): + p = self.klass() + p.stop_receive(123) # should not fail + self.assertEquals([], p.readers) + self.assertEquals([], self.p.writers) + + +class CloseMixin(PollerMixin): + def test_single_close(self): + self.p.close() + + def test_double_close(self): + self.p.close() + self.p.close() + + +class PollMixin(PollerMixin): + def test_empty_zero_timeout(self): + t0 = time.time() + self.assertEquals([], list(self.p.poll(0))) + self.assertTrue((time.time() - t0) < .1) # vaguely reasonable + + def test_empty_small_timeout(self): + t0 = time.time() + self.assertEquals([], list(self.p.poll(.2))) + self.assertTrue((time.time() - t0) >= .2) + + +class ReadableMixin(PollerMixin, SockMixin): + def test_unreadable(self): + self.p.start_receive(self.l1) + self.assertEquals([], list(self.p.poll(0))) + + def test_readable_before_add(self): + self.fill(self.r1) + self.p.start_receive(self.l1) + self.assertEquals([self.l1], list(self.p.poll(0))) + + def test_readable_after_add(self): + self.p.start_receive(self.l1) + self.fill(self.r1) + self.assertEquals([self.l1], list(self.p.poll(0))) + + def test_readable_then_unreadable(self): + self.fill(self.r1) + self.p.start_receive(self.l1) + self.assertEquals([self.l1], list(self.p.poll(0))) + self.drain(self.l1) + self.assertEquals([], list(self.p.poll(0))) + + def test_readable_data(self): + data = object() + self.fill(self.r1) + self.p.start_receive(self.l1, data=data) + self.assertEquals([data], list(self.p.poll(0))) + + def test_double_readable_data(self): + data1 = object() + data2 = object() + self.fill(self.r1) + self.p.start_receive(self.l1, data=data1) + self.fill(self.r2) + self.p.start_receive(self.l2, data=data2) + self.assertEquals(set([data1, data2]), set(self.p.poll(0))) + + +class WriteableMixin(PollerMixin, SockMixin): + def test_writeable(self): + self.p.start_transmit(self.r1) + self.assertEquals([self.r1], list(self.p.poll(0))) + + def test_writeable_data(self): + data = object() + self.p.start_transmit(self.r1, data=data) + self.assertEquals([data], list(self.p.poll(0))) + + def test_unwriteable_before_add(self): + self.fill(self.r1) + self.p.start_transmit(self.r1) + self.assertEquals([], list(self.p.poll(0))) + + def test_unwriteable_after_add(self): + self.p.start_transmit(self.r1) + self.fill(self.r1) + self.assertEquals([], list(self.p.poll(0))) + + def test_unwriteable_then_writeable(self): + self.fill(self.r1) + self.p.start_transmit(self.r1) + self.assertEquals([], list(self.p.poll(0))) + self.drain(self.l1) + self.assertEquals([self.r1], list(self.p.poll(0))) + + def test_double_unwriteable_then_Writeable(self): + self.fill(self.r1) + self.p.start_transmit(self.r1) + + self.fill(self.r2) + self.p.start_transmit(self.r2) + + self.assertEquals([], list(self.p.poll(0))) + + self.drain(self.l1) + self.assertEquals([self.r1], list(self.p.poll(0))) + + self.drain(self.l2) + self.assertEquals(set([self.r1, self.r2]), set(self.p.poll(0))) + + +class MutateDuringYieldMixin(PollerMixin, SockMixin): + # verify behaviour when poller contents is modified in the middle of + # poll() output generation. + + def test_one_readable_removed_before_yield(self): + self.fill(self.l1) + self.p.start_receive(self.r1) + p = self.p.poll(0) + self.p.stop_receive(self.r1) + self.assertEquals([], list(p)) + + def test_one_writeable_removed_before_yield(self): + self.p.start_transmit(self.r1) + p = self.p.poll(0) + self.p.stop_transmit(self.r1) + self.assertEquals([], list(p)) + + def test_one_readable_readded_before_yield(self): + # fd removed, closed, another fd opened, gets same fd number, re-added. + # event fires for wrong underlying object. + self.fill(self.l1) + self.p.start_receive(self.r1) + p = self.p.poll(0) + self.p.stop_receive(self.r1) + self.p.start_receive(self.r1) + self.assertEquals([], list(p)) + + def test_one_readable_readded_during_yield(self): + self.fill(self.l1) + self.p.start_receive(self.r1) + + self.fill(self.l2) + self.p.start_receive(self.r2) + + p = self.p.poll(0) + + # figure out which one is consumed and which is still to-read. + consumed = next(p) + ready = (self.r1, self.r2)[consumed == self.r1] + + # now remove and re-add the one that hasn't been read yet. + self.p.stop_receive(ready) + self.p.start_receive(ready) + + # the start_receive() may be for a totally new underlying file object, + # the live loop iteration must not yield any buffered readiness event. + self.assertEquals([], list(p)) + + +class FileClosedMixin(PollerMixin, SockMixin): + # Verify behaviour when a registered file object is closed in various + # scenarios, without first calling stop_receive()/stop_transmit(). + + def test_writeable_then_closed(self): + self.p.start_transmit(self.r1) + self.assertEquals([self.r1], list(self.p.poll(0))) + self.close_socks() + try: + self.assertEquals([], list(self.p.poll(0))) + except select.error: + # a crash is also reasonable here. + pass + + def test_writeable_closed_before_yield(self): + self.p.start_transmit(self.r1) + p = self.p.poll(0) + self.close_socks() + try: + self.assertEquals([], list(p)) + except select.error: + # a crash is also reasonable here. + pass + + def test_readable_then_closed(self): + self.fill(self.l1) + self.p.start_receive(self.r1) + self.assertEquals([self.r1], list(self.p.poll(0))) + self.close_socks() + try: + self.assertEquals([], list(self.p.poll(0))) + except select.error: + # a crash is also reasonable here. + pass + + def test_readable_closed_before_yield(self): + self.fill(self.l1) + self.p.start_receive(self.r1) + p = self.p.poll(0) + self.close_socks() + try: + self.assertEquals([], list(p)) + except select.error: + # a crash is also reasonable here. + pass + + +class DistinctDataMixin(PollerMixin, SockMixin): + # Verify different data is yielded for the same FD according to the event + # being raised. + + def test_one_distinct(self): + rdata = object() + wdata = object() + self.p.start_receive(self.r1, data=rdata) + self.p.start_transmit(self.r1, data=wdata) + + self.assertEquals([wdata], list(self.p.poll(0))) + self.fill(self.l1) # r1 is now readable and writeable. + self.assertEquals(set([rdata, wdata]), set(self.p.poll(0))) + + +class AllMixin(ReceiveStateMixin, + TransmitStateMixin, + ReadableMixin, + WriteableMixin, + MutateDuringYieldMixin, + FileClosedMixin, + DistinctDataMixin, + PollMixin, + CloseMixin): + """ + Helper to avoid cutpasting mixin names below. + """ + + +@unittest2.skipIf(condition=not hasattr(select, 'select'), + reason='select.select() not supported') +class SelectTest(AllMixin, testlib.TestCase): + klass = mitogen.core.Poller + + +@unittest2.skipIf(condition=not hasattr(select, 'kqueue'), + reason='select.kqueue() not supported') +class KqueueTest(AllMixin, testlib.TestCase): + klass = mitogen.parent.KqueuePoller + + +@unittest2.skipIf(condition=not hasattr(select, 'epoll'), + reason='select.epoll() not supported') +class EpollTest(AllMixin, testlib.TestCase): + klass = mitogen.parent.EpollPoller + + +if __name__ == '__main__': + unittest2.main()