From bc7be1879d3697ec450c5619df523655ac025bbb Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 14 May 2018 18:28:05 +0100 Subject: [PATCH] issue #249: initial poller implementation (BSD only) --- docs/internals.rst | 13 +++++ mitogen/core.py | 111 ++++++++++++++++++++++++---------------- mitogen/master.py | 1 + mitogen/parent.py | 123 ++++++++++++++++++++++++++++++++++++++------- 4 files changed, 188 insertions(+), 60 deletions(-) diff --git a/docs/internals.rst b/docs/internals.rst index f3771343..3aa1fac4 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -261,6 +261,19 @@ Other Stream Subclasses :members: +Poller Class +------------ + +.. currentmodule:: mitogen.core +.. autoclass:: Poller + +.. currentmodule:: mitogen.parent +.. autoclass:: KqueuePoller + +.. currentmodule:: mitogen.parent +.. autoclass:: EpollPoller + + Importer Class -------------- diff --git a/mitogen/core.py b/mitogen/core.py index e947ecfe..cfea37d6 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1437,24 +1437,15 @@ class Router(object): self.broker.defer(self._async_route, msg) -class Broker(object): - _waker = None - _thread = None - shutdown_timeout = 3.0 - +class Poller(object): def __init__(self): - self._alive = True - self._waker = Waker(self) - self.defer = self._waker.defer - self._readers = [self._waker.receive_side] - self._writers = [] - self._thread = threading.Thread( - target=_profile_hook, - args=('broker', self._broker_main), - name='mitogen-broker' - ) - self._thread.start() - self._waker.broker_ident = self._thread.ident + self.readers = [] + self.writers = [] + + _repr = 'Poller()' + + def __repr__(self): + return self._repr def _list_discard(self, lst, value): try: @@ -1469,63 +1460,97 @@ class Broker(object): def start_receive(self, stream): _vv and IOLOG.debug('%r.start_receive(%r)', self, stream) assert stream.receive_side and stream.receive_side.fd is not None - self.defer(self._list_add, self._readers, stream.receive_side) + self._list_add(self.readers, stream.receive_side) def stop_receive(self, stream): - IOLOG.debug('%r.stop_receive(%r)', self, stream) - self.defer(self._list_discard, self._readers, stream.receive_side) + _vv and IOLOG.debug('%r.stop_receive(%r)', self, stream) + self._list_discard(self.readers, stream.receive_side) - def _start_transmit(self, stream): - IOLOG.debug('%r._start_transmit(%r)', self, stream) + def start_transmit(self, stream): + _vv and IOLOG.debug('%r._start_transmit(%r)', self, stream) assert stream.transmit_side and stream.transmit_side.fd is not None - self._list_add(self._writers, stream.transmit_side) + self._list_add(self.writers, stream.transmit_side) - def _stop_transmit(self, stream): - IOLOG.debug('%r._stop_transmit(%r)', self, stream) - self._list_discard(self._writers, stream.transmit_side) + def stop_transmit(self, stream): + _vv and IOLOG.debug('%r._stop_transmit(%r)', self, stream) + self._list_discard(self.writers, stream.transmit_side) - def _call(self, stream, func): + def _call(self, broker, stream, func): try: - func(self) + func(broker) except Exception: LOG.exception('%r crashed', stream) stream.on_disconnect(self) - def _loop_once(self, timeout=None): - _vv and IOLOG.debug('%r._loop_once(%r)', self, timeout) - - #IOLOG.debug('readers = %r', self._readers) - #IOLOG.debug('writers = %r', self._writers) + def poll(self, broker, timeout=None): + _vv and IOLOG.debug('%r.poll(%r)', self, timeout) + #IOLOG.debug('readers = %r', self.readers) + #IOLOG.debug('writers = %r', self.writers) (rsides, wsides, _), _ = io_op(select.select, - self._readers, - self._writers, + self.readers, + self.writers, (), timeout ) for side in rsides: _vv and IOLOG.debug('%r: POLLIN for %r', self, side) - self._call(side.stream, side.stream.on_receive) + self._call(broker, side.stream, side.stream.on_receive) for side in wsides: _vv and IOLOG.debug('%r: POLLOUT for %r', self, side) - self._call(side.stream, side.stream.on_transmit) + self._call(broker, side.stream, side.stream.on_transmit) + + +class Broker(object): + poller_class = Poller + _waker = None + _thread = None + shutdown_timeout = 3.0 + + def __init__(self, poller_class=None): + self._alive = True + self._waker = Waker(self) + self.defer = self._waker.defer + self.poller = self.poller_class() + self.poller.start_receive(self._waker) + self.readers = [self._waker.receive_side] + self.writers = [] + self._thread = threading.Thread( + target=_profile_hook, + args=('broker', self._broker_main), + name='mitogen-broker' + ) + self._thread.start() + self._waker.broker_ident = self._thread.ident + + def start_receive(self, stream): + self.defer(self.poller.start_receive, stream) + + def stop_receive(self, stream): + self.defer(self.poller.stop_receive, stream) + + def _start_transmit(self, stream): + self.poller.start_transmit(stream) + + def _stop_transmit(self, stream): + self.poller.stop_transmit(stream) def keep_alive(self): - return sum((side.keep_alive for side in self._readers), 0) + return sum((side.keep_alive for side in self.poller.readers), 0) def _broker_main(self): try: while self._alive: - self._loop_once() + self.poller.poll(self) fire(self, 'shutdown') - for side in set(self._readers).union(self._writers): - self._call(side.stream, side.stream.on_shutdown) + for side in set(self.poller.readers).union(self.poller.writers): + self.poller._call(self, side.stream, side.stream.on_shutdown) deadline = time.time() + self.shutdown_timeout while self.keep_alive() and time.time() < deadline: - self._loop_once(max(0, deadline - time.time())) + self.poller.poll(self, max(0, deadline - time.time())) if self.keep_alive(): LOG.error('%r: some streams did not close gracefully. ' @@ -1533,7 +1558,7 @@ class Broker(object): 'more child processes still connected to ' 'our stdout/stderr pipes.', self) - for side in set(self._readers).union(self._writers): + for side in set(self.readers).union(self.writers): LOG.error('_broker_main() force disconnecting %r', side) side.stream.on_disconnect(self) except Exception: diff --git a/mitogen/master.py b/mitogen/master.py index 22117a50..0805206b 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -661,6 +661,7 @@ class ModuleResponder(object): class Broker(mitogen.core.Broker): shutdown_timeout = 5.0 _watcher = None + poller_class = mitogen.parent.PREFERRED_POLLER def __init__(self, install_watcher=True): if install_watcher: diff --git a/mitogen/parent.py b/mitogen/parent.py index e6d76c6b..835c7e79 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -70,23 +70,6 @@ except: SC_OPEN_MAX = 1024 -class Argv(object): - def __init__(self, argv): - self.argv = argv - - def escape(self, x): - s = '"' - for c in x: - if c in '\\$"`': - s += '\\' - s += c - s += '"' - return s - - def __str__(self): - return ' '.join(map(self.escape, self.argv)) - - def get_log_level(): return (LOG.level or logging.getLogger().level or logging.INFO) @@ -497,6 +480,112 @@ def _proxy_connect(name, method_name, kwargs, econtext): } +class Argv(object): + def __init__(self, argv): + self.argv = argv + + def escape(self, x): + s = '"' + for c in x: + if c in '\\$"`': + s += '\\' + s += c + s += '"' + return s + + def __str__(self): + return ' '.join(map(self.escape, self.argv)) + + + +class Poller(mitogen.core.Poller): + @classmethod + def from_existing(cls, poller): + self = cls() + for reader in poller.readers: + self.start_receive(reader.stream) + for writer in poller.writers: + self.start_transmit(writer.stream) + + +class KqueuePoller(Poller): + _repr = 'KqueuePoller()' + + def __init__(self): + self._kqueue = select.kqueue() + self._reader_by_fd = {} + self._writer_by_fd = {} + self._changelist = [] + + @property + def readers(self): + return list(self._reader_by_fd.values()) + + @property + def writers(self): + return list(self._writer_by_fd.values()) + + def _control(self, side, filters, flags): + mitogen.core._vv and IOLOG.debug( + '%r._control(%r, %r, %r)', self, side, filters, flags) + self._changelist.append(select.kevent(side.fd, filters, flags)) + + def start_receive(self, stream): + mitogen.core._vv and IOLOG.debug('%r.start_receive(%r)', self, stream) + side = stream.receive_side + assert side and side.fd is not None + if side.fd not in self._reader_by_fd: + self._reader_by_fd[side.fd] = side + self._control(side, select.KQ_FILTER_READ, select.KQ_EV_ADD) + + def stop_receive(self, stream): + mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, stream) + side = stream.receive_side + if side.fd in self._reader_by_fd: + del self._reader_by_fd[stream.receive_side.fd] + self._control(side, select.KQ_FILTER_READ, select.KQ_EV_DELETE) + + def start_transmit(self, stream): + mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r)', self, stream) + side = stream.transmit_side + assert side and side.fd is not None + if side.fd not in self._writer_by_fd: + self._writer_by_fd[side.fd] = side + self._control(side, select.KQ_FILTER_WRITE, select.KQ_EV_ADD) + + def stop_transmit(self, stream): + mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, stream) + side = stream.transmit_side + if side.fd in self._writer_by_fd: + del self._writer_by_fd[side.fd] + self._control(side, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE) + + def poll(self, broker, timeout=None): + changelist = self._changelist + self._changelist = [] + for event in self._kqueue.control(changelist, 32, timeout): + if event.filter == select.KQ_FILTER_READ: + side = self._reader_by_fd.get(event.ident) + # Events can still be read for an already-discarded fd. + if side: + mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, side) + self._call(broker, side.stream, side.stream.on_receive) + elif event.filter == select.KQ_FILTER_WRITE: + side = self._writer_by_fd.get(event.ident) + if side: + mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, side) + self._call(broker, side.stream, side.stream.on_transmit) + + + +POLLER_BY_SYSNAME = { + 'Darwin': KqueuePoller, + 'FreeBSD': KqueuePoller, + #'Linux': EpollPoller, +} +PREFERRED_POLLER = POLLER_BY_SYSNAME.get(os.uname()[0], mitogen.core.Poller) + + class TtyLogStream(mitogen.core.BasicStream): """ For "hybrid TTY/socketpair" mode, after a connection has been setup, a