|
|
|
@ -385,6 +385,18 @@ class Side(object):
|
|
|
|
|
os.close(self.fd)
|
|
|
|
|
self.fd = None
|
|
|
|
|
|
|
|
|
|
def read(self, n=CHUNK_SIZE):
|
|
|
|
|
s, disconnected = io_op(os.read, self.fd, n)
|
|
|
|
|
if disconnected:
|
|
|
|
|
return ''
|
|
|
|
|
return s
|
|
|
|
|
|
|
|
|
|
def write(self, s):
|
|
|
|
|
written, disconnected = io_op(os.write, self.fd, s[:CHUNK_SIZE])
|
|
|
|
|
if disconnected:
|
|
|
|
|
return None
|
|
|
|
|
return written
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BasicStream(object):
|
|
|
|
|
"""
|
|
|
|
@ -467,8 +479,8 @@ class Stream(BasicStream):
|
|
|
|
|
:py:class:`StreamError` on failure."""
|
|
|
|
|
IOLOG.debug('%r.on_receive()', self)
|
|
|
|
|
|
|
|
|
|
buf, disconnected = io_op(os.read, self.receive_side.fd, CHUNK_SIZE)
|
|
|
|
|
if disconnected:
|
|
|
|
|
buf = self.receive_side.read()
|
|
|
|
|
if buf is None:
|
|
|
|
|
buf = ''
|
|
|
|
|
|
|
|
|
|
self._input_buf += buf
|
|
|
|
@ -516,10 +528,8 @@ class Stream(BasicStream):
|
|
|
|
|
def on_transmit(self, broker):
|
|
|
|
|
"""Transmit buffered messages."""
|
|
|
|
|
IOLOG.debug('%r.on_transmit()', self)
|
|
|
|
|
written, disconnected = io_op(os.write, self.transmit_side.fd,
|
|
|
|
|
self._output_buf[:CHUNK_SIZE])
|
|
|
|
|
|
|
|
|
|
if disconnected:
|
|
|
|
|
written = self.transmit_side.write(self._output_buf)
|
|
|
|
|
if written is None:
|
|
|
|
|
LOG.debug('%r.on_transmit(): disconnection detected', self)
|
|
|
|
|
self.on_disconnect()
|
|
|
|
|
return
|
|
|
|
|