From c7ebb39ad4a44bcd13d0886c6eee0623e3482d7b Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 10 Mar 2019 21:29:47 +0000 Subject: [PATCH] core: introduce Protocol, DelimitedProtocol and BufferedWriter. They aren't wired in yet as of this commit, and continue duplicating other code. --- mitogen/core.py | 137 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) diff --git a/mitogen/core.py b/mitogen/core.py index 4e7b347b..398b80ed 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1485,6 +1485,143 @@ class LogHandler(logging.Handler): self.local.in_emit = False +class Protocol(object): + """ + Implement the program behaviour associated with activity on a + :class:`Stream`. The protocol in use may vary over a stream's life, for + example to allow :class:`mitogen.parent.BootstrapProtocol` to initialize + the connected child before handing it off to :class:`MitogenProtocol`. A + stream's active protocol is tracked in the :attr:`Stream.protocol` + attribute, and modified via :meth:`Stream.set_protocol`. + + Protocols do not handle IO, they are entirely reliant on the interface + provided by :class:`Stream` and :class:`Side`, allowing the underlying IO + implementation to be replaced without modifying behavioural logic. + """ + stream = None + + @classmethod + def build_stream(cls, *args, **kwargs): + stream = Stream() + stream.set_protocol(cls(*args, **kwargs)) + return stream + + def __repr__(self): + return '%s.%s(%s)' % ( + self.__class__.__module__, + self.__class__.__name__, + self.stream and self.stream.name, + ) + + def on_shutdown(self, broker): + _v and LOG.debug('%r.on_shutdown()', self) + self.stream.on_disconnect(broker) + + def on_disconnect(self, broker): + LOG.debug('%r.on_disconnect()', self) + if self.stream.receive_side: + broker.stop_receive(self.stream) + self.stream.receive_side.close() + if self.stream.transmit_side: + broker._stop_transmit(self.stream) + self.stream.transmit_side.close() + + +class DelimitedProtocol(Protocol): + """ + Provide a :meth:`Protocol.on_receive` implementation for protocols that are + delimited by a fixed string, like text based protocols. Each message is + passed to :meth:`on_line_received` as it arrives, with incomplete messages + passed to :meth:`on_partial_line_received`. + + When emulating user input it is often necessary to respond to incomplete + lines, such as when a "Password: " prompt is sent. + :meth:`on_partial_line_received` may be called repeatedly with an + increasingly complete message. When a complete message is finally received, + :meth:`on_line_received` will be called once for it before the buffer is + discarded. + """ + #: The delimiter. Defaults to newline. + delimiter = b('\n') + _trailer = b('') + + def on_receive(self, broker): + IOLOG.debug('%r.on_receive()', self) + buf = self.stream.receive_side.read() + if not buf: + return self.stream.on_disconnect(broker) + + self._trailer = mitogen.core.iter_split( + buf=self._trailer + buf, + delim=self.delimiter, + func=self.on_line_received, + ) + if self._trailer: + self.on_partial_line_received(self._trailer) + + def on_line_received(self, line): + pass + + def on_partial_line_received(self, line): + pass + + +class BufferedWriter(object): + """ + Implement buffered output while avoiding quadratic string operations. This + is currently constructed by each protocol, in future it may become fixed + for each stream instead. + """ + def __init__(self, broker, protocol): + self._broker = broker + self._protocol = protocol + self._buf = collections.deque() + self._len = 0 + + def write(self, s): + """ + Transmit `s` immediately, falling back to enqueuing it and marking the + stream writeable if no OS buffer space is available. + """ + if not self._len: + # Modifying epoll/Kqueue state is expensive, as are needless broker + # loops. Rather than wait for writeability, just write immediately, + # and fall back to the broker loop on error or full buffer. + try: + n = self._protocol.stream.transmit_side.write(s) + if n: + if n == len(s): + return + s = s[n:] + except OSError: + pass + + self._broker._start_transmit(self._protocol.stream) + self._buf.append(s) + self._len += len(s) + + def on_transmit(self, broker): + """ + Respond to stream writeability by retrying previously buffered + :meth:`write` calls. + """ + if self._buf: + buf = self._buf.popleft() + written = self._protocol.stream.transmit_side.write(buf) + if not written: + _v and LOG.debug('%r.on_transmit(): disconnection detected', self) + self._protocol.on_disconnect(broker) + return + elif written != len(buf): + self._buf.appendleft(BufferType(buf, written)) + + _vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written) + self._len -= written + + if not self._buf: + broker._stop_transmit(self._protocol.stream) + + class Side(object): """ Represent a single side of a :class:`BasicStream`. This exists to allow