Generalize ECONNRESET, EIO handling.

pull/35/head
David Wilson 7 years ago
parent 0aff1d82c6
commit 821c9dfcb6

@ -101,6 +101,22 @@ def set_cloexec(fd):
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
def io_op(func, *args):
"""
When connected over a TTY (i.e. sudo), disconnection of the remote end is
signalled by EIO, rather than an empty read like sockets or pipes. Ideally
this will be replaced later by a 'goodbye' message to avoid reading from a
disconnected endpoint, allowing for more robust error reporting.
"""
try:
return func(*args), False
except OSError, e:
IOLOG.debug('io_op(%r) -> OSError: %s', func, e)
if e.errno not in (errno.EIO, errno.ECONNRESET):
raise
return None, True
def enable_debug_logging():
root = logging.getLogger()
root.setLevel(logging.DEBUG)
@ -443,19 +459,8 @@ class Stream(BasicStream):
:py:class:`StreamError` on failure."""
IOLOG.debug('%r.on_receive()', self)
try:
buf = os.read(self.receive_side.fd, CHUNK_SIZE)
IOLOG.debug('%r.on_receive() -> len %d', self, len(buf))
except OSError, e:
IOLOG.debug('%r.on_receive() -> OSError: %s', self, e)
# When connected over a TTY (i.e. sudo), disconnection of the
# remote end is signalled by EIO, rather than an empty read like
# sockets or pipes. Ideally this will be replaced later by a
# 'goodbye' message to avoid reading from a disconnected endpoint,
# allowing for more robust error reporting.
if e.errno not in (errno.EIO, errno.ECONNRESET):
raise
LOG.error('%r.on_receive(): %s', self, e)
buf, disconnected = io_op(os.read, self.receive_side.fd, CHUNK_SIZE)
if disconnected:
buf = ''
self._input_buf += buf
@ -503,7 +508,14 @@ class Stream(BasicStream):
def on_transmit(self, broker):
"""Transmit buffered messages."""
IOLOG.debug('%r.on_transmit()', self)
written = os.write(self.transmit_side.fd, self._output_buf[:CHUNK_SIZE])
written, disconnected = io_op(os.write, self.transmit_side.fd,
self._output_buf[:CHUNK_SIZE])
if disconnected:
LOG.debug('%r.on_transmit(): disconnection detected', self)
self.on_disconnect()
return
IOLOG.debug('%r.on_transmit() -> len %d', self, written)
self._output_buf = self._output_buf[written:]
if not self._output_buf:

@ -103,13 +103,8 @@ def iter_read(fd, deadline):
bits = []
while True:
try:
s = os.read(fd, 4096)
except OSError, e:
IOLOG.debug('iter_read(%r) -> OSError: %s', fd, e)
# See econtext.core.on_receive() EIO comment.
if e.errno != errno.EIO:
raise
s, disconnected = econtext.core.io_op(os.read, fd, 4096)
if disconnected:
s = ''
if not s:

Loading…
Cancel
Save