Beginnings of graceful shutdown.

pull/35/head
David Wilson 8 years ago
parent 8a081a103f
commit d8b6aa8902

@ -5,6 +5,7 @@ Python external execution contexts.
import Queue
import cPickle
import cStringIO
import errno
import fcntl
import hmac
import imp
@ -17,6 +18,7 @@ import socket
import struct
import sys
import threading
import time
import traceback
import types
import zlib
@ -219,19 +221,35 @@ class Side(object):
self.fd = fd
def __repr__(self):
return '<fd %r of %r>' % (self.fd, self.stream)
return '<Side of %r fd %s>' % (self.stream, self.fd)
def fileno(self):
if self.fd is None:
raise StreamError('%r.fileno() called but no FD set', self)
return self.fd
def close(self):
if self.fd is not None:
try:
os.close(self.fd)
except OSError, e:
if e.errno != errno.EBADF:
LOG.error('%r: close failed', self, e)
self.fd = None
class BasicStream(object):
read_side = None
write_side = None
def Disconnect(self):
LOG.debug('%r: disconnect on %r', self._broker, self)
self._broker.RemoveStream(self)
LOG.debug('%r.Disconnect()', self)
self.read_side.close()
self.write_side.close()
def Shutdown(self):
self.read_side.close()
self.write_side.close()
def ReadMore(self):
return True
@ -342,29 +360,19 @@ class Stream(BasicStream):
def Disconnect(self):
"""Close our associated file descriptor and tell registered callbacks
the connection has been destroyed."""
LOG.debug('%r.Disconnect()', self)
super(Stream, self).Disconnect()
if self._context.stream is self:
self._context.Disconnect()
try:
os.close(self.read_side.fd)
except OSError, e:
LOG.debug('%r.Disconnect(): did not close fd %s: %s',
self, self.read_side.fd, e)
if self.read_side.fd != self.write_side.fd:
try:
os.close(self.write_side.fd)
except OSError, e:
LOG.debug('%r.Disconnect(): did not close fd %s: %s',
self, self.write_side.fd, e)
self.read_side.fd = None
self.write_side.fd = None
for handle, (persist, fn) in self._context._handle_map.iteritems():
LOG.debug('%r.Disconnect(): killing %r: %r', self, handle, fn)
fn(_DEAD)
def Shutdown(self):
LOG.debug('%r.Shutdown()', self)
# Cannot use .shutdown() since it may be a pipe.
self.write_side.close()
def Accept(self, rfd, wfd):
self.read_side = Side(self, os.dup(rfd))
self.write_side = Side(self, os.dup(wfd))
@ -509,13 +517,13 @@ class IoLogger(BasicStream):
self._broker = broker
self._name = name
self._log = logging.getLogger(name)
rfd, wfd = os.pipe()
self._rsock, self._wsock = socket.socketpair()
set_cloexec(rfd)
os.dup2(wfd, dest_fd)
os.close(wfd)
os.dup2(self._wsock.fileno(), dest_fd)
set_cloexec(self._rsock.fileno())
set_cloexec(self._wsock.fileno())
self.read_side = Side(self, rfd)
self.read_side = Side(self, self._rsock.fileno())
self.write_side = Side(self, dest_fd)
self._broker.UpdateStream(self)
@ -527,6 +535,11 @@ class IoLogger(BasicStream):
line, _, self._buf = self._buf.partition('\n')
self._log.info('%s', line.rstrip('\n'))
def Shutdown(self):
LOG.debug('%r.Shutdown()', self)
self._wsock.shutdown(2)
self._wsock.close()
def Receive(self):
LOG.debug('%r.Receive()', self)
buf = os.read(self.read_side.fd, 4096)
@ -560,18 +573,24 @@ class Broker(object):
IOLOG.debug('_UpdateStream(%r)', stream)
self._lock.acquire()
try:
if stream.ReadMore() and stream.read_side.fileno():
if stream.read_side.fd is not None and stream.ReadMore():
self._readers.add(stream.read_side)
else:
self._readers.discard(stream.read_side)
if stream.WriteMore() and stream.write_side.fileno():
if stream.write_side.fd is not None and stream.WriteMore():
self._writers.add(stream.write_side)
else:
self._writers.discard(stream.write_side)
finally:
self._lock.release()
def RemoveStream(self, stream):
self._writers.discard(stream.write_side)
self._readers.discard(stream.read_side)
if self._waker:
self._waker.Wake()
def UpdateStream(self, stream):
self._UpdateStream(stream)
if self._waker:
@ -613,11 +632,19 @@ class Broker(object):
while self._alive:
self._LoopOnce()
for context in self._contexts.itervalues():
if context.stream:
context.stream.Disconnect()
for side in self._readers | self._writers:
self._CallAndUpdate(side.stream, side.stream.Shutdown)
deadline = time.time() + 1.0
while (self._readers or self._writers) and time.time() < deadline:
LOG.error('%s', [self._readers, self._writers])
self._LoopOnce()
for side in self._readers | self._writers:
LOG.error('_BrokerMain() force disconnecting %r', side.stream)
side.stream.Disconnect()
except Exception:
LOG.exception('Loop() crashed')
LOG.exception('_BrokerMain() crashed')
def Wait(self):
"""Wait for the broker to stop."""

Loading…
Cancel
Save