From 8676c40674fbbb4372f89b0b9804af0a4140a2a0 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 29 Mar 2018 15:08:17 +0545 Subject: [PATCH] core: make _start_transmit / _stop_transmit async-only For now at least, these APIs are always used in an asynchronous context, so stop using the defer mechanism. --- docs/api.rst | 6 +++--- docs/images/layout.graphml | 4 ++-- docs/internals.rst | 4 ++-- mitogen/core.py | 18 +++++++++--------- mitogen/fakessh.py | 6 +++--- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 1364440d..e7846f9b 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1062,11 +1062,11 @@ Broker Class Mark the :py:attr:`receive_side ` on `stream` as not ready for reading. Safe to call from any thread. - .. method:: start_transmit (stream) + .. method:: _start_transmit (stream) Mark the :py:attr:`transmit_side ` on `stream` as - ready for writing. Safe to call from any thread. When the associated - file descriptor becomes ready for writing, + ready for writing. Must only be called from the Broker thread. When the + associated file descriptor becomes ready for writing, :py:meth:`BasicStream.on_transmit` will be called. .. method:: stop_receive (stream) diff --git a/docs/images/layout.graphml b/docs/images/layout.graphml index 4aa1f95e..f21842bb 100644 --- a/docs/images/layout.graphml +++ b/docs/images/layout.graphml @@ -122,8 +122,8 @@ send(msg) - start_transmit(strm) -stop_transmit(strm) + _start_transmit(strm) +_stop_transmit(strm) diff --git a/docs/internals.rst b/docs/internals.rst index 7799011c..625f14ce 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -188,12 +188,12 @@ Stream Classes .. method:: on_transmit (broker) Called by :py:class:`Broker` when the stream's :py:attr:`transmit_side` - has been marked writeable using :py:meth:`Broker.start_transmit` and + has been marked writeable using :py:meth:`Broker._start_transmit` and the broker has detected the associated file descriptor is ready for writing. Subclasses must implement this method if - :py:meth:`Broker.start_transmit` is ever called on them. + :py:meth:`Broker._start_transmit` is ever called on them. .. method:: on_shutdown (broker) diff --git a/mitogen/core.py b/mitogen/core.py index 9bc0a2c8..a37d5f19 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -723,7 +723,7 @@ class BasicStream(object): def on_disconnect(self, broker): LOG.debug('%r.on_disconnect()', self) broker.stop_receive(self) - broker.stop_transmit(self) + broker._stop_transmit(self) if self.receive_side: self.receive_side.close() if self.transmit_side: @@ -834,7 +834,7 @@ class Stream(BasicStream): _vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written) if not self._output_buf: - broker.stop_transmit(self) + broker._stop_transmit(self) def _send(self, msg): _vv and IOLOG.debug('%r._send(%r)', self, msg) @@ -842,7 +842,7 @@ class Stream(BasicStream): msg.auth_id, msg.handle, msg.reply_to or 0, len(msg.data)) + msg.data self._output_buf.append(pkt) - self._router.broker.start_transmit(self) + self._router.broker._start_transmit(self) def send(self, msg): """Send `data` to `handle`, and tell the broker we have output. May @@ -1317,14 +1317,14 @@ class Broker(object): IOLOG.debug('%r.stop_receive(%r)', self, stream) self.defer(self._list_discard, self._readers, stream.receive_side) - def start_transmit(self, stream): - IOLOG.debug('%r.start_transmit(%r)', self, stream) + def _start_transmit(self, stream): + IOLOG.debug('%r._start_transmit(%r)', self, stream) assert stream.transmit_side and stream.transmit_side.fd is not None - self.defer(self._list_add, self._writers, stream.transmit_side) + self._list_add(self._writers, stream.transmit_side) - def stop_transmit(self, stream): - IOLOG.debug('%r.stop_transmit(%r)', self, stream) - self.defer(self._list_discard, self._writers, stream.transmit_side) + def _stop_transmit(self, stream): + IOLOG.debug('%r._stop_transmit(%r)', self, stream) + self._list_discard(self._writers, stream.transmit_side) def _call(self, stream, func): try: diff --git a/mitogen/fakessh.py b/mitogen/fakessh.py index 13abfcfe..f5dcbe1c 100644 --- a/mitogen/fakessh.py +++ b/mitogen/fakessh.py @@ -62,14 +62,14 @@ class IoPump(mitogen.core.BasicStream): def write(self, s): self._output_buf += s - self._broker.start_transmit(self) + self._broker._start_transmit(self) def close(self): self._closed = True # If local process hasn't exitted yet, ensure its write buffer is # drained before lazily triggering disconnect in on_transmit. if self.transmit_side.fd is not None: - self._broker.start_transmit(self) + self._broker._start_transmit(self) def on_shutdown(self, broker): self.close() @@ -83,7 +83,7 @@ class IoPump(mitogen.core.BasicStream): self._output_buf = self._output_buf[written:] if not self._output_buf: - broker.stop_transmit(self) + broker._stop_transmit(self) if self._closed: self.on_disconnect(broker)