diff --git a/mitogen/core.py b/mitogen/core.py index 64363951..b952e601 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -762,6 +762,7 @@ class Side(object): def __init__(self, stream, fd, cloexec=True, keep_alive=True): self.stream = stream self.fd = fd + self.closed = False self.keep_alive = keep_alive self._fork_refs[id(self)] = self if cloexec: @@ -771,21 +772,16 @@ class Side(object): def __repr__(self): return '' % (self.stream, self.fd) - def fileno(self): - if self.fd is None: - raise StreamError('%r.fileno() called but no FD set', self) - return self.fd - @classmethod def _on_fork(cls): for side in list(cls._fork_refs.values()): side.close() def close(self): - if self.fd is not None: + if not self.closed: _vv and IOLOG.debug('%r.close()', self) os.close(self.fd) - self.fd = None + self.closed = True def read(self, n=CHUNK_SIZE): s, disconnected = io_op(os.read, self.fd, n) @@ -1442,66 +1438,51 @@ class Router(object): class Poller(object): def __init__(self): - self.readers = [] - self.writers = [] + self._rfds = {} + self._wfds = {} _repr = 'Poller()' - def __repr__(self): - return self._repr - - def _list_discard(self, lst, value): - try: - lst.remove(value) - except ValueError: - pass + @property + def readers(self): + return list(self._rfds.items()) - def _list_add(self, lst, value): - if value not in lst: - lst.append(value) + @property + def writers(self): + return list(self._wfds.items()) - def start_receive(self, stream): - _vv and IOLOG.debug('%r.start_receive(%r)', self, stream) - assert stream.receive_side and stream.receive_side.fd is not None - self._list_add(self.readers, stream.receive_side) + def __repr__(self): + return self._repr - def stop_receive(self, stream): - _vv and IOLOG.debug('%r.stop_receive(%r)', self, stream) - self._list_discard(self.readers, stream.receive_side) + def start_receive(self, fd, data=None): + self._rfds[fd] = data or fd - def start_transmit(self, stream): - _vv and IOLOG.debug('%r._start_transmit(%r)', self, stream) - assert stream.transmit_side and stream.transmit_side.fd is not None - self._list_add(self.writers, stream.transmit_side) + def stop_receive(self, fd): + self._rfds.pop(fd, None) - def stop_transmit(self, stream): - _vv and IOLOG.debug('%r._stop_transmit(%r)', self, stream) - self._list_discard(self.writers, stream.transmit_side) + def start_transmit(self, fd, data=None): + self._wfds[fd] = data or fd - def _call(self, broker, stream, func): - try: - func(broker) - except Exception: - LOG.exception('%r crashed', stream) - stream.on_disconnect(self) + def stop_transmit(self, fd): + self._wfds.pop(fd, None) - def poll(self, broker, timeout=None): + def poll(self, timeout=None): _vv and IOLOG.debug('%r.poll(%r)', self, timeout) - #IOLOG.debug('readers = %r', self.readers) - #IOLOG.debug('writers = %r', self.writers) - (rsides, wsides, _), _ = io_op(select.select, - self.readers, - self.writers, + IOLOG.debug('readers = %r', self._rfds) + IOLOG.debug('writers = %r', self._wfds) + (rfds, wfds, _), _ = io_op(select.select, + self._rfds, + self._wfds, (), timeout ) - for side in rsides: - _vv and IOLOG.debug('%r: POLLIN for %r', self, side) - self._call(broker, side.stream, side.stream.on_receive) + for fd in rfds: + _vv and IOLOG.debug('%r: POLLIN for %r', self, fd) + yield self._rfds[fd] - for side in wsides: - _vv and IOLOG.debug('%r: POLLOUT for %r', self, side) - self._call(broker, side.stream, side.stream.on_transmit) + for fd in wfds: + _vv and IOLOG.debug('%r: POLLOUT for %r', self, fd) + yield self._wfds[fd] class Broker(object): @@ -1515,9 +1496,10 @@ class Broker(object): self._waker = Waker(self) self.defer = self._waker.defer self.poller = self.poller_class() - self.poller.start_receive(self._waker) - self.readers = [self._waker.receive_side] - self.writers = [] + self.poller.start_receive( + self._waker.receive_side.fd, + (self._waker.receive_side, self._waker.on_receive) + ) self._thread = threading.Thread( target=_profile_hook, args=('broker', self._broker_main), @@ -1527,33 +1509,54 @@ class Broker(object): self._waker.broker_ident = self._thread.ident def start_receive(self, stream): - self.defer(self.poller.start_receive, stream) + _vv and IOLOG.debug('%r.start_receive(%r)', self, stream) + side = stream.receive_side + assert side and side.fd is not None + self.defer(self.poller.start_receive, + side.fd, (side, stream.on_receive)) def stop_receive(self, stream): - self.defer(self.poller.stop_receive, stream) + _vv and IOLOG.debug('%r.stop_receive(%r)', self, stream) + self.defer(self.poller.stop_receive, stream.receive_side.fd) def _start_transmit(self, stream): - self.poller.start_transmit(stream) + _vv and IOLOG.debug('%r._start_transmit(%r)', self, stream) + side = stream.transmit_side + assert side and side.fd is not None + self.poller.start_transmit(side.fd, (side, stream.on_transmit)) def _stop_transmit(self, stream): - self.poller.stop_transmit(stream) + _vv and IOLOG.debug('%r._stop_transmit(%r)', self, stream) + self.poller.stop_transmit(stream.transmit_side.fd) def keep_alive(self): - return sum((side.keep_alive for side in self.poller.readers), 0) + it = (side.keep_alive for (_, (side, _)) in self.poller.readers) + return sum(it, 0) + + def _call(self, stream, func): + try: + func(self) + except Exception: + LOG.exception('%r crashed', stream) + stream.on_disconnect(self) + + def _loop_once(self, timeout=None): + _vv and IOLOG.debug('%r._loop_once(%r)', self, timeout) + for (side, func) in self.poller.poll(timeout): + self._call(side.stream, func) def _broker_main(self): try: while self._alive: - self.poller.poll(self) + self._loop_once() fire(self, 'shutdown') - - for side in set(self.poller.readers).union(self.poller.writers): - self.poller._call(self, side.stream, side.stream.on_shutdown) + for _, (side, _) in self.poller.readers + self.poller.writers: + self._call(side.stream, side.stream.on_shutdown) deadline = time.time() + self.shutdown_timeout while self.keep_alive() and time.time() < deadline: - self.poller.poll(self, max(0, deadline - time.time())) + self._loop_once(max(0, deadline - time.time())) if self.keep_alive(): LOG.error('%r: some streams did not close gracefully. ' @@ -1561,7 +1564,7 @@ class Broker(object): 'more child processes still connected to ' 'our stdout/stderr pipes.', self) - for side in set(self.readers).union(self.writers): + for _, (side, _) in self.poller.readers + self.poller.writers: LOG.error('_broker_main() force disconnecting %r', side) side.stream.on_disconnect(self) except Exception: diff --git a/mitogen/parent.py b/mitogen/parent.py index d9b67ede..89a7cf8b 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -513,68 +513,61 @@ class KqueuePoller(Poller): def __init__(self): self._kqueue = select.kqueue() - self._reader_by_fd = {} - self._writer_by_fd = {} + self._rfds = {} + self._wfds = {} self._changelist = [] @property def readers(self): - return list(self._reader_by_fd.values()) + return list(self._rfds.items()) @property def writers(self): - return list(self._writer_by_fd.values()) + return list(self._wfds.items()) - def _control(self, side, filters, flags): + def _control(self, fd, filters, flags): mitogen.core._vv and IOLOG.debug( - '%r._control(%r, %r, %r)', self, side, filters, flags) - self._changelist.append(select.kevent(side.fd, filters, flags)) - - def start_receive(self, stream): - mitogen.core._vv and IOLOG.debug('%r.start_receive(%r)', self, stream) - side = stream.receive_side - assert side and side.fd is not None - if side.fd not in self._reader_by_fd: - self._reader_by_fd[side.fd] = side - self._control(side, select.KQ_FILTER_READ, select.KQ_EV_ADD) - - def stop_receive(self, stream): - mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, stream) - side = stream.receive_side - if side.fd in self._reader_by_fd: - del self._reader_by_fd[side.fd] - self._control(side, select.KQ_FILTER_READ, select.KQ_EV_DELETE) - - def start_transmit(self, stream): - mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r)', self, stream) - side = stream.transmit_side - assert side and side.fd is not None - if side.fd not in self._writer_by_fd: - self._writer_by_fd[side.fd] = side - self._control(side, select.KQ_FILTER_WRITE, select.KQ_EV_ADD) - - def stop_transmit(self, stream): - mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, stream) - side = stream.transmit_side - if side.fd in self._writer_by_fd: - del self._writer_by_fd[side.fd] - self._control(side, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE) + '%r._control(%r, %r, %r)', self, fd, filters, flags) + self._changelist.append(select.kevent(fd, filters, flags)) + + def start_receive(self, fd, data=None): + mitogen.core._vv and IOLOG.debug('%r.start_receive(%r, %d)', + self, fd, data) + if fd not in self._rfds: + self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD) + self._rfds[fd] = data or fd + + def stop_receive(self, fd): + mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, fd, data) + if fd in self._rfds: + self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE) + del self._rfds[fd] + + def start_transmit(self, fd, data=None): + mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r)', self, fd, data) + if fd not in self._wfds: + self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD) + self._wfds[fd] = data or fd + + def stop_transmit(self, fd): + mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, fd) + if fd in self._wfds: + self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE) + del self._wfds[fd] def poll(self, broker, timeout=None): changelist = self._changelist self._changelist = [] for event in self._kqueue.control(changelist, 32, timeout): if event.filter == select.KQ_FILTER_READ: - side = self._reader_by_fd.get(event.ident) - # Events can still be read for an already-discarded fd. - if side: + if event.ident in self._rfds: + # Events can still be read for an already-discarded fd. mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, side) - self._call(broker, side.stream, side.stream.on_receive) + yield self._rfds[event.ident] elif event.filter == select.KQ_FILTER_WRITE: - side = self._writer_by_fd.get(event.ident) - if side: + if event.ident in self._wfds: mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, side) - self._call(broker, side.stream, side.stream.on_transmit) + yield self._wfds[event.ident] class EpollPoller(Poller): @@ -660,7 +653,7 @@ class EpollPoller(Poller): POLLER_BY_SYSNAME = { 'Darwin': KqueuePoller, 'FreeBSD': KqueuePoller, - 'Linux': EpollPoller, + #'Linux': EpollPoller, } PREFERRED_POLLER = POLLER_BY_SYSNAME.get(os.uname()[0], mitogen.core.Poller) @@ -760,7 +753,7 @@ class Stream(mitogen.core.Stream): def on_shutdown(self, broker): """Request the slave gracefully shut itself down.""" LOG.debug('%r closing CALL_FUNCTION channel', self) - self.send( + self._send( mitogen.core.Message( src_id=mitogen.context_id, dst_id=self.remote_id,