core: add Stream.pending_bytes() accessor.

pull/209/head
David Wilson 7 years ago
parent e04f4f7e9d
commit 4c5e13bf87

@ -204,6 +204,25 @@ Stream Classes
.. autoclass:: Stream
:members:
.. method:: pending_bytes ()
Returns the number of bytes queued for transmission on this stream.
This can be used to limit the amount of data buffered in RAM by an
otherwise unlimited consumer.
For an accurate result, this method should be called from the Broker
thread, using a wrapper like:
::
def get_pending_bytes(self, stream):
latch = mitogen.core.Latch()
self.broker.defer(
lambda: latch.put(stream.pending_bytes())
)
return latch.get()
.. currentmodule:: mitogen.fork
.. autoclass:: Stream

@ -795,8 +795,9 @@ class Stream(BasicStream):
self.sent_modules = set()
self.construct(**kwargs)
self._input_buf = collections.deque()
self._input_buf_len = 0
self._output_buf = collections.deque()
self._input_buf_len = 0
self._output_buf_len = 0
def construct(self):
pass
@ -866,6 +867,9 @@ class Stream(BasicStream):
self._router._async_route(msg, self)
return True
def pending_bytes(self):
return self._output_buf_len
def on_transmit(self, broker):
"""Transmit buffered messages."""
_vv and IOLOG.debug('%r.on_transmit()', self)
@ -881,6 +885,7 @@ class Stream(BasicStream):
self._output_buf.appendleft(buffer(buf, written))
_vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written)
self._output_buf_len -= written
if not self._output_buf:
broker._stop_transmit(self)
@ -890,10 +895,10 @@ class Stream(BasicStream):
pkt = struct.pack(self.HEADER_FMT, msg.dst_id, msg.src_id,
msg.auth_id, msg.handle, msg.reply_to or 0,
len(msg.data)) + msg.data
was_transmitting = len(self._output_buf)
self._output_buf.append(pkt)
if not was_transmitting:
if not self._output_buf_len:
self._router.broker._start_transmit(self)
self._output_buf.append(pkt)
self._output_buf_len += len(pkt)
def send(self, msg):
"""Send `data` to `handle`, and tell the broker we have output. May

Loading…
Cancel
Save