issue #249: Poller API v2 (BSD only).

Now it's BasicStream/Side-agnostic, so it can be reused for Latch and
iter_read().
pull/255/head
David Wilson 7 years ago
parent 11c2e4ab3e
commit 9abcf63155

@ -762,6 +762,7 @@ class Side(object):
def __init__(self, stream, fd, cloexec=True, keep_alive=True): def __init__(self, stream, fd, cloexec=True, keep_alive=True):
self.stream = stream self.stream = stream
self.fd = fd self.fd = fd
self.closed = False
self.keep_alive = keep_alive self.keep_alive = keep_alive
self._fork_refs[id(self)] = self self._fork_refs[id(self)] = self
if cloexec: if cloexec:
@ -771,21 +772,16 @@ class Side(object):
def __repr__(self): def __repr__(self):
return '<Side of %r fd %s>' % (self.stream, self.fd) return '<Side of %r fd %s>' % (self.stream, self.fd)
def fileno(self):
if self.fd is None:
raise StreamError('%r.fileno() called but no FD set', self)
return self.fd
@classmethod @classmethod
def _on_fork(cls): def _on_fork(cls):
for side in list(cls._fork_refs.values()): for side in list(cls._fork_refs.values()):
side.close() side.close()
def close(self): def close(self):
if self.fd is not None: if not self.closed:
_vv and IOLOG.debug('%r.close()', self) _vv and IOLOG.debug('%r.close()', self)
os.close(self.fd) os.close(self.fd)
self.fd = None self.closed = True
def read(self, n=CHUNK_SIZE): def read(self, n=CHUNK_SIZE):
s, disconnected = io_op(os.read, self.fd, n) s, disconnected = io_op(os.read, self.fd, n)
@ -1442,66 +1438,51 @@ class Router(object):
class Poller(object): class Poller(object):
def __init__(self): def __init__(self):
self.readers = [] self._rfds = {}
self.writers = [] self._wfds = {}
_repr = 'Poller()' _repr = 'Poller()'
def __repr__(self): @property
return self._repr def readers(self):
return list(self._rfds.items())
def _list_discard(self, lst, value):
try:
lst.remove(value)
except ValueError:
pass
def _list_add(self, lst, value): @property
if value not in lst: def writers(self):
lst.append(value) return list(self._wfds.items())
def start_receive(self, stream): def __repr__(self):
_vv and IOLOG.debug('%r.start_receive(%r)', self, stream) return self._repr
assert stream.receive_side and stream.receive_side.fd is not None
self._list_add(self.readers, stream.receive_side)
def stop_receive(self, stream): def start_receive(self, fd, data=None):
_vv and IOLOG.debug('%r.stop_receive(%r)', self, stream) self._rfds[fd] = data or fd
self._list_discard(self.readers, stream.receive_side)
def start_transmit(self, stream): def stop_receive(self, fd):
_vv and IOLOG.debug('%r._start_transmit(%r)', self, stream) self._rfds.pop(fd, None)
assert stream.transmit_side and stream.transmit_side.fd is not None
self._list_add(self.writers, stream.transmit_side)
def stop_transmit(self, stream): def start_transmit(self, fd, data=None):
_vv and IOLOG.debug('%r._stop_transmit(%r)', self, stream) self._wfds[fd] = data or fd
self._list_discard(self.writers, stream.transmit_side)
def _call(self, broker, stream, func): def stop_transmit(self, fd):
try: self._wfds.pop(fd, None)
func(broker)
except Exception:
LOG.exception('%r crashed', stream)
stream.on_disconnect(self)
def poll(self, broker, timeout=None): def poll(self, timeout=None):
_vv and IOLOG.debug('%r.poll(%r)', self, timeout) _vv and IOLOG.debug('%r.poll(%r)', self, timeout)
#IOLOG.debug('readers = %r', self.readers) IOLOG.debug('readers = %r', self._rfds)
#IOLOG.debug('writers = %r', self.writers) IOLOG.debug('writers = %r', self._wfds)
(rsides, wsides, _), _ = io_op(select.select, (rfds, wfds, _), _ = io_op(select.select,
self.readers, self._rfds,
self.writers, self._wfds,
(), timeout (), timeout
) )
for side in rsides: for fd in rfds:
_vv and IOLOG.debug('%r: POLLIN for %r', self, side) _vv and IOLOG.debug('%r: POLLIN for %r', self, fd)
self._call(broker, side.stream, side.stream.on_receive) yield self._rfds[fd]
for side in wsides: for fd in wfds:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, side) _vv and IOLOG.debug('%r: POLLOUT for %r', self, fd)
self._call(broker, side.stream, side.stream.on_transmit) yield self._wfds[fd]
class Broker(object): class Broker(object):
@ -1515,9 +1496,10 @@ class Broker(object):
self._waker = Waker(self) self._waker = Waker(self)
self.defer = self._waker.defer self.defer = self._waker.defer
self.poller = self.poller_class() self.poller = self.poller_class()
self.poller.start_receive(self._waker) self.poller.start_receive(
self.readers = [self._waker.receive_side] self._waker.receive_side.fd,
self.writers = [] (self._waker.receive_side, self._waker.on_receive)
)
self._thread = threading.Thread( self._thread = threading.Thread(
target=_profile_hook, target=_profile_hook,
args=('broker', self._broker_main), args=('broker', self._broker_main),
@ -1527,33 +1509,54 @@ class Broker(object):
self._waker.broker_ident = self._thread.ident self._waker.broker_ident = self._thread.ident
def start_receive(self, stream): def start_receive(self, stream):
self.defer(self.poller.start_receive, stream) _vv and IOLOG.debug('%r.start_receive(%r)', self, stream)
side = stream.receive_side
assert side and side.fd is not None
self.defer(self.poller.start_receive,
side.fd, (side, stream.on_receive))
def stop_receive(self, stream): def stop_receive(self, stream):
self.defer(self.poller.stop_receive, stream) _vv and IOLOG.debug('%r.stop_receive(%r)', self, stream)
self.defer(self.poller.stop_receive, stream.receive_side.fd)
def _start_transmit(self, stream): def _start_transmit(self, stream):
self.poller.start_transmit(stream) _vv and IOLOG.debug('%r._start_transmit(%r)', self, stream)
side = stream.transmit_side
assert side and side.fd is not None
self.poller.start_transmit(side.fd, (side, stream.on_transmit))
def _stop_transmit(self, stream): def _stop_transmit(self, stream):
self.poller.stop_transmit(stream) _vv and IOLOG.debug('%r._stop_transmit(%r)', self, stream)
self.poller.stop_transmit(stream.transmit_side.fd)
def keep_alive(self): def keep_alive(self):
return sum((side.keep_alive for side in self.poller.readers), 0) it = (side.keep_alive for (_, (side, _)) in self.poller.readers)
return sum(it, 0)
def _call(self, stream, func):
try:
func(self)
except Exception:
LOG.exception('%r crashed', stream)
stream.on_disconnect(self)
def _loop_once(self, timeout=None):
_vv and IOLOG.debug('%r._loop_once(%r)', self, timeout)
for (side, func) in self.poller.poll(timeout):
self._call(side.stream, func)
def _broker_main(self): def _broker_main(self):
try: try:
while self._alive: while self._alive:
self.poller.poll(self) self._loop_once()
fire(self, 'shutdown') fire(self, 'shutdown')
for _, (side, _) in self.poller.readers + self.poller.writers:
for side in set(self.poller.readers).union(self.poller.writers): self._call(side.stream, side.stream.on_shutdown)
self.poller._call(self, side.stream, side.stream.on_shutdown)
deadline = time.time() + self.shutdown_timeout deadline = time.time() + self.shutdown_timeout
while self.keep_alive() and time.time() < deadline: while self.keep_alive() and time.time() < deadline:
self.poller.poll(self, max(0, deadline - time.time())) self._loop_once(max(0, deadline - time.time()))
if self.keep_alive(): if self.keep_alive():
LOG.error('%r: some streams did not close gracefully. ' LOG.error('%r: some streams did not close gracefully. '
@ -1561,7 +1564,7 @@ class Broker(object):
'more child processes still connected to ' 'more child processes still connected to '
'our stdout/stderr pipes.', self) 'our stdout/stderr pipes.', self)
for side in set(self.readers).union(self.writers): for _, (side, _) in self.poller.readers + self.poller.writers:
LOG.error('_broker_main() force disconnecting %r', side) LOG.error('_broker_main() force disconnecting %r', side)
side.stream.on_disconnect(self) side.stream.on_disconnect(self)
except Exception: except Exception:

@ -513,68 +513,61 @@ class KqueuePoller(Poller):
def __init__(self): def __init__(self):
self._kqueue = select.kqueue() self._kqueue = select.kqueue()
self._reader_by_fd = {} self._rfds = {}
self._writer_by_fd = {} self._wfds = {}
self._changelist = [] self._changelist = []
@property @property
def readers(self): def readers(self):
return list(self._reader_by_fd.values()) return list(self._rfds.items())
@property @property
def writers(self): def writers(self):
return list(self._writer_by_fd.values()) return list(self._wfds.items())
def _control(self, side, filters, flags): def _control(self, fd, filters, flags):
mitogen.core._vv and IOLOG.debug( mitogen.core._vv and IOLOG.debug(
'%r._control(%r, %r, %r)', self, side, filters, flags) '%r._control(%r, %r, %r)', self, fd, filters, flags)
self._changelist.append(select.kevent(side.fd, filters, flags)) self._changelist.append(select.kevent(fd, filters, flags))
def start_receive(self, stream): def start_receive(self, fd, data=None):
mitogen.core._vv and IOLOG.debug('%r.start_receive(%r)', self, stream) mitogen.core._vv and IOLOG.debug('%r.start_receive(%r, %d)',
side = stream.receive_side self, fd, data)
assert side and side.fd is not None if fd not in self._rfds:
if side.fd not in self._reader_by_fd: self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD)
self._reader_by_fd[side.fd] = side self._rfds[fd] = data or fd
self._control(side, select.KQ_FILTER_READ, select.KQ_EV_ADD)
def stop_receive(self, fd):
def stop_receive(self, stream): mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, fd, data)
mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, stream) if fd in self._rfds:
side = stream.receive_side self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)
if side.fd in self._reader_by_fd: del self._rfds[fd]
del self._reader_by_fd[side.fd]
self._control(side, select.KQ_FILTER_READ, select.KQ_EV_DELETE) def start_transmit(self, fd, data=None):
mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r)', self, fd, data)
def start_transmit(self, stream): if fd not in self._wfds:
mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r)', self, stream) self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
side = stream.transmit_side self._wfds[fd] = data or fd
assert side and side.fd is not None
if side.fd not in self._writer_by_fd: def stop_transmit(self, fd):
self._writer_by_fd[side.fd] = side mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, fd)
self._control(side, select.KQ_FILTER_WRITE, select.KQ_EV_ADD) if fd in self._wfds:
self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)
def stop_transmit(self, stream): del self._wfds[fd]
mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, stream)
side = stream.transmit_side
if side.fd in self._writer_by_fd:
del self._writer_by_fd[side.fd]
self._control(side, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)
def poll(self, broker, timeout=None): def poll(self, broker, timeout=None):
changelist = self._changelist changelist = self._changelist
self._changelist = [] self._changelist = []
for event in self._kqueue.control(changelist, 32, timeout): for event in self._kqueue.control(changelist, 32, timeout):
if event.filter == select.KQ_FILTER_READ: if event.filter == select.KQ_FILTER_READ:
side = self._reader_by_fd.get(event.ident) if event.ident in self._rfds:
# Events can still be read for an already-discarded fd. # Events can still be read for an already-discarded fd.
if side:
mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, side) mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, side)
self._call(broker, side.stream, side.stream.on_receive) yield self._rfds[event.ident]
elif event.filter == select.KQ_FILTER_WRITE: elif event.filter == select.KQ_FILTER_WRITE:
side = self._writer_by_fd.get(event.ident) if event.ident in self._wfds:
if side:
mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, side) mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, side)
self._call(broker, side.stream, side.stream.on_transmit) yield self._wfds[event.ident]
class EpollPoller(Poller): class EpollPoller(Poller):
@ -660,7 +653,7 @@ class EpollPoller(Poller):
POLLER_BY_SYSNAME = { POLLER_BY_SYSNAME = {
'Darwin': KqueuePoller, 'Darwin': KqueuePoller,
'FreeBSD': KqueuePoller, 'FreeBSD': KqueuePoller,
'Linux': EpollPoller, #'Linux': EpollPoller,
} }
PREFERRED_POLLER = POLLER_BY_SYSNAME.get(os.uname()[0], mitogen.core.Poller) PREFERRED_POLLER = POLLER_BY_SYSNAME.get(os.uname()[0], mitogen.core.Poller)
@ -760,7 +753,7 @@ class Stream(mitogen.core.Stream):
def on_shutdown(self, broker): def on_shutdown(self, broker):
"""Request the slave gracefully shut itself down.""" """Request the slave gracefully shut itself down."""
LOG.debug('%r closing CALL_FUNCTION channel', self) LOG.debug('%r closing CALL_FUNCTION channel', self)
self.send( self._send(
mitogen.core.Message( mitogen.core.Message(
src_id=mitogen.context_id, src_id=mitogen.context_id,
dst_id=self.remote_id, dst_id=self.remote_id,

Loading…
Cancel
Save