issue #249: initial poller implementation (BSD only)

pull/255/head
David Wilson 7 years ago
parent b7ab473343
commit bc7be1879d

@ -261,6 +261,19 @@ Other Stream Subclasses
:members: :members:
Poller Class
------------
.. currentmodule:: mitogen.core
.. autoclass:: Poller
.. currentmodule:: mitogen.parent
.. autoclass:: KqueuePoller
.. currentmodule:: mitogen.parent
.. autoclass:: EpollPoller
Importer Class Importer Class
-------------- --------------

@ -1437,24 +1437,15 @@ class Router(object):
self.broker.defer(self._async_route, msg) self.broker.defer(self._async_route, msg)
class Broker(object): class Poller(object):
_waker = None
_thread = None
shutdown_timeout = 3.0
def __init__(self): def __init__(self):
self._alive = True self.readers = []
self._waker = Waker(self) self.writers = []
self.defer = self._waker.defer
self._readers = [self._waker.receive_side] _repr = 'Poller()'
self._writers = []
self._thread = threading.Thread( def __repr__(self):
target=_profile_hook, return self._repr
args=('broker', self._broker_main),
name='mitogen-broker'
)
self._thread.start()
self._waker.broker_ident = self._thread.ident
def _list_discard(self, lst, value): def _list_discard(self, lst, value):
try: try:
@ -1469,63 +1460,97 @@ class Broker(object):
def start_receive(self, stream): def start_receive(self, stream):
_vv and IOLOG.debug('%r.start_receive(%r)', self, stream) _vv and IOLOG.debug('%r.start_receive(%r)', self, stream)
assert stream.receive_side and stream.receive_side.fd is not None assert stream.receive_side and stream.receive_side.fd is not None
self.defer(self._list_add, self._readers, stream.receive_side) self._list_add(self.readers, stream.receive_side)
def stop_receive(self, stream): def stop_receive(self, stream):
IOLOG.debug('%r.stop_receive(%r)', self, stream) _vv and IOLOG.debug('%r.stop_receive(%r)', self, stream)
self.defer(self._list_discard, self._readers, stream.receive_side) self._list_discard(self.readers, stream.receive_side)
def _start_transmit(self, stream): def start_transmit(self, stream):
IOLOG.debug('%r._start_transmit(%r)', self, stream) _vv and IOLOG.debug('%r._start_transmit(%r)', self, stream)
assert stream.transmit_side and stream.transmit_side.fd is not None assert stream.transmit_side and stream.transmit_side.fd is not None
self._list_add(self._writers, stream.transmit_side) self._list_add(self.writers, stream.transmit_side)
def _stop_transmit(self, stream): def stop_transmit(self, stream):
IOLOG.debug('%r._stop_transmit(%r)', self, stream) _vv and IOLOG.debug('%r._stop_transmit(%r)', self, stream)
self._list_discard(self._writers, stream.transmit_side) self._list_discard(self.writers, stream.transmit_side)
def _call(self, stream, func): def _call(self, broker, stream, func):
try: try:
func(self) func(broker)
except Exception: except Exception:
LOG.exception('%r crashed', stream) LOG.exception('%r crashed', stream)
stream.on_disconnect(self) stream.on_disconnect(self)
def _loop_once(self, timeout=None): def poll(self, broker, timeout=None):
_vv and IOLOG.debug('%r._loop_once(%r)', self, timeout) _vv and IOLOG.debug('%r.poll(%r)', self, timeout)
#IOLOG.debug('readers = %r', self.readers)
#IOLOG.debug('readers = %r', self._readers) #IOLOG.debug('writers = %r', self.writers)
#IOLOG.debug('writers = %r', self._writers)
(rsides, wsides, _), _ = io_op(select.select, (rsides, wsides, _), _ = io_op(select.select,
self._readers, self.readers,
self._writers, self.writers,
(), timeout (), timeout
) )
for side in rsides: for side in rsides:
_vv and IOLOG.debug('%r: POLLIN for %r', self, side) _vv and IOLOG.debug('%r: POLLIN for %r', self, side)
self._call(side.stream, side.stream.on_receive) self._call(broker, side.stream, side.stream.on_receive)
for side in wsides: for side in wsides:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, side) _vv and IOLOG.debug('%r: POLLOUT for %r', self, side)
self._call(side.stream, side.stream.on_transmit) self._call(broker, side.stream, side.stream.on_transmit)
class Broker(object):
poller_class = Poller
_waker = None
_thread = None
shutdown_timeout = 3.0
def __init__(self, poller_class=None):
self._alive = True
self._waker = Waker(self)
self.defer = self._waker.defer
self.poller = self.poller_class()
self.poller.start_receive(self._waker)
self.readers = [self._waker.receive_side]
self.writers = []
self._thread = threading.Thread(
target=_profile_hook,
args=('broker', self._broker_main),
name='mitogen-broker'
)
self._thread.start()
self._waker.broker_ident = self._thread.ident
def start_receive(self, stream):
self.defer(self.poller.start_receive, stream)
def stop_receive(self, stream):
self.defer(self.poller.stop_receive, stream)
def _start_transmit(self, stream):
self.poller.start_transmit(stream)
def _stop_transmit(self, stream):
self.poller.stop_transmit(stream)
def keep_alive(self): def keep_alive(self):
return sum((side.keep_alive for side in self._readers), 0) return sum((side.keep_alive for side in self.poller.readers), 0)
def _broker_main(self): def _broker_main(self):
try: try:
while self._alive: while self._alive:
self._loop_once() self.poller.poll(self)
fire(self, 'shutdown') fire(self, 'shutdown')
for side in set(self._readers).union(self._writers): for side in set(self.poller.readers).union(self.poller.writers):
self._call(side.stream, side.stream.on_shutdown) self.poller._call(self, side.stream, side.stream.on_shutdown)
deadline = time.time() + self.shutdown_timeout deadline = time.time() + self.shutdown_timeout
while self.keep_alive() and time.time() < deadline: while self.keep_alive() and time.time() < deadline:
self._loop_once(max(0, deadline - time.time())) self.poller.poll(self, max(0, deadline - time.time()))
if self.keep_alive(): if self.keep_alive():
LOG.error('%r: some streams did not close gracefully. ' LOG.error('%r: some streams did not close gracefully. '
@ -1533,7 +1558,7 @@ class Broker(object):
'more child processes still connected to ' 'more child processes still connected to '
'our stdout/stderr pipes.', self) 'our stdout/stderr pipes.', self)
for side in set(self._readers).union(self._writers): for side in set(self.readers).union(self.writers):
LOG.error('_broker_main() force disconnecting %r', side) LOG.error('_broker_main() force disconnecting %r', side)
side.stream.on_disconnect(self) side.stream.on_disconnect(self)
except Exception: except Exception:

