Merge remote-tracking branch 'origin/dmw'

- poller tests, event versioning
- issue #373
issue260
David Wilson 7 years ago
commit b98b7d1da4

@ -46,6 +46,10 @@ Fixes
was invoked using sudo without appropriate flags to cause the ``HOME`` was invoked using sudo without appropriate flags to cause the ``HOME``
environment variable to be reset to match the target account. environment variable to be reset to match the target account.
* `#373 <https://github.com/dw/mitogen/issues/373>`_: the LXC and LXD methods
now print a useful hint when Python fails to start, as no useful error is
normally logged to the console by these tools.
Core Library Core Library
~~~~~~~~~~~~ ~~~~~~~~~~~~
@ -56,11 +60,10 @@ Core Library
every stream that ever communicated with a disappearing peer, rather than every stream that ever communicated with a disappearing peer, rather than
simply toward parents. simply toward parents.
Conversations between nodes in any level of the connection tree should Conversations between nodes in any level of the tree receive ``DEL_ROUTE``
correctly receive ``DEL_ROUTE`` messages when a participant disconnects, messages when a participant disconnects, allowing receivers to be woken with
allowing receivers to be woken with :class:`mitogen.core.ChannelError` to :class:`mitogen.core.ChannelError` to signal the connection has broken, even
signal the connection has broken, even when one participant is not a parent when one participant is not a parent of the other.
of the other.
* `#405 <https://github.com/dw/mitogen/issues/405>`_: if a message is rejected * `#405 <https://github.com/dw/mitogen/issues/405>`_: if a message is rejected
due to being too large, and it has a ``reply_to`` set, a dead message is due to being too large, and it has a ``reply_to`` set, a dead message is
@ -84,6 +87,7 @@ Thanks!
Mitogen would not be possible without the support of users. A huge thanks for Mitogen would not be possible without the support of users. A huge thanks for
bug reports, features and fixes in this release contributed by bug reports, features and fixes in this release contributed by
`Brian Candler <https://github.com/candlerb>`_, and
`Guy Knights <https://github.com/knightsg>`_. `Guy Knights <https://github.com/knightsg>`_.

