core: introduce Protocol, DelimitedProtocol and BufferedWriter.

They aren't wired in yet as of this commit, and continue duplicating
other code.
pull/607/head
David Wilson 5 years ago
parent d368971749
commit c7ebb39ad4

@ -1485,6 +1485,143 @@ class LogHandler(logging.Handler):
self.local.in_emit = False
class Protocol(object):
"""
Implement the program behaviour associated with activity on a
:class:`Stream`. The protocol in use may vary over a stream's life, for
example to allow :class:`mitogen.parent.BootstrapProtocol` to initialize
the connected child before handing it off to :class:`MitogenProtocol`. A
stream's active protocol is tracked in the :attr:`Stream.protocol`
attribute, and modified via :meth:`Stream.set_protocol`.
Protocols do not handle IO, they are entirely reliant on the interface
provided by :class:`Stream` and :class:`Side`, allowing the underlying IO
implementation to be replaced without modifying behavioural logic.
"""
stream = None
@classmethod
def build_stream(cls, *args, **kwargs):
stream = Stream()
stream.set_protocol(cls(*args, **kwargs))
return stream
def __repr__(self):
return '%s.%s(%s)' % (
self.__class__.__module__,
self.__class__.__name__,
self.stream and self.stream.name,
)
def on_shutdown(self, broker):
_v and LOG.debug('%r.on_shutdown()', self)
self.stream.on_disconnect(broker)
def on_disconnect(self, broker):
LOG.debug('%r.on_disconnect()', self)
if self.stream.receive_side:
broker.stop_receive(self.stream)
self.stream.receive_side.close()
if self.stream.transmit_side:
broker._stop_transmit(self.stream)
self.stream.transmit_side.close()
class DelimitedProtocol(Protocol):
"""
Provide a :meth:`Protocol.on_receive` implementation for protocols that are
delimited by a fixed string, like text based protocols. Each message is
passed to :meth:`on_line_received` as it arrives, with incomplete messages
passed to :meth:`on_partial_line_received`.
When emulating user input it is often necessary to respond to incomplete
lines, such as when a "Password: " prompt is sent.
:meth:`on_partial_line_received` may be called repeatedly with an
increasingly complete message. When a complete message is finally received,
:meth:`on_line_received` will be called once for it before the buffer is
discarded.
"""
#: The delimiter. Defaults to newline.
delimiter = b('\n')
_trailer = b('')
def on_receive(self, broker):
IOLOG.debug('%r.on_receive()', self)
buf = self.stream.receive_side.read()
if not buf:
return self.stream.on_disconnect(broker)
self._trailer = mitogen.core.iter_split(
buf=self._trailer + buf,
delim=self.delimiter,
func=self.on_line_received,
)
if self._trailer:
self.on_partial_line_received(self._trailer)
def on_line_received(self, line):
pass
def on_partial_line_received(self, line):
pass
class BufferedWriter(object):
"""
Implement buffered output while avoiding quadratic string operations. This
is currently constructed by each protocol, in future it may become fixed
for each stream instead.
"""
def __init__(self, broker, protocol):
self._broker = broker
self._protocol = protocol
self._buf = collections.deque()
self._len = 0
def write(self, s):
"""
Transmit `s` immediately, falling back to enqueuing it and marking the
stream writeable if no OS buffer space is available.
"""
if not self._len:
# Modifying epoll/Kqueue state is expensive, as are needless broker
# loops. Rather than wait for writeability, just write immediately,
# and fall back to the broker loop on error or full buffer.
try:
n = self._protocol.stream.transmit_side.write(s)
if n:
if n == len(s):
return
s = s[n:]
except OSError:
pass
self._broker._start_transmit(self._protocol.stream)
self._buf.append(s)
self._len += len(s)
def on_transmit(self, broker):
"""
Respond to stream writeability by retrying previously buffered
:meth:`write` calls.
"""
if self._buf:
buf = self._buf.popleft()
written = self._protocol.stream.transmit_side.write(buf)
if not written:
_v and LOG.debug('%r.on_transmit(): disconnection detected', self)
self._protocol.on_disconnect(broker)
return
elif written != len(buf):
self._buf.appendleft(BufferType(buf, written))
_vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written)
self._len -= written
if not self._buf:
broker._stop_transmit(self._protocol.stream)
class Side(object):
"""
Represent a single side of a :class:`BasicStream`. This exists to allow

Loading…
Cancel
Save