@ -661,6 +661,7 @@ class ModuleResponder(object):
class Broker(mitogen.core.Broker): class Broker(mitogen.core.Broker):
shutdown_timeout = 5.0 shutdown_timeout = 5.0
_watcher = None _watcher = None
poller_class = mitogen.parent.PREFERRED_POLLER
def __init__(self, install_watcher=True): def __init__(self, install_watcher=True):
if install_watcher: if install_watcher:

@ -70,23 +70,6 @@ except:
SC_OPEN_MAX = 1024 SC_OPEN_MAX = 1024
class Argv(object):
def __init__(self, argv):
self.argv = argv
def escape(self, x):
s = '"'
for c in x:
if c in '\\$"`':
s += '\\'
s += c
s += '"'
return s
def __str__(self):
return ' '.join(map(self.escape, self.argv))
def get_log_level(): def get_log_level():
return (LOG.level or logging.getLogger().level or logging.INFO) return (LOG.level or logging.getLogger().level or logging.INFO)
@ -497,6 +480,112 @@ def _proxy_connect(name, method_name, kwargs, econtext):
} }
class Argv(object):
def __init__(self, argv):
self.argv = argv
def escape(self, x):
s = '"'
for c in x:
if c in '\\$"`':
s += '\\'
s += c
s += '"'
return s
def __str__(self):
return ' '.join(map(self.escape, self.argv))
class Poller(mitogen.core.Poller):
@classmethod
def from_existing(cls, poller):
self = cls()
for reader in poller.readers:
self.start_receive(reader.stream)
for writer in poller.writers:
self.start_transmit(writer.stream)
class KqueuePoller(Poller):
_repr = 'KqueuePoller()'
def __init__(self):
self._kqueue = select.kqueue()
self._reader_by_fd = {}
self._writer_by_fd = {}
self._changelist = []
@property
def readers(self):
return list(self._reader_by_fd.values())
@property
def writers(self):
return list(self._writer_by_fd.values())
def _control(self, side, filters, flags):
mitogen.core._vv and IOLOG.debug(
'%r._control(%r, %r, %r)', self, side, filters, flags)
self._changelist.append(select.kevent(side.fd, filters, flags))
def start_receive(self, stream):
mitogen.core._vv and IOLOG.debug('%r.start_receive(%r)', self, stream)
side = stream.receive_side
assert side and side.fd is not None
if side.fd not in self._reader_by_fd:
self._reader_by_fd[side.fd] = side
self._control(side, select.KQ_FILTER_READ, select.KQ_EV_ADD)
def stop_receive(self, stream):
mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, stream)
side = stream.receive_side
if side.fd in self._reader_by_fd:
del self._reader_by_fd[stream.receive_side.fd]
self._control(side, select.KQ_FILTER_READ, select.KQ_EV_DELETE)
def start_transmit(self, stream):
mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r)', self, stream)
side = stream.transmit_side
assert side and side.fd is not None
if side.fd not in self._writer_by_fd:
self._writer_by_fd[side.fd] = side
self._control(side, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
def stop_transmit(self, stream):
mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, stream)
side = stream.transmit_side
if side.fd in self._writer_by_fd:
del self._writer_by_fd[side.fd]
self._control(side, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)
def poll(self, broker, timeout=None):
changelist = self._changelist
self._changelist = []
for event in self._kqueue.control(changelist, 32, timeout):
if event.filter == select.KQ_FILTER_READ:
side = self._reader_by_fd.get(event.ident)
# Events can still be read for an already-discarded fd.
if side:
mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, side)
self._call(broker, side.stream, side.stream.on_receive)
elif event.filter == select.KQ_FILTER_WRITE:
side = self._writer_by_fd.get(event.ident)
if side:
mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, side)
self._call(broker, side.stream, side.stream.on_transmit)
POLLER_BY_SYSNAME = {
'Darwin': KqueuePoller,
'FreeBSD': KqueuePoller,
#'Linux': EpollPoller,
}
PREFERRED_POLLER = POLLER_BY_SYSNAME.get(os.uname()[0], mitogen.core.Poller)
class TtyLogStream(mitogen.core.BasicStream): class TtyLogStream(mitogen.core.BasicStream):
""" """
For "hybrid TTY/socketpair" mode, after a connection has been setup, a For "hybrid TTY/socketpair" mode, after a connection has been setup, a

Loading…
Cancel
Save