From 821c9dfcb6737128bec26679a061524b330a677a Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 24 Aug 2017 17:04:20 +0530 Subject: [PATCH] Generalize ECONNRESET, EIO handling. --- econtext/core.py | 40 ++++++++++++++++++++++++++-------------- econtext/master.py | 9 ++------- 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/econtext/core.py b/econtext/core.py index 8f99528a..fbca6b6f 100644 --- a/econtext/core.py +++ b/econtext/core.py @@ -101,6 +101,22 @@ def set_cloexec(fd): fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) +def io_op(func, *args): + """ + When connected over a TTY (i.e. sudo), disconnection of the remote end is + signalled by EIO, rather than an empty read like sockets or pipes. Ideally + this will be replaced later by a 'goodbye' message to avoid reading from a + disconnected endpoint, allowing for more robust error reporting. + """ + try: + return func(*args), False + except OSError, e: + IOLOG.debug('io_op(%r) -> OSError: %s', func, e) + if e.errno not in (errno.EIO, errno.ECONNRESET): + raise + return None, True + + def enable_debug_logging(): root = logging.getLogger() root.setLevel(logging.DEBUG) @@ -443,19 +459,8 @@ class Stream(BasicStream): :py:class:`StreamError` on failure.""" IOLOG.debug('%r.on_receive()', self) - try: - buf = os.read(self.receive_side.fd, CHUNK_SIZE) - IOLOG.debug('%r.on_receive() -> len %d', self, len(buf)) - except OSError, e: - IOLOG.debug('%r.on_receive() -> OSError: %s', self, e) - # When connected over a TTY (i.e. sudo), disconnection of the - # remote end is signalled by EIO, rather than an empty read like - # sockets or pipes. Ideally this will be replaced later by a - # 'goodbye' message to avoid reading from a disconnected endpoint, - # allowing for more robust error reporting. - if e.errno not in (errno.EIO, errno.ECONNRESET): - raise - LOG.error('%r.on_receive(): %s', self, e) + buf, disconnected = io_op(os.read, self.receive_side.fd, CHUNK_SIZE) + if disconnected: buf = '' self._input_buf += buf @@ -503,7 +508,14 @@ class Stream(BasicStream): def on_transmit(self, broker): """Transmit buffered messages.""" IOLOG.debug('%r.on_transmit()', self) - written = os.write(self.transmit_side.fd, self._output_buf[:CHUNK_SIZE]) + written, disconnected = io_op(os.write, self.transmit_side.fd, + self._output_buf[:CHUNK_SIZE]) + + if disconnected: + LOG.debug('%r.on_transmit(): disconnection detected', self) + self.on_disconnect() + return + IOLOG.debug('%r.on_transmit() -> len %d', self, written) self._output_buf = self._output_buf[written:] if not self._output_buf: diff --git a/econtext/master.py b/econtext/master.py index a4e8769c..90eb502c 100644 --- a/econtext/master.py +++ b/econtext/master.py @@ -103,13 +103,8 @@ def iter_read(fd, deadline): bits = [] while True: - try: - s = os.read(fd, 4096) - except OSError, e: - IOLOG.debug('iter_read(%r) -> OSError: %s', fd, e) - # See econtext.core.on_receive() EIO comment. - if e.errno != errno.EIO: - raise + s, disconnected = econtext.core.io_op(os.read, fd, 4096) + if disconnected: s = '' if not s: