Another attempt at graceful shutdown.

pull/35/head
David Wilson 8 years ago
parent 4947fb2c40
commit 9a8fc5e167

@ -30,6 +30,7 @@ IOLOG = logging.getLogger('econtext.io')
GET_MODULE = 100L
CALL_FUNCTION = 101L
FORWARD_LOG = 102L
SHUTDOWN = 103L
class ContextError(Exception):
@ -248,6 +249,7 @@ class BasicStream(object):
self.write_side.close()
def Shutdown(self):
LOG.debug('%r.Shutdown()', self)
self.read_side.close()
self.write_side.close()
@ -307,15 +309,15 @@ class Stream(BasicStream):
self._rhmac.update(self._input_buf[20:msg_len+24])
expected_mac = self._rhmac.digest()
if msg_mac != expected_mac:
raise CorruptMessageError('%r bad MAC: %r != got %r; %r',
self, msg_mac.encode('hex'),
raise CorruptMessageError('bad MAC: %r != got %r; %r',
msg_mac.encode('hex'),
expected_mac.encode('hex'),
self._input_buf[24:msg_len+24])
try:
handle, data = self.Unpickle(self._input_buf[24:msg_len+24])
except (TypeError, ValueError), ex:
raise CorruptMessageError('%r got invalid message: %s', self, ex)
raise CorruptMessageError('invalid message: %s', ex)
self._input_buf = self._input_buf[msg_len+24:]
self._Invoke(handle, data)
@ -337,6 +339,8 @@ class Stream(BasicStream):
IOLOG.debug('%r.Transmit()', self)
written = os.write(self.write_side.fd, self._output_buf[:4096])
self._output_buf = self._output_buf[written:]
if (not self._alive) and not self._output_buf:
self.Disconnect()
def WriteMore(self):
return bool(self._output_buf)
@ -365,10 +369,13 @@ class Stream(BasicStream):
LOG.debug('%r.Disconnect(): killing %r: %r', self, handle, fn)
fn(_DEAD)
_alive = True
def Shutdown(self):
# This works for entirely the wrong reason. Depends on partial
# _output_buf always being maintained by accident.
LOG.debug('%r.Shutdown()', self)
# Cannot use .shutdown() since it may be a pipe.
self.write_side.close()
self._alive = False
def Accept(self, rfd, wfd):
self.read_side = Side(self, os.dup(rfd))
@ -408,11 +415,20 @@ class Context(object):
self._last_handle = 1000L
self._handle_map = {}
self._lock = threading.Lock()
self.AddHandleCB(self._Shutdown, SHUTDOWN)
def Shutdown(self):
"""Slave does nothing, _BrokerMain() will .Shutdown its streams."""
def _Shutdown(self, data):
LOG.debug('Received SHUTDOWN')
if data != _DEAD and self.stream:
self.broker.Shutdown()
def Disconnect(self):
self.stream = None
LOG.debug('Parent stream is gone, dying.')
self.broker.Finalize()
self.broker.Shutdown()
def AllocHandle(self):
"""Allocate a handle."""
@ -501,7 +517,8 @@ class Waker(BasicStream):
return '<Waker>'
def Wake(self):
os.write(self.write_side.fd, ' ')
if self.write_side.fd:
os.write(self.write_side.fd, ' ')
def Receive(self):
os.read(self.read_side.fd, 1)
@ -534,7 +551,7 @@ class IoLogger(BasicStream):
def Shutdown(self):
LOG.debug('%r.Shutdown()', self)
self._wsock.shutdown(2)
self._wsock.shutdown(socket.SHUT_WR)
self._wsock.close()
def Receive(self):
@ -556,7 +573,7 @@ class Broker(object):
def __init__(self):
self._alive = True
self._lock = threading.Lock()
self._lock = threading.RLock()
self._contexts = {}
self._readers = set()
self._writers = set()
@ -582,12 +599,6 @@ class Broker(object):
finally:
self._lock.release()
def RemoveStream(self, stream):
self._writers.discard(stream.write_side)
self._readers.discard(stream.read_side)
if self._waker:
self._waker.Wake()
def UpdateStream(self, stream):
self._UpdateStream(stream)
if self._waker:
@ -610,11 +621,12 @@ class Broker(object):
stream.Disconnect()
self._UpdateStream(stream)
def _LoopOnce(self):
IOLOG.debug('%r.Loop()', self)
# IOLOG.debug('readers = %r', [(r.fileno(), r) for r in self._readers])
# IOLOG.debug('writers = %r', [(w.fileno(), w) for w in self._writers])
rsides, wsides, _ = select.select(self._readers, self._writers, ())
def _LoopOnce(self, timeout=None):
IOLOG.debug('%r._LoopOnce(%r)', self, timeout)
#IOLOG.debug('readers = %r', [(r.fileno(), r) for r in self._readers])
#IOLOG.debug('writers = %r', [(w.fileno(), w) for w in self._writers])
rsides, wsides, _ = select.select(self._readers, self._writers,
(), timeout)
for side in rsides:
IOLOG.debug('%r: POLLIN for %r', self, side.stream)
self._CallAndUpdate(side.stream, side.stream.Receive)
@ -624,7 +636,7 @@ class Broker(object):
self._CallAndUpdate(side.stream, side.stream.Transmit)
def _BrokerMain(self):
"""Handle events until Finalize() is called."""
"""Handle events until Shutdown() is called."""
try:
while self._alive:
self._LoopOnce()
@ -632,10 +644,9 @@ class Broker(object):
for side in self._readers | self._writers:
self._CallAndUpdate(side.stream, side.stream.Shutdown)
deadline = time.time() + 1.0
deadline = time.time() + 5.0
while (self._readers or self._writers) and time.time() < deadline:
LOG.error('%s', [self._readers, self._writers])
self._LoopOnce()
self._LoopOnce(1.0)
for side in self._readers | self._writers:
LOG.error('_BrokerMain() force disconnecting %r', side.stream)
@ -647,8 +658,9 @@ class Broker(object):
"""Wait for the broker to stop."""
self._thread.join()
def Finalize(self):
"""Disconect all streams and wait for broker to stop."""
def Shutdown(self):
"""Gracefully disconnect streams and wait for broker to stop."""
LOG.debug('%r.Shutdown()', self)
self._alive = False
self._waker.Wake()
@ -732,6 +744,5 @@ class ExternalContext(object):
self._DispatchCalls()
self.broker.Wait()
LOG.debug('ExternalContext.main() exitting')
except Exception:
raise
self.broker.Finalize()
finally:
self.broker.Shutdown()

@ -23,6 +23,8 @@ COMMENT_RE = re.compile(r'^\s*#.*$', re.M)
def MinimizeSource(source):
"""Remove comments and docstrings from Python `source`, preserving line
numbers and syntax of empty blocks."""
subber = lambda match: '""' + ('\n' * match.group(0).count('\n'))
source = DOCSTRING_RE.sub(subber, source)
source = COMMENT_RE.sub('\n', source)
@ -30,6 +32,7 @@ def MinimizeSource(source):
def GetChildModules(module, prefix):
"""Return the canonical names of all submodules of a package `module`."""
it = pkgutil.iter_modules(module.__path__, prefix)
return [name for _, name, _ in it]
@ -134,6 +137,11 @@ class LocalStream(econtext.core.Stream):
super(LocalStream, self).__init__(context)
self._permitted_classes = set([('econtext.core', 'CallError')])
def Shutdown(self):
"""Requesting the slave gracefully shut itself down."""
LOG.debug('%r enqueing SHUTDOWN')
self.Enqueue(econtext.core.SHUTDOWN, None)
def _FindGlobal(self, module_name, class_name):
"""Return the class implementing `module_name.class_name` or raise
`StreamError` if the module is not whitelisted."""
@ -195,7 +203,7 @@ class LocalStream(econtext.core.Stream):
LOG.debug('%r.Connect()', self)
pid, sock = CreateChild(*self.GetBootCommand())
self.read_side = econtext.core.Side(self, os.dup(sock.fileno()))
self.write_side = self.read_side
self.write_side = econtext.core.Side(self, os.dup(sock.fileno()))
sock.close()
LOG.debug('%r.Connect(): child process stdin/stdout=%r',
self, self.read_side.fd)

@ -19,7 +19,8 @@ def run_with_broker(func, *args, **kwargs):
try:
return func(broker, *args, **kwargs)
finally:
broker.Finalize()
broker.Shutdown()
broker.Wait()
def with_broker(func):

Loading…
Cancel
Save