From 7d0209d8decbee244c926cbffaf274c323816066 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 15 May 2018 09:28:03 +0100 Subject: [PATCH] issue #249: have upgrade_router() upgrade the poller too. Now when a child becomes a parent, it gets a new poller suitable for many more children than was possible using select(). --- mitogen/parent.py | 51 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/mitogen/parent.py b/mitogen/parent.py index 089d8603..692adf24 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -422,8 +422,41 @@ def discard_until(fd, s, deadline): return +def _upgrade_broker(broker): + """ + Extract the poller state from Broker and replace it with the industrial + strength poller for this OS. Must run on the Broker thread. + """ + # This function is deadly! The act of calling start_receive() generates log + # messages which must be silenced as the upgrade progresses, otherwise the + # poller state will change as it is copied, resulting in write fds that are + # lost. (Due to LogHandler->Router->Stream->Broker->Poller, where Stream + # only calls start_transmit() when transitioning from empty to non-empty + # buffer. If the start_transmit() is lost, writes from the child hang + # permanently). + root = logging.getLogger() + old_level = root.level + root.setLevel(logging.CRITICAL) + + old = broker.poller + new = PREFERRED_POLLER() + for fd, data in old.readers: + new.start_receive(fd, data) + for fd, data in old.writers: + new.start_transmit(fd, data) + + old.close() + broker.poller = new + root.setLevel(old_level) + LOG.debug('replaced %r with %r (new: %d readers, %d writers; ' + 'old: %d readers, %d writers)', old, new, + len(new.readers), len(new.writers), + len(old.readers), len(old.writers)) + + def upgrade_router(econtext): if not isinstance(econtext.router, Router): # TODO + econtext.broker.defer(_upgrade_broker, econtext.broker) econtext.router.__class__ = Router # TODO econtext.router.upgrade( importer=econtext.importer, @@ -500,17 +533,7 @@ class Argv(object): return ' '.join(map(self.escape, self.argv)) -class Poller(mitogen.core.Poller): - @classmethod - def from_existing(cls, poller): - self = cls() - for fd, data in poller.readers: - self.start_receive(fd, data) - for fd, data in poller.writers: - self.start_transmit(fd, data) - - -class KqueuePoller(Poller): +class KqueuePoller(mitogen.core.Poller): _repr = 'KqueuePoller()' def __init__(self): @@ -577,11 +600,11 @@ class KqueuePoller(Poller): yield self._wfds[fd] -class EpollPoller(Poller): +class EpollPoller(mitogen.core.Poller): _repr = 'EpollPoller()' def __init__(self): - self._epoll = select.epoll() + self._epoll = select.epoll(32) self._registered_fds = set() self._rfds = {} self._wfds = {} @@ -641,7 +664,7 @@ class EpollPoller(Poller): if timeout is not None: the_timeout = timeout - events, _ = mitogen.core.io_op(self._epoll.poll, the_timeout) + events, _ = mitogen.core.io_op(self._epoll.poll, the_timeout, 32) for fd, event in events: if event & self._inmask and fd in self._rfds: # Events can still be read for an already-discarded fd.