@ -1286,17 +1286,20 @@ def _unpickle_context(router, context_id, name):
class Poller(object): class Poller(object):
#: Increments on every poll(). Used to version _rfds and _wfds.
_generation = 1
def __init__(self): def __init__(self):
self._rfds = {} self._rfds = {}
self._wfds = {} self._wfds = {}
@property @property
def readers(self): def readers(self):
return list(self._rfds.items()) return list((fd, data) for fd, (data, gen) in self._rfds.items())
@property @property
def writers(self): def writers(self):
return list(self._wfds.items()) return list((fd, data) for fd, (data, gen) in self._wfds.items())
def __repr__(self): def __repr__(self):
return '%s(%#x)' % (type(self).__name__, id(self)) return '%s(%#x)' % (type(self).__name__, id(self))
@ -1305,19 +1308,18 @@ class Poller(object):
pass pass
def start_receive(self, fd, data=None): def start_receive(self, fd, data=None):
self._rfds[fd] = data or fd self._rfds[fd] = (data or fd, self._generation)
def stop_receive(self, fd): def stop_receive(self, fd):
self._rfds.pop(fd, None) self._rfds.pop(fd, None)
def start_transmit(self, fd, data=None): def start_transmit(self, fd, data=None):
self._wfds[fd] = data or fd self._wfds[fd] = (data or fd, self._generation)
def stop_transmit(self, fd): def stop_transmit(self, fd):
self._wfds.pop(fd, None) self._wfds.pop(fd, None)
def poll(self, timeout=None): def _poll(self, timeout):
_vv and IOLOG.debug('%r.poll(%r)', self, timeout)
(rfds, wfds, _), _ = io_op(select.select, (rfds, wfds, _), _ = io_op(select.select,
self._rfds, self._rfds,
self._wfds, self._wfds,
@ -1326,11 +1328,20 @@ class Poller(object):
for fd in rfds: for fd in rfds:
_vv and IOLOG.debug('%r: POLLIN for %r', self, fd) _vv and IOLOG.debug('%r: POLLIN for %r', self, fd)
yield self._rfds[fd] data, gen = self._rfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
for fd in wfds: for fd in wfds:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, fd) _vv and IOLOG.debug('%r: POLLOUT for %r', self, fd)
yield self._wfds[fd] data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
def poll(self, timeout=None):
_vv and IOLOG.debug('%r.poll(%r)', self, timeout)
self._generation += 1
return self._poll(timeout)
class Latch(object): class Latch(object):

@ -48,6 +48,12 @@ class Stream(mitogen.parent.Stream):
container = None container = None
lxc_attach_path = 'lxc-attach' lxc_attach_path = 'lxc-attach'
eof_error_hint = (
'Note: many versions of LXC do not report program execution failure '
'meaningfully. Please check the host logs (/var/log) for more '
'information.'
)
def construct(self, container, lxc_attach_path=None, **kwargs): def construct(self, container, lxc_attach_path=None, **kwargs):
super(Stream, self).construct(**kwargs) super(Stream, self).construct(**kwargs)
self.container = container self.container = container

@ -49,6 +49,12 @@ class Stream(mitogen.parent.Stream):
lxc_path = 'lxc' lxc_path = 'lxc'
python_path = 'python' python_path = 'python'
eof_error_hint = (
'Note: many versions of LXC do not report program execution failure '
'meaningfully. Please check the host logs (/var/log) for more '
'information.'
)
def construct(self, container, lxc_path=None, **kwargs): def construct(self, container, lxc_path=None, **kwargs):
super(Stream, self).construct(**kwargs) super(Stream, self).construct(**kwargs)
self.container = container self.container = container

@ -407,6 +407,8 @@ def write_all(fd, s, deadline=None):
:raises mitogen.core.TimeoutError: :raises mitogen.core.TimeoutError:
Bytestring could not be written entirely before deadline was exceeded. Bytestring could not be written entirely before deadline was exceeded.
:raises mitogen.parent.EofError:
Stream indicated EOF, suggesting the child process has exitted.
:raises mitogen.core.StreamError: :raises mitogen.core.StreamError:
File descriptor was disconnected before write could complete. File descriptor was disconnected before write could complete.
""" """
@ -430,7 +432,7 @@ def write_all(fd, s, deadline=None):
for fd in poller.poll(timeout): for fd in poller.poll(timeout):
n, disconnected = mitogen.core.io_op(os.write, fd, window) n, disconnected = mitogen.core.io_op(os.write, fd, window)
if disconnected: if disconnected:
raise mitogen.core.StreamError('EOF on stream during write') raise EofError('EOF on stream during write')
written += n written += n
finally: finally:
@ -449,6 +451,8 @@ def iter_read(fds, deadline=None):
:raises mitogen.core.TimeoutError: :raises mitogen.core.TimeoutError:
Attempt to read beyond deadline. Attempt to read beyond deadline.
:raises mitogen.parent.EofError:
All streams indicated EOF, suggesting the child process has exitted.
:raises mitogen.core.StreamError: :raises mitogen.core.StreamError:
Attempt to read past end of file. Attempt to read past end of file.
""" """
@ -478,10 +482,9 @@ def iter_read(fds, deadline=None):
poller.close() poller.close()
if not poller.readers: if not poller.readers:
raise mitogen.core.StreamError( raise EofError(u'EOF on stream; last 300 bytes received: %r' %
u'EOF on stream; last 300 bytes received: %r' % (b('').join(bits)[-300:].decode('latin1'),))
(b('').join(bits)[-300:].decode('latin1'),)
)
raise mitogen.core.TimeoutError('read timed out') raise mitogen.core.TimeoutError('read timed out')
@ -501,6 +504,8 @@ def discard_until(fd, s, deadline):
:raises mitogen.core.TimeoutError: :raises mitogen.core.TimeoutError:
Attempt to read beyond deadline. Attempt to read beyond deadline.
:raises mitogen.parent.EofError:
All streams indicated EOF, suggesting the child process has exitted.
:raises mitogen.core.StreamError: :raises mitogen.core.StreamError:
Attempt to read past end of file. Attempt to read past end of file.
""" """
@ -607,6 +612,14 @@ def wstatus_to_str(status):
return 'unknown wait status (%d)' % (status,) return 'unknown wait status (%d)' % (status,)
class EofError(mitogen.core.StreamError):
"""
Raised by :func:`iter_read` and :func:`write_all` when EOF is detected by
the child process.
"""
# inherits from StreamError to maintain compatibility.
class Argv(object): class Argv(object):
""" """
Wrapper to defer argv formatting when debug logging is disabled. Wrapper to defer argv formatting when debug logging is disabled.
@ -681,14 +694,6 @@ class KqueuePoller(mitogen.core.Poller):
def close(self): def close(self):
self._kqueue.close() self._kqueue.close()
@property
def readers(self):
return list(self._rfds.items())
@property
def writers(self):
return list(self._wfds.items())
def _control(self, fd, filters, flags): def _control(self, fd, filters, flags):
mitogen.core._vv and IOLOG.debug( mitogen.core._vv and IOLOG.debug(
'%r._control(%r, %r, %r)', self, fd, filters, flags) '%r._control(%r, %r, %r)', self, fd, filters, flags)
@ -707,7 +712,7 @@ class KqueuePoller(mitogen.core.Poller):
self, fd, data) self, fd, data)
if fd not in self._rfds: if fd not in self._rfds:
self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD) self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD)
self._rfds[fd] = data or fd self._rfds[fd] = (data or fd, self._generation)
def stop_receive(self, fd): def stop_receive(self, fd):
mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, fd) mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, fd)
@ -720,7 +725,7 @@ class KqueuePoller(mitogen.core.Poller):
self, fd, data) self, fd, data)
if fd not in self._wfds: if fd not in self._wfds:
self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD) self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
self._wfds[fd] = data or fd self._wfds[fd] = (data or fd, self._generation)
def stop_transmit(self, fd): def stop_transmit(self, fd):
mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, fd) mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, fd)
@ -728,7 +733,7 @@ class KqueuePoller(mitogen.core.Poller):
self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE) self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)
del self._wfds[fd] del self._wfds[fd]
def poll(self, timeout=None): def _poll(self, timeout):
changelist = self._changelist changelist = self._changelist
self._changelist = [] self._changelist = []
events, _ = mitogen.core.io_op(self._kqueue.control, events, _ = mitogen.core.io_op(self._kqueue.control,
@ -738,13 +743,17 @@ class KqueuePoller(mitogen.core.Poller):
if event.flags & select.KQ_EV_ERROR: if event.flags & select.KQ_EV_ERROR:
LOG.debug('ignoring stale event for fd %r: errno=%d: %s', LOG.debug('ignoring stale event for fd %r: errno=%d: %s',
fd, event.data, errno.errorcode.get(event.data)) fd, event.data, errno.errorcode.get(event.data))
elif event.filter == select.KQ_FILTER_READ and fd in self._rfds: elif event.filter == select.KQ_FILTER_READ:
data, gen = self._rfds.get(fd, (None, None))
# Events can still be read for an already-discarded fd. # Events can still be read for an already-discarded fd.
if gen and gen < self._generation:
mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd) mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd)
yield self._rfds[fd] yield data
elif event.filter == select.KQ_FILTER_WRITE and fd in self._wfds: elif event.filter == select.KQ_FILTER_WRITE and fd in self._wfds:
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd) mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd)
yield self._wfds[fd] yield data
class EpollPoller(mitogen.core.Poller): class EpollPoller(mitogen.core.Poller):
@ -759,14 +768,6 @@ class EpollPoller(mitogen.core.Poller):
def close(self): def close(self):
self._epoll.close() self._epoll.close()
@property
def readers(self):
return list(self._rfds.items())
@property
def writers(self):
return list(self._wfds.items())
def _control(self, fd): def _control(self, fd):
mitogen.core._vv and IOLOG.debug('%r._control(%r)', self, fd) mitogen.core._vv and IOLOG.debug('%r._control(%r)', self, fd)
mask = (((fd in self._rfds) and select.EPOLLIN) | mask = (((fd in self._rfds) and select.EPOLLIN) |
@ -784,7 +785,7 @@ class EpollPoller(mitogen.core.Poller):
def start_receive(self, fd, data=None): def start_receive(self, fd, data=None):
mitogen.core._vv and IOLOG.debug('%r.start_receive(%r, %r)', mitogen.core._vv and IOLOG.debug('%r.start_receive(%r, %r)',
self, fd, data) self, fd, data)
self._rfds[fd] = data or fd self._rfds[fd] = (data or fd, self._generation)
self._control(fd) self._control(fd)
def stop_receive(self, fd): def stop_receive(self, fd):
@ -795,7 +796,7 @@ class EpollPoller(mitogen.core.Poller):
def start_transmit(self, fd, data=None): def start_transmit(self, fd, data=None):
mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r, %r)', mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r, %r)',
self, fd, data) self, fd, data)
self._wfds[fd] = data or fd self._wfds[fd] = (data or fd, self._generation)
self._control(fd) self._control(fd)
def stop_transmit(self, fd): def stop_transmit(self, fd):
@ -806,20 +807,24 @@ class EpollPoller(mitogen.core.Poller):
_inmask = (getattr(select, 'EPOLLIN', 0) | _inmask = (getattr(select, 'EPOLLIN', 0) |
getattr(select, 'EPOLLHUP', 0)) getattr(select, 'EPOLLHUP', 0))
def poll(self, timeout=None): def _poll(self, timeout):
the_timeout = -1 the_timeout = -1
if timeout is not None: if timeout is not None:
the_timeout = timeout the_timeout = timeout
events, _ = mitogen.core.io_op(self._epoll.poll, the_timeout, 32) events, _ = mitogen.core.io_op(self._epoll.poll, the_timeout, 32)
for fd, event in events: for fd, event in events:
if event & self._inmask and fd in self._rfds: if event & self._inmask:
data, gen = self._rfds.get(fd, (None, None))
if gen and gen < self._generation:
# Events can still be read for an already-discarded fd. # Events can still be read for an already-discarded fd.
mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd) mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd)
yield self._rfds[fd] yield data
if event & select.EPOLLOUT and fd in self._wfds: if event & select.EPOLLOUT:
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd) mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd)
yield self._wfds[fd] yield data
POLLER_BY_SYSNAME = { POLLER_BY_SYSNAME = {
@ -1113,6 +1118,16 @@ class Stream(mitogen.core.Stream):
msg = 'Child start failed: %s. Command was: %s' % (e, Argv(args)) msg = 'Child start failed: %s. Command was: %s' % (e, Argv(args))
raise mitogen.core.StreamError(msg) raise mitogen.core.StreamError(msg)
eof_error_hint = None
def _adorn_eof_error(self, e):
"""
Used by subclasses to provide additional information in the case of a
failed connection.
"""
if self.eof_error_hint:
e.args = ('%s\n\n%s' % (e.args[0], self.eof_error_hint),)
def connect(self): def connect(self):
LOG.debug('%r.connect()', self) LOG.debug('%r.connect()', self)
self.pid, fd, extra_fd = self.start_child() self.pid, fd, extra_fd = self.start_child()
@ -1124,6 +1139,10 @@ class Stream(mitogen.core.Stream):
try: try:
self._connect_bootstrap(extra_fd) self._connect_bootstrap(extra_fd)
except EofError:
e = sys.exc_info()[1]
self._adorn_eof_error(e)
raise
except Exception: except Exception:
self._reap_child() self._reap_child()
raise raise

@ -1,6 +1,7 @@
import os import os
import mitogen import mitogen
import mitogen.lxc
import unittest2 import unittest2
@ -11,19 +12,29 @@ def has_subseq(seq, subseq):
return any(seq[x:x+len(subseq)] == subseq for x in range(0, len(seq))) return any(seq[x:x+len(subseq)] == subseq for x in range(0, len(seq)))
class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
def test_okay(self):
lxc_attach_path = testlib.data_path('stubs/lxc-attach.py') lxc_attach_path = testlib.data_path('stubs/lxc-attach.py')
def test_okay(self):
context = self.router.lxc( context = self.router.lxc(
container='container_name', container='container_name',
lxc_attach_path=lxc_attach_path, lxc_attach_path=self.lxc_attach_path,
) )
argv = eval(context.call(os.getenv, 'ORIGINAL_ARGV')) argv = eval(context.call(os.getenv, 'ORIGINAL_ARGV'))
self.assertEquals(argv[0], lxc_attach_path) self.assertEquals(argv[0], self.lxc_attach_path)
self.assertTrue('--clear-env' in argv) self.assertTrue('--clear-env' in argv)
self.assertTrue(has_subseq(argv, ['--name', 'container_name'])) self.assertTrue(has_subseq(argv, ['--name', 'container_name']))
def test_eof(self):
e = self.assertRaises(mitogen.parent.EofError,
lambda: self.router.lxc(
container='container_name',
lxc_attach_path='true',
)
)
self.assertTrue(str(e).endswith(mitogen.lxc.Stream.eof_error_hint))
if __name__ == '__main__': if __name__ == '__main__':
unittest2.main() unittest2.main()

@ -1,13 +1,15 @@
import os import os
import mitogen import mitogen
import mitogen.lxd
import mitogen.parent
import unittest2 import unittest2
import testlib import testlib
class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
def test_okay(self): def test_okay(self):
lxc_path = testlib.data_path('stubs/lxc.py') lxc_path = testlib.data_path('stubs/lxc.py')
context = self.router.lxd( context = self.router.lxd(
@ -21,6 +23,15 @@ class ConstructorTest(testlib.RouterMixin, unittest2.TestCase):
self.assertEquals(argv[2], '--mode=noninteractive') self.assertEquals(argv[2], '--mode=noninteractive')
self.assertEquals(argv[3], 'container_name') self.assertEquals(argv[3], 'container_name')
def test_eof(self):
e = self.assertRaises(mitogen.parent.EofError,
lambda: self.router.lxd(
container='container_name',
lxc_path='true',
)
)
self.assertTrue(str(e).endswith(mitogen.lxd.Stream.eof_error_hint))
if __name__ == '__main__': if __name__ == '__main__':
unittest2.main() unittest2.main()

@ -0,0 +1,396 @@
import errno
import os
import select
import socket
import sys
import time
import unittest2
import mitogen.core
import mitogen.parent
import testlib
class SockMixin(object):
def tearDown(self):
self.close_socks()
super(SockMixin, self).tearDown()
def setUp(self):
super(SockMixin, self).setUp()
self._setup_socks()
def _setup_socks(self):
# "left" and "right" side of two socket pairs. We use sockets instead
# of pipes since the same process can manipulate transmit/receive
# buffers on both sides (bidirectional IO), making it easier to test
# combinations of readability/writeability on the one side of a single
# file object.
self.l1_sock, self.r1_sock = socket.socketpair()
self.l1 = self.l1_sock.fileno()
self.r1 = self.r1_sock.fileno()
self.l2_sock, self.r2_sock = socket.socketpair()
self.l2 = self.l2_sock.fileno()
self.r2 = self.r2_sock.fileno()
for fd in self.l1, self.r1, self.l2, self.r2:
mitogen.core.set_nonblock(fd)
def fill(self, fd):
"""Make `fd` unwriteable."""
while True:
try:
os.write(fd, 'x'*4096)
except OSError:
e = sys.exc_info()[1]
if e.args[0] == errno.EAGAIN:
return
raise
def drain(self, fd):
"""Make `fd` unreadable."""
while True:
try:
if not os.read(fd, 4096):
return
except OSError:
e = sys.exc_info()[1]
if e.args[0] == errno.EAGAIN:
return
raise
def close_socks(self):
for sock in self.l1_sock, self.r1_sock, self.l2_sock, self.r2_sock:
sock.close()
class PollerMixin(object):
klass = None
def setUp(self):
super(PollerMixin, self).setUp()
self.p = self.klass()
def tearDown(self):
self.p.close()
super(PollerMixin, self).tearDown()
class ReceiveStateMixin(PollerMixin, SockMixin):
def test_start_receive_adds_reader(self):
self.p.start_receive(self.l1)
self.assertEquals([(self.l1, self.l1)], self.p.readers)
self.assertEquals([], self.p.writers)
def test_start_receive_adds_reader_data(self):
data = object()
self.p.start_receive(self.l1, data=data)
self.assertEquals([(self.l1, data)], self.p.readers)
self.assertEquals([], self.p.writers)
def test_stop_receive(self):
self.p.start_receive(self.l1)
self.p.stop_receive(self.l1)
self.assertEquals([], self.p.readers)
self.assertEquals([], self.p.writers)
def test_stop_receive_dup(self):
self.p.start_receive(self.l1)
self.p.stop_receive(self.l1)
self.assertEquals([], self.p.readers)
self.assertEquals([], self.p.writers)
self.p.stop_receive(self.l1)
self.assertEquals([], self.p.readers)
self.assertEquals([], self.p.writers)
def test_stop_receive_noexist(self):
p = self.klass()
p.stop_receive(123) # should not fail
self.assertEquals([], p.readers)
self.assertEquals([], self.p.writers)
class TransmitStateMixin(PollerMixin, SockMixin):
def test_start_transmit_adds_writer(self):
self.p.start_transmit(self.r1)
self.assertEquals([], self.p.readers)
self.assertEquals([(self.r1, self.r1)], self.p.writers)
def test_start_transmit_adds_writer_data(self):
data = object()
self.p.start_transmit(self.r1, data=data)
self.assertEquals([], self.p.readers)
self.assertEquals([(self.r1, data)], self.p.writers)
def test_stop_transmit(self):
self.p.start_transmit(self.r1)
self.p.stop_transmit(self.r1)
self.assertEquals([], self.p.readers)
self.assertEquals([], self.p.writers)
def test_stop_transmit_dup(self):
self.p.start_transmit(self.r1)
self.p.stop_transmit(self.r1)
self.assertEquals([], self.p.readers)
self.assertEquals([], self.p.writers)
self.p.stop_transmit(self.r1)
self.assertEquals([], self.p.readers)
self.assertEquals([], self.p.writers)
def test_stop_transmit_noexist(self):
p = self.klass()
p.stop_receive(123) # should not fail
self.assertEquals([], p.readers)
self.assertEquals([], self.p.writers)
class CloseMixin(PollerMixin):
def test_single_close(self):
self.p.close()
def test_double_close(self):
self.p.close()
self.p.close()
class PollMixin(PollerMixin):
def test_empty_zero_timeout(self):
t0 = time.time()
self.assertEquals([], list(self.p.poll(0)))
self.assertTrue((time.time() - t0) < .1) # vaguely reasonable
def test_empty_small_timeout(self):
t0 = time.time()
self.assertEquals([], list(self.p.poll(.2)))
self.assertTrue((time.time() - t0) >= .2)
class ReadableMixin(PollerMixin, SockMixin):
def test_unreadable(self):
self.p.start_receive(self.l1)
self.assertEquals([], list(self.p.poll(0)))
def test_readable_before_add(self):
self.fill(self.r1)
self.p.start_receive(self.l1)
self.assertEquals([self.l1], list(self.p.poll(0)))
def test_readable_after_add(self):
self.p.start_receive(self.l1)
self.fill(self.r1)
self.assertEquals([self.l1], list(self.p.poll(0)))
def test_readable_then_unreadable(self):
self.fill(self.r1)
self.p.start_receive(self.l1)
self.assertEquals([self.l1], list(self.p.poll(0)))
self.drain(self.l1)
self.assertEquals([], list(self.p.poll(0)))
def test_readable_data(self):
data = object()
self.fill(self.r1)
self.p.start_receive(self.l1, data=data)
self.assertEquals([data], list(self.p.poll(0)))
def test_double_readable_data(self):
data1 = object()
data2 = object()
self.fill(self.r1)
self.p.start_receive(self.l1, data=data1)
self.fill(self.r2)
self.p.start_receive(self.l2, data=data2)
self.assertEquals(set([data1, data2]), set(self.p.poll(0)))
class WriteableMixin(PollerMixin, SockMixin):
def test_writeable(self):
self.p.start_transmit(self.r1)
self.assertEquals([self.r1], list(self.p.poll(0)))
def test_writeable_data(self):
data = object()
self.p.start_transmit(self.r1, data=data)
self.assertEquals([data], list(self.p.poll(0)))
def test_unwriteable_before_add(self):
self.fill(self.r1)
self.p.start_transmit(self.r1)
self.assertEquals([], list(self.p.poll(0)))
def test_unwriteable_after_add(self):
self.p.start_transmit(self.r1)
self.fill(self.r1)
self.assertEquals([], list(self.p.poll(0)))
def test_unwriteable_then_writeable(self):
self.fill(self.r1)
self.p.start_transmit(self.r1)
self.assertEquals([], list(self.p.poll(0)))
self.drain(self.l1)
self.assertEquals([self.r1], list(self.p.poll(0)))
def test_double_unwriteable_then_Writeable(self):
self.fill(self.r1)
self.p.start_transmit(self.r1)
self.fill(self.r2)
self.p.start_transmit(self.r2)
self.assertEquals([], list(self.p.poll(0)))
self.drain(self.l1)
self.assertEquals([self.r1], list(self.p.poll(0)))
self.drain(self.l2)
self.assertEquals(set([self.r1, self.r2]), set(self.p.poll(0)))
class MutateDuringYieldMixin(PollerMixin, SockMixin):
# verify behaviour when poller contents is modified in the middle of
# poll() output generation.
def test_one_readable_removed_before_yield(self):
self.fill(self.l1)
self.p.start_receive(self.r1)
p = self.p.poll(0)
self.p.stop_receive(self.r1)
self.assertEquals([], list(p))
def test_one_writeable_removed_before_yield(self):
self.p.start_transmit(self.r1)
p = self.p.poll(0)
self.p.stop_transmit(self.r1)
self.assertEquals([], list(p))
def test_one_readable_readded_before_yield(self):
# fd removed, closed, another fd opened, gets same fd number, re-added.
# event fires for wrong underlying object.
self.fill(self.l1)
self.p.start_receive(self.r1)
p = self.p.poll(0)
self.p.stop_receive(self.r1)
self.p.start_receive(self.r1)
self.assertEquals([], list(p))
def test_one_readable_readded_during_yield(self):
self.fill(self.l1)
self.p.start_receive(self.r1)
self.fill(self.l2)
self.p.start_receive(self.r2)
p = self.p.poll(0)
# figure out which one is consumed and which is still to-read.
consumed = next(p)
ready = (self.r1, self.r2)[consumed == self.r1]
# now remove and re-add the one that hasn't been read yet.
self.p.stop_receive(ready)
self.p.start_receive(ready)
# the start_receive() may be for a totally new underlying file object,
# the live loop iteration must not yield any buffered readiness event.
self.assertEquals([], list(p))
class FileClosedMixin(PollerMixin, SockMixin):
# Verify behaviour when a registered file object is closed in various
# scenarios, without first calling stop_receive()/stop_transmit().
def test_writeable_then_closed(self):
self.p.start_transmit(self.r1)
self.assertEquals([self.r1], list(self.p.poll(0)))
self.close_socks()
try:
self.assertEquals([], list(self.p.poll(0)))
except select.error:
# a crash is also reasonable here.
pass
def test_writeable_closed_before_yield(self):
self.p.start_transmit(self.r1)
p = self.p.poll(0)
self.close_socks()
try:
self.assertEquals([], list(p))
except select.error:
# a crash is also reasonable here.
pass
def test_readable_then_closed(self):
self.fill(self.l1)
self.p.start_receive(self.r1)
self.assertEquals([self.r1], list(self.p.poll(0)))
self.close_socks()
try:
self.assertEquals([], list(self.p.poll(0)))
except select.error:
# a crash is also reasonable here.
pass
def test_readable_closed_before_yield(self):
self.fill(self.l1)
self.p.start_receive(self.r1)
p = self.p.poll(0)
self.close_socks()
try:
self.assertEquals([], list(p))
except select.error:
# a crash is also reasonable here.
pass
class DistinctDataMixin(PollerMixin, SockMixin):
# Verify different data is yielded for the same FD according to the event
# being raised.
def test_one_distinct(self):
rdata = object()
wdata = object()
self.p.start_receive(self.r1, data=rdata)
self.p.start_transmit(self.r1, data=wdata)
self.assertEquals([wdata], list(self.p.poll(0)))
self.fill(self.l1) # r1 is now readable and writeable.
self.assertEquals(set([rdata, wdata]), set(self.p.poll(0)))
class AllMixin(ReceiveStateMixin,
TransmitStateMixin,
ReadableMixin,
WriteableMixin,
MutateDuringYieldMixin,
FileClosedMixin,
DistinctDataMixin,
PollMixin,
CloseMixin):
"""
Helper to avoid cutpasting mixin names below.
"""
@unittest2.skipIf(condition=not hasattr(select, 'select'),
reason='select.select() not supported')
class SelectTest(AllMixin, testlib.TestCase):
klass = mitogen.core.Poller
@unittest2.skipIf(condition=not hasattr(select, 'kqueue'),
reason='select.kqueue() not supported')
class KqueueTest(AllMixin, testlib.TestCase):
klass = mitogen.parent.KqueuePoller
@unittest2.skipIf(condition=not hasattr(select, 'epoll'),
reason='select.epoll() not supported')
class EpollTest(AllMixin, testlib.TestCase):
klass = mitogen.parent.EpollPoller
if __name__ == '__main__':
unittest2.main()
Loading…
Cancel
Save