diff --git a/docs/internals.rst b/docs/internals.rst index b6c3d069..d22053c8 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -204,6 +204,25 @@ Stream Classes .. autoclass:: Stream :members: + .. method:: pending_bytes () + + Returns the number of bytes queued for transmission on this stream. + This can be used to limit the amount of data buffered in RAM by an + otherwise unlimited consumer. + + For an accurate result, this method should be called from the Broker + thread, using a wrapper like: + + :: + + def get_pending_bytes(self, stream): + latch = mitogen.core.Latch() + self.broker.defer( + lambda: latch.put(stream.pending_bytes()) + ) + return latch.get() + + .. currentmodule:: mitogen.fork .. autoclass:: Stream diff --git a/mitogen/core.py b/mitogen/core.py index dbac6f0d..c57f84cb 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -795,8 +795,9 @@ class Stream(BasicStream): self.sent_modules = set() self.construct(**kwargs) self._input_buf = collections.deque() - self._input_buf_len = 0 self._output_buf = collections.deque() + self._input_buf_len = 0 + self._output_buf_len = 0 def construct(self): pass @@ -866,6 +867,9 @@ class Stream(BasicStream): self._router._async_route(msg, self) return True + def pending_bytes(self): + return self._output_buf_len + def on_transmit(self, broker): """Transmit buffered messages.""" _vv and IOLOG.debug('%r.on_transmit()', self) @@ -881,6 +885,7 @@ class Stream(BasicStream): self._output_buf.appendleft(buffer(buf, written)) _vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written) + self._output_buf_len -= written if not self._output_buf: broker._stop_transmit(self) @@ -890,10 +895,10 @@ class Stream(BasicStream): pkt = struct.pack(self.HEADER_FMT, msg.dst_id, msg.src_id, msg.auth_id, msg.handle, msg.reply_to or 0, len(msg.data)) + msg.data - was_transmitting = len(self._output_buf) - self._output_buf.append(pkt) - if not was_transmitting: + if not self._output_buf_len: self._router.broker._start_transmit(self) + self._output_buf.append(pkt) + self._output_buf_len += len(pkt) def send(self, msg): """Send `data` to `handle`, and tell the broker we have output. May