From 7320c542df0467e70136495926de639d86b43884 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 14 May 2018 17:52:08 +0000 Subject: [PATCH] issue #249: EpollPoller() for Linux. --- mitogen/parent.py | 83 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 81 insertions(+), 2 deletions(-) diff --git a/mitogen/parent.py b/mitogen/parent.py index 835c7e79..d9b67ede 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -542,7 +542,7 @@ class KqueuePoller(Poller): mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, stream) side = stream.receive_side if side.fd in self._reader_by_fd: - del self._reader_by_fd[stream.receive_side.fd] + del self._reader_by_fd[side.fd] self._control(side, select.KQ_FILTER_READ, select.KQ_EV_DELETE) def start_transmit(self, stream): @@ -577,11 +577,90 @@ class KqueuePoller(Poller): self._call(broker, side.stream, side.stream.on_transmit) +class EpollPoller(Poller): + _repr = 'EpollPoller()' + + def __init__(self): + self._epoll = select.epoll() + self._registered_fds = set() + self._reader_by_fd = {} + self._writer_by_fd = {} + + @property + def readers(self): + return list(self._reader_by_fd.values()) + + @property + def writers(self): + return list(self._writer_by_fd.values()) + + def _control(self, fd): + mitogen.core._vv and IOLOG.debug('%r._control(%r)', self, fd) + mask = (((fd in self._reader_by_fd) and select.EPOLLIN) | + ((fd in self._writer_by_fd) and select.EPOLLOUT)) + if mask: + if fd in self._registered_fds: + self._epoll.modify(fd, mask) + else: + self._epoll.register(fd, mask) + self._registered_fds.add(fd) + elif fd in self._registered_fds: + self._epoll.unregister(fd) + self._registered_fds.remove(fd) + + def start_receive(self, stream): + mitogen.core._vv and IOLOG.debug('%r.start_receive(%r)', self, stream) + side = stream.receive_side + assert side and side.fd is not None + if side.fd not in self._reader_by_fd: + self._reader_by_fd[side.fd] = side + self._control(side.fd) + + def stop_receive(self, stream): + mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, stream) + side = stream.receive_side + if side.fd in self._reader_by_fd: + del self._reader_by_fd[side.fd] + self._control(side.fd) + + def start_transmit(self, stream): + mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r)', self, stream) + side = stream.transmit_side + assert side and side.fd is not None + if side.fd not in self._writer_by_fd: + self._writer_by_fd[side.fd] = side + self._control(side.fd) + + def stop_transmit(self, stream): + mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, stream) + side = stream.transmit_side + if side.fd in self._writer_by_fd: + del self._writer_by_fd[side.fd] + self._control(side.fd) + + def poll(self, broker, timeout=None): + the_timeout = -1 + if timeout is not None: + the_timeout = timeout + + for fd, ev in self._epoll.poll(the_timeout): + if ev & select.EPOLLIN: + side = self._reader_by_fd.get(fd) + # Events can still be read for an already-discarded fd. + if side: + mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, side) + self._call(broker, side.stream, side.stream.on_receive) + elif ev & select.EPOLLOUT: + side = self._writer_by_fd.get(fd) + if side: + mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, side) + self._call(broker, side.stream, side.stream.on_transmit) + POLLER_BY_SYSNAME = { 'Darwin': KqueuePoller, 'FreeBSD': KqueuePoller, - #'Linux': EpollPoller, + 'Linux': EpollPoller, } PREFERRED_POLLER = POLLER_BY_SYSNAME.get(os.uname()[0], mitogen.core.Poller)