From 9a8fc5e1676d31a984abc5a68bdb741f4c85b359 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 11 Aug 2016 16:46:51 +0100 Subject: [PATCH] Another attempt at graceful shutdown. --- econtext/core.py | 69 +++++++++++++++++++++++++++------------------- econtext/master.py | 10 ++++++- econtext/utils.py | 3 +- 3 files changed, 51 insertions(+), 31 deletions(-) diff --git a/econtext/core.py b/econtext/core.py index ad6c99fb..925c8bcd 100644 --- a/econtext/core.py +++ b/econtext/core.py @@ -30,6 +30,7 @@ IOLOG = logging.getLogger('econtext.io') GET_MODULE = 100L CALL_FUNCTION = 101L FORWARD_LOG = 102L +SHUTDOWN = 103L class ContextError(Exception): @@ -248,6 +249,7 @@ class BasicStream(object): self.write_side.close() def Shutdown(self): + LOG.debug('%r.Shutdown()', self) self.read_side.close() self.write_side.close() @@ -307,15 +309,15 @@ class Stream(BasicStream): self._rhmac.update(self._input_buf[20:msg_len+24]) expected_mac = self._rhmac.digest() if msg_mac != expected_mac: - raise CorruptMessageError('%r bad MAC: %r != got %r; %r', - self, msg_mac.encode('hex'), + raise CorruptMessageError('bad MAC: %r != got %r; %r', + msg_mac.encode('hex'), expected_mac.encode('hex'), self._input_buf[24:msg_len+24]) try: handle, data = self.Unpickle(self._input_buf[24:msg_len+24]) except (TypeError, ValueError), ex: - raise CorruptMessageError('%r got invalid message: %s', self, ex) + raise CorruptMessageError('invalid message: %s', ex) self._input_buf = self._input_buf[msg_len+24:] self._Invoke(handle, data) @@ -337,6 +339,8 @@ class Stream(BasicStream): IOLOG.debug('%r.Transmit()', self) written = os.write(self.write_side.fd, self._output_buf[:4096]) self._output_buf = self._output_buf[written:] + if (not self._alive) and not self._output_buf: + self.Disconnect() def WriteMore(self): return bool(self._output_buf) @@ -365,10 +369,13 @@ class Stream(BasicStream): LOG.debug('%r.Disconnect(): killing %r: %r', self, handle, fn) fn(_DEAD) + _alive = True + def Shutdown(self): + # This works for entirely the wrong reason. Depends on partial + # _output_buf always being maintained by accident. LOG.debug('%r.Shutdown()', self) - # Cannot use .shutdown() since it may be a pipe. - self.write_side.close() + self._alive = False def Accept(self, rfd, wfd): self.read_side = Side(self, os.dup(rfd)) @@ -408,11 +415,20 @@ class Context(object): self._last_handle = 1000L self._handle_map = {} self._lock = threading.Lock() + self.AddHandleCB(self._Shutdown, SHUTDOWN) + + def Shutdown(self): + """Slave does nothing, _BrokerMain() will .Shutdown its streams.""" + + def _Shutdown(self, data): + LOG.debug('Received SHUTDOWN') + if data != _DEAD and self.stream: + self.broker.Shutdown() def Disconnect(self): self.stream = None LOG.debug('Parent stream is gone, dying.') - self.broker.Finalize() + self.broker.Shutdown() def AllocHandle(self): """Allocate a handle.""" @@ -501,7 +517,8 @@ class Waker(BasicStream): return '' def Wake(self): - os.write(self.write_side.fd, ' ') + if self.write_side.fd: + os.write(self.write_side.fd, ' ') def Receive(self): os.read(self.read_side.fd, 1) @@ -534,7 +551,7 @@ class IoLogger(BasicStream): def Shutdown(self): LOG.debug('%r.Shutdown()', self) - self._wsock.shutdown(2) + self._wsock.shutdown(socket.SHUT_WR) self._wsock.close() def Receive(self): @@ -556,7 +573,7 @@ class Broker(object): def __init__(self): self._alive = True - self._lock = threading.Lock() + self._lock = threading.RLock() self._contexts = {} self._readers = set() self._writers = set() @@ -582,12 +599,6 @@ class Broker(object): finally: self._lock.release() - def RemoveStream(self, stream): - self._writers.discard(stream.write_side) - self._readers.discard(stream.read_side) - if self._waker: - self._waker.Wake() - def UpdateStream(self, stream): self._UpdateStream(stream) if self._waker: @@ -610,11 +621,12 @@ class Broker(object): stream.Disconnect() self._UpdateStream(stream) - def _LoopOnce(self): - IOLOG.debug('%r.Loop()', self) - # IOLOG.debug('readers = %r', [(r.fileno(), r) for r in self._readers]) - # IOLOG.debug('writers = %r', [(w.fileno(), w) for w in self._writers]) - rsides, wsides, _ = select.select(self._readers, self._writers, ()) + def _LoopOnce(self, timeout=None): + IOLOG.debug('%r._LoopOnce(%r)', self, timeout) + #IOLOG.debug('readers = %r', [(r.fileno(), r) for r in self._readers]) + #IOLOG.debug('writers = %r', [(w.fileno(), w) for w in self._writers]) + rsides, wsides, _ = select.select(self._readers, self._writers, + (), timeout) for side in rsides: IOLOG.debug('%r: POLLIN for %r', self, side.stream) self._CallAndUpdate(side.stream, side.stream.Receive) @@ -624,7 +636,7 @@ class Broker(object): self._CallAndUpdate(side.stream, side.stream.Transmit) def _BrokerMain(self): - """Handle events until Finalize() is called.""" + """Handle events until Shutdown() is called.""" try: while self._alive: self._LoopOnce() @@ -632,10 +644,9 @@ class Broker(object): for side in self._readers | self._writers: self._CallAndUpdate(side.stream, side.stream.Shutdown) - deadline = time.time() + 1.0 + deadline = time.time() + 5.0 while (self._readers or self._writers) and time.time() < deadline: - LOG.error('%s', [self._readers, self._writers]) - self._LoopOnce() + self._LoopOnce(1.0) for side in self._readers | self._writers: LOG.error('_BrokerMain() force disconnecting %r', side.stream) @@ -647,8 +658,9 @@ class Broker(object): """Wait for the broker to stop.""" self._thread.join() - def Finalize(self): - """Disconect all streams and wait for broker to stop.""" + def Shutdown(self): + """Gracefully disconnect streams and wait for broker to stop.""" + LOG.debug('%r.Shutdown()', self) self._alive = False self._waker.Wake() @@ -732,6 +744,5 @@ class ExternalContext(object): self._DispatchCalls() self.broker.Wait() LOG.debug('ExternalContext.main() exitting') - except Exception: - raise - self.broker.Finalize() + finally: + self.broker.Shutdown() diff --git a/econtext/master.py b/econtext/master.py index 0833996d..0486e9a9 100644 --- a/econtext/master.py +++ b/econtext/master.py @@ -23,6 +23,8 @@ COMMENT_RE = re.compile(r'^\s*#.*$', re.M) def MinimizeSource(source): + """Remove comments and docstrings from Python `source`, preserving line + numbers and syntax of empty blocks.""" subber = lambda match: '""' + ('\n' * match.group(0).count('\n')) source = DOCSTRING_RE.sub(subber, source) source = COMMENT_RE.sub('\n', source) @@ -30,6 +32,7 @@ def MinimizeSource(source): def GetChildModules(module, prefix): + """Return the canonical names of all submodules of a package `module`.""" it = pkgutil.iter_modules(module.__path__, prefix) return [name for _, name, _ in it] @@ -134,6 +137,11 @@ class LocalStream(econtext.core.Stream): super(LocalStream, self).__init__(context) self._permitted_classes = set([('econtext.core', 'CallError')]) + def Shutdown(self): + """Requesting the slave gracefully shut itself down.""" + LOG.debug('%r enqueing SHUTDOWN') + self.Enqueue(econtext.core.SHUTDOWN, None) + def _FindGlobal(self, module_name, class_name): """Return the class implementing `module_name.class_name` or raise `StreamError` if the module is not whitelisted.""" @@ -195,7 +203,7 @@ class LocalStream(econtext.core.Stream): LOG.debug('%r.Connect()', self) pid, sock = CreateChild(*self.GetBootCommand()) self.read_side = econtext.core.Side(self, os.dup(sock.fileno())) - self.write_side = self.read_side + self.write_side = econtext.core.Side(self, os.dup(sock.fileno())) sock.close() LOG.debug('%r.Connect(): child process stdin/stdout=%r', self, self.read_side.fd) diff --git a/econtext/utils.py b/econtext/utils.py index c2c109ca..5276e459 100644 --- a/econtext/utils.py +++ b/econtext/utils.py @@ -19,7 +19,8 @@ def run_with_broker(func, *args, **kwargs): try: return func(broker, *args, **kwargs) finally: - broker.Finalize() + broker.Shutdown() + broker.Wait() def with_broker(func):