|
|
|
@ -339,8 +339,6 @@ class Stream(BasicStream):
|
|
|
|
|
IOLOG.debug('%r.Transmit()', self)
|
|
|
|
|
written = os.write(self.write_side.fd, self._output_buf[:4096])
|
|
|
|
|
self._output_buf = self._output_buf[written:]
|
|
|
|
|
if (not self._alive) and not self._output_buf:
|
|
|
|
|
self.Disconnect()
|
|
|
|
|
|
|
|
|
|
def WriteMore(self):
|
|
|
|
|
return bool(self._output_buf)
|
|
|
|
@ -369,13 +367,10 @@ class Stream(BasicStream):
|
|
|
|
|
LOG.debug('%r.Disconnect(): killing %r: %r', self, handle, fn)
|
|
|
|
|
fn(_DEAD)
|
|
|
|
|
|
|
|
|
|
_alive = True
|
|
|
|
|
|
|
|
|
|
def Shutdown(self):
|
|
|
|
|
# This works for entirely the wrong reason. Depends on partial
|
|
|
|
|
# _output_buf always being maintained by accident.
|
|
|
|
|
LOG.debug('%r.Shutdown()', self)
|
|
|
|
|
self._alive = False
|
|
|
|
|
|
|
|
|
|
def Accept(self, rfd, wfd):
|
|
|
|
|
self.read_side = Side(self, os.dup(rfd))
|
|
|
|
@ -539,6 +534,7 @@ class IoLogger(BasicStream):
|
|
|
|
|
|
|
|
|
|
self.read_side = Side(self, self._rsock.fileno())
|
|
|
|
|
self.write_side = Side(self, dest_fd)
|
|
|
|
|
broker.graceful_count += 1
|
|
|
|
|
self._broker.UpdateStream(self)
|
|
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
@ -558,6 +554,7 @@ class IoLogger(BasicStream):
|
|
|
|
|
LOG.debug('%r.Receive()', self)
|
|
|
|
|
buf = os.read(self.read_side.fd, 4096)
|
|
|
|
|
if not buf:
|
|
|
|
|
self._broker.graceful_count -= 1
|
|
|
|
|
return self.Disconnect()
|
|
|
|
|
|
|
|
|
|
self._buf += buf
|
|
|
|
@ -570,6 +567,8 @@ class Broker(object):
|
|
|
|
|
stream that is associated with them, and for I/O multiplexing.
|
|
|
|
|
"""
|
|
|
|
|
_waker = None
|
|
|
|
|
graceful_count = 0
|
|
|
|
|
graceful_timeout = 3.0
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self._alive = True
|
|
|
|
@ -644,12 +643,18 @@ class Broker(object):
|
|
|
|
|
for side in self._readers | self._writers:
|
|
|
|
|
self._CallAndUpdate(side.stream, side.stream.Shutdown)
|
|
|
|
|
|
|
|
|
|
deadline = time.time() + 1.0
|
|
|
|
|
while (self._readers or self._writers) and time.time() < deadline:
|
|
|
|
|
deadline = time.time() + self.graceful_timeout
|
|
|
|
|
while self.graceful_count and time.time() < deadline:
|
|
|
|
|
self._LoopOnce(1.0)
|
|
|
|
|
|
|
|
|
|
for context in self._contexts.itervalues():
|
|
|
|
|
stream = context.stream
|
|
|
|
|
if stream:
|
|
|
|
|
stream.Disconnect()
|
|
|
|
|
self._UpdateStream(stream)
|
|
|
|
|
|
|
|
|
|
for side in self._readers | self._writers:
|
|
|
|
|
LOG.error('_BrokerMain() force disconnecting %r', side.stream)
|
|
|
|
|
LOG.error('_BrokerMain() force disconnecting %r', side)
|
|
|
|
|
side.stream.Disconnect()
|
|
|
|
|
except Exception:
|
|
|
|
|
LOG.exception('_BrokerMain() crashed')
|
|
|
|
|