issue #542: return of select poller, new selection logic

pull/564/head
David Wilson 5 years ago
parent 0aa4c9d8fc
commit 9bcd2ec56c

@ -1912,6 +1912,8 @@ class Poller(object):
Pollers may only be used by one thread at a time.
"""
SUPPORTED = True
# This changed from select() to poll() in Mitogen 0.2.4. Since poll() has
# no upper FD limit, it is suitable for use with Latch, which must handle
# FDs larger than select's limit during many-host runs. We want this
@ -1928,11 +1930,16 @@ class Poller(object):
def __init__(self):
self._rfds = {}
self._wfds = {}
self._pollobj = select.poll()
def __repr__(self):
return '%s(%#x)' % (type(self).__name__, id(self))
def _update(self, fd):
"""
Required by PollPoller subclass.
"""
pass
@property
def readers(self):
"""
@ -1955,20 +1962,6 @@ class Poller(object):
"""
pass
_readmask = select.POLLIN | select.POLLHUP
# TODO: no proof we dont need writemask too
def _update(self, fd):
mask = (((fd in self._rfds) and self._readmask) |
((fd in self._wfds) and select.POLLOUT))
if mask:
self._pollobj.register(fd, mask)
else:
try:
self._pollobj.unregister(fd)
except KeyError:
pass
def start_receive(self, fd, data=None):
"""
Cause :meth:`poll` to yield `data` when `fd` is readable.
@ -2004,22 +1997,27 @@ class Poller(object):
self._update(fd)
def _poll(self, timeout):
(rfds, wfds, _), _ = io_op(select.select,
self._rfds,
self._wfds,
(), timeout
)
for fd in rfds:
_vv and IOLOG.debug('%r: POLLIN for %r', self, fd)
data, gen = self._rfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
for fd in wfds:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, fd)
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
if timeout:
timeout *= 1000
events, _ = io_op(self._pollobj.poll, timeout)
for fd, event in events:
if event & self._readmask:
_vv and IOLOG.debug('%r: POLLIN|POLLHUP for %r', self, fd)
data, gen = self._rfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
if event & select.POLLOUT:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, fd)
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
def poll(self, timeout=None):
"""
Block the calling thread until one or more FDs are ready for IO.

@ -890,10 +890,58 @@ class CallSpec(object):
)
class PollPoller(mitogen.core.Poller):
"""
Poller based on the POSIX poll(2) interface. Not available on some versions
of OS X, otherwise it is the preferred poller for small FD counts.
"""
SUPPORTED = hasattr(select, 'poll')
_repr = 'PollPoller()'
def __init__(self):
super(PollPoller, self).__init__()
self._pollobj = select.poll()
# TODO: no proof we dont need writemask too
_readmask = (
getattr(select, 'POLLIN', 0) |
getattr(select, 'POLLHUP', 0)
)
def _update(self, fd):
mask = (((fd in self._rfds) and self._readmask) |
((fd in self._wfds) and select.POLLOUT))
if mask:
self._pollobj.register(fd, mask)
else:
try:
self._pollobj.unregister(fd)
except KeyError:
pass
def _poll(self, timeout):
if timeout:
timeout *= 1000
events, _ = mitogen.core.io_op(self._pollobj.poll, timeout)
for fd, event in events:
if event & self._readmask:
IOLOG.debug('%r: POLLIN|POLLHUP for %r', self, fd)
data, gen = self._rfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
if event & select.POLLOUT:
IOLOG.debug('%r: POLLOUT for %r', self, fd)
data, gen = self._wfds.get(fd, (None, None))
if gen and gen < self._generation:
yield data
class KqueuePoller(mitogen.core.Poller):
"""
Poller based on the FreeBSD/Darwin kqueue(2) interface.
"""
SUPPORTED = hasattr(select, 'kqueue')
_repr = 'KqueuePoller()'
def __init__(self):
@ -971,6 +1019,7 @@ class EpollPoller(mitogen.core.Poller):
"""
Poller based on the Linux epoll(2) interface.
"""
SUPPORTED = hasattr(select, 'epoll')
_repr = 'EpollPoller()'
def __init__(self):
@ -1041,20 +1090,18 @@ class EpollPoller(mitogen.core.Poller):
yield data
if sys.version_info < (2, 6):
# 2.4 and 2.5 only had select.select() and select.poll().
POLLER_BY_SYSNAME = {}
else:
POLLER_BY_SYSNAME = {
'Darwin': KqueuePoller,
'FreeBSD': KqueuePoller,
'Linux': EpollPoller,
}
# 2.4 and 2.5 only had select.select() and select.poll().
for _klass in mitogen.core.Poller, PollPoller, KqueuePoller, EpollPoller:
if _klass.SUPPORTED:
PREFERRED_POLLER = _klass
PREFERRED_POLLER = POLLER_BY_SYSNAME.get(
os.uname()[0],
mitogen.core.Poller,
)
# For apps that start threads dynamically, it's possible Latch will also get
# very high-numbered wait fds when there are many connections, and so select()
# becomes useless there too. So swap in our favourite poller.
if PollPoller.SUPPORTED:
mitogen.core.Latch.poller_class = PollPoller
else:
mitogen.core.Latch.poller_class = PREFERRED_POLLER
class DiagLogStream(mitogen.core.BasicStream):

@ -401,16 +401,25 @@ class SelectTest(AllMixin, testlib.TestCase):
klass = mitogen.core.Poller
SelectTest = unittest2.skipIf(
condition=not hasattr(select, 'select'),
condition=(not SelectTest.klass.SUPPORTED),
reason='select.select() not supported'
)(SelectTest)
class PollTest(AllMixin, testlib.TestCase):
klass = mitogen.parent.PollPoller
PollTest = unittest2.skipIf(
condition=(not PollTest.klass.SUPPORTED),
reason='select.poll() not supported'
)(PollTest)
class KqueueTest(AllMixin, testlib.TestCase):
klass = mitogen.parent.KqueuePoller
KqueueTest = unittest2.skipIf(
condition=not hasattr(select, 'kqueue'),
condition=(not KqueueTest.klass.SUPPORTED),
reason='select.kqueue() not supported'
)(KqueueTest)
@ -419,7 +428,7 @@ class EpollTest(AllMixin, testlib.TestCase):
klass = mitogen.parent.EpollPoller
EpollTest = unittest2.skipIf(
condition=not hasattr(select, 'epoll'),
condition=(not EpollTest.klass.SUPPORTED),
reason='select.epoll() not supported'
)(EpollTest)

Loading…
Cancel
Save