Refactor Stream, introduce quasi-asynchronous connect, much more

Split Stream into many, many classes

  * mitogen.parent.Connection: Handles connection setup logic only.
    * Maintain references to stdout and stderr streams.
    * Manages TimerList timer to cancel connection attempt after
      deadline
    * Blocking setup code replaced by async equivalents running on the
      broker

  * mitogen.parent.Options: Tracks connection-specific options. This
    keeps the connection class small, but more importantly, it is
    generic to the future desire to build and execute command lines
    without starting a full connection.

  * mitogen.core.Protocol: Handles program behaviour relating to events
    on a stream. Protocol performs no IO of its own, instead deferring
    it to Stream and Side. This makes testing much easier, and means
    libssh can reimplement Stream and Side to reuse MitogenProtocol

  * mitogen.core.MitogenProtocol: Guts of the old Mitogen stream
    implementtion

  * mitogen.core.BufferedWriter: Guts of the old Mitogen buffered
    transmit implementation, made generic

  * mitogen.core.DelineatedProtocol: Guts of the old IoLogger, knows how
    to split up input and pass it on to a
    on_line_received()/on_partial_line_received() callback.

  * mitogen.parent.BootstrapProtocol: Asynchronous equivalent of the old
    blocking connect code. Waits for various prompts (MITO001 etc) and
    writes the bootstrap using a BufferedWriter. On success, switches
    the stream to MitogenProtocol.

  * mitogen.core.Message: move encoding parts of MitogenProtocol out to
    Message (where it belongs) and write a bunch of new tests for
    pickling.

  * The bizarre Stream.construct() is gone now, Option.__init__ is its
    own constructor. Should fix many LGTM errors.

* Update all connection methods:  Every connection method is updated to
  use async logic, defining protocols as required to handle interactive
  prompts like in SSH or su. Add new real integration tests for at least
  doas and su.

* Eliminate manual fd management: File descriptors are trapped in file
  objects at their point of origin, and Side is updated to use file
  objects rather than raw descriptors. This eliminates a whole class of
  bugs where unrelated FDs could be closed by the wrong component. Now
  an FD's open/closed status is fused to it everywhere in the library.

* Halve file descriptor usage: now FD open/close state is tracked by
  its file object, we don't need to duplicate FDs everywhere so that
  receive/transmit side can be closed independently. Instead both sides
  back on to the same file object. Closes #26, Closes #470.

* Remove most uses of dup/dup2: Closes #256. File descriptors are
  trapped in a common file object and shared among classes. The
  remaining few uses for dup/dup2 are as close to minimal as possible.

* Introduce mitogen.parent.Process: uniform interface for subprocesses
  created either via mitogen.fork or the subprocess module. Remove all
  the crap where we steal a pid from subprocess guts. Now we use
  subprocess to manage its processes as it should be. Closes #169 by
  using the new Timers facility to poll for a slow-to-exit subprocess.

* Fix su password race: Closes #363. DelineatedProtocol naturally
  retries partially received lines, preventing the cause of the original
  race.

* Delete old blocking IO utility functions
  iter_read()/write_all()/discard_until().

Closes #26
Closes #147
Closes #169
Closes #256
Closes #363
Closes #419
Closes #470
pull/607/head
David Wilson 6 years ago
parent 37beb3a5c5
commit 8d1b01d8ef

@ -1374,10 +1374,12 @@ class Importer(object):
if not present:
funcs = self._callbacks.get(fullname)
if funcs is not None:
_v and LOG.debug('_request_module(%r): in flight', fullname)
_v and LOG.debug('%s: existing request for %s in flight',
self, fullname)
funcs.append(callback)
else:
_v and LOG.debug('_request_module(%r): new request', fullname)
_v and LOG.debug('%s: requesting %s from parent',
self, fullname)
self._callbacks[fullname] = [callback]
self._context.send(
Message(data=b(fullname), handle=GET_MODULE)
@ -1493,6 +1495,80 @@ class LogHandler(logging.Handler):
self.local.in_emit = False
class Stream(object):
#: A :class:`Side` representing the stream's receive file descriptor.
receive_side = None
#: A :class:`Side` representing the stream's transmit file descriptor.
transmit_side = None
#: A :class:`Protocol` representing the protocol active on the stream.
protocol = None
#: In parents, the :class:`mitogen.parent.Connection` instance.
conn = None
name = u'default'
def set_protocol(self, protocol):
self.protocol = protocol
self.protocol.stream = self
def accept(self, rfp, wfp):
self.receive_side = Side(self, rfp)
self.transmit_side = Side(self, wfp)
def __repr__(self):
return "<Stream %s>" % (self.name,)
def on_receive(self, broker):
"""
Called by :class:`Broker` when the stream's :attr:`receive_side` has
been marked readable using :meth:`Broker.start_receive` and the broker
has detected the associated file descriptor is ready for reading.
Subclasses must implement this if :meth:`Broker.start_receive` is ever
called on them, and the method must call :meth:`on_disconect` if
reading produces an empty string.
"""
buf = self.receive_side.read(self.protocol.read_size)
if not buf:
LOG.debug('%r: empty read, disconnecting', self)
return self.on_disconnect(broker)
self.protocol.on_receive(broker, buf)
def on_transmit(self, broker):
"""
Called by :class:`Broker` when the stream's :attr:`transmit_side`
has been marked writeable using :meth:`Broker._start_transmit` and
the broker has detected the associated file descriptor is ready for
writing.
Subclasses must implement this if :meth:`Broker._start_transmit` is
ever called on them.
"""
self.protocol.on_transmit(broker)
def on_shutdown(self, broker):
"""
Called by :meth:`Broker.shutdown` to allow the stream time to
gracefully shutdown. The base implementation simply called
:meth:`on_disconnect`.
"""
fire(self, 'shutdown')
self.protocol.on_shutdown(broker)
def on_disconnect(self, broker):
"""
Called by :class:`Broker` to force disconnect the stream. The base
implementation simply closes :attr:`receive_side` and
:attr:`transmit_side` and unregisters the stream from the broker.
"""
fire(self, 'disconnect')
self.protocol.on_disconnect(broker)
class Protocol(object):
"""
Implement the program behaviour associated with activity on a
@ -1506,11 +1582,13 @@ class Protocol(object):
provided by :class:`Stream` and :class:`Side`, allowing the underlying IO
implementation to be replaced without modifying behavioural logic.
"""
stream_class = Stream
stream = None
read_size = CHUNK_SIZE
@classmethod
def build_stream(cls, *args, **kwargs):
stream = Stream()
stream = cls.stream_class()
stream.set_protocol(cls(*args, **kwargs))
return stream
@ -1547,24 +1625,30 @@ class DelimitedProtocol(Protocol):
increasingly complete message. When a complete message is finally received,
:meth:`on_line_received` will be called once for it before the buffer is
discarded.
If :func:`on_line_received` returns :data:`False`, remaining data is passed
unprocessed to the stream's current protocol's :meth:`on_receive`. This
allows switching from line-oriented to binary while the input buffer
contains both kinds of data.
"""
#: The delimiter. Defaults to newline.
delimiter = b('\n')
_trailer = b('')
def on_receive(self, broker):
def on_receive(self, broker, buf):
IOLOG.debug('%r.on_receive()', self)
buf = self.stream.receive_side.read()
if not buf:
return self.stream.on_disconnect(broker)
self._trailer = mitogen.core.iter_split(
self._trailer, cont = mitogen.core.iter_split(
buf=self._trailer + buf,
delim=self.delimiter,
func=self.on_line_received,
)
if self._trailer:
if cont:
self.on_partial_line_received(self._trailer)
else:
assert self.stream.protocol is not self
self.stream.protocol.on_receive(broker, self._trailer)
def on_line_received(self, line):
pass
@ -1656,23 +1740,24 @@ class Side(object):
enabled using :func:`fcntl.fcntl`.
"""
_fork_refs = weakref.WeakValueDictionary()
closed = False
def __init__(self, stream, fd, cloexec=True, keep_alive=True, blocking=False):
def __init__(self, stream, fp, cloexec=True, keep_alive=True, blocking=False):
#: The :class:`Stream` for which this is a read or write side.
self.stream = stream
#: Integer file descriptor to perform IO on, or :data:`None` if
#: :meth:`close` has been called.
self.fd = fd
self.closed = False
self.fp = fp
self.fd = fp.fileno()
#: If :data:`True`, causes presence of this side in
#: :class:`Broker`'s active reader set to defer shutdown until the
#: side is disconnected.
self.keep_alive = keep_alive
self._fork_refs[id(self)] = self
if cloexec:
set_cloexec(fd)
set_cloexec(self.fd)
if not blocking:
set_nonblock(fd)
set_nonblock(self.fd)
def __repr__(self):
return '<Side of %r fd %s>' % (self.stream, self.fd)
@ -1692,7 +1777,7 @@ class Side(object):
_vv and IOLOG.debug('%r.close()', self)
if not self.closed:
self.closed = True
os.close(self.fd)
self.fp.close()
def read(self, n=CHUNK_SIZE):
"""
@ -1728,9 +1813,8 @@ class Side(object):
Number of bytes written, or :data:`None` if disconnection was
detected.
"""
if self.closed or self.fd is None:
# Refuse to touch the handle after closed, it may have been reused
# by another thread.
if self.closed:
# Don't touch the handle after close, it may be reused elsewhere.
return None
written, disconnected = io_op(os.write, self.fd, s)
@ -1740,67 +1824,10 @@ class Side(object):
return written
class BasicStream(object):
#: A :class:`Side` representing the stream's receive file descriptor.
receive_side = None
#: A :class:`Side` representing the stream's transmit file descriptor.
transmit_side = None
def on_receive(self, broker):
"""
Called by :class:`Broker` when the stream's :attr:`receive_side` has
been marked readable using :meth:`Broker.start_receive` and the broker
has detected the associated file descriptor is ready for reading.
Subclasses must implement this if :meth:`Broker.start_receive` is ever
called on them, and the method must call :meth:`on_disconect` if
reading produces an empty string.
"""
pass
def on_transmit(self, broker):
"""
Called by :class:`Broker` when the stream's :attr:`transmit_side`
has been marked writeable using :meth:`Broker._start_transmit` and
the broker has detected the associated file descriptor is ready for
writing.
Subclasses must implement this if :meth:`Broker._start_transmit` is
ever called on them.
"""
pass
def on_shutdown(self, broker):
class MitogenProtocol(Protocol):
"""
Called by :meth:`Broker.shutdown` to allow the stream time to
gracefully shutdown. The base implementation simply called
:meth:`on_disconnect`.
"""
_v and LOG.debug('%r.on_shutdown()', self)
fire(self, 'shutdown')
self.on_disconnect(broker)
def on_disconnect(self, broker):
"""
Called by :class:`Broker` to force disconnect the stream. The base
implementation simply closes :attr:`receive_side` and
:attr:`transmit_side` and unregisters the stream from the broker.
"""
LOG.debug('%r.on_disconnect()', self)
if self.receive_side:
broker.stop_receive(self)
self.receive_side.close()
if self.transmit_side:
broker._stop_transmit(self)
self.transmit_side.close()
fire(self, 'disconnect')
class Stream(BasicStream):
"""
:class:`BasicStream` subclass implementing mitogen's :ref:`stream
protocol <stream-protocol>`.
:class:`Protocol` implementing mitogen's :ref:`stream protocol
<stream-protocol>`.
"""
#: If not :data:`None`, :class:`Router` stamps this into
#: :attr:`Message.auth_id` of every message received on this stream.
@ -1811,24 +1838,24 @@ class Stream(BasicStream):
#: :data:`mitogen.parent_ids`.
is_privileged = False
def __init__(self, router, remote_id, **kwargs):
def __init__(self, router, remote_id):
self._router = router
self.remote_id = remote_id
self.name = u'default'
self.sent_modules = set(['mitogen', 'mitogen.core'])
self.construct(**kwargs)
self._input_buf = collections.deque()
self._output_buf = collections.deque()
self._input_buf_len = 0
self._output_buf_len = 0
self._writer = BufferedWriter(router.broker, self)
#: Routing records the dst_id of every message arriving from this
#: stream. Any arriving DEL_ROUTE is rebroadcast for any such ID.
self.egress_ids = set()
def construct(self):
pass
def _internal_receive(self, broker, buf):
def on_receive(self, broker, buf):
"""
Handle the next complete message on the stream. Raise
:class:`StreamError` on failure.
"""
_vv and IOLOG.debug('%r.on_receive()', self)
if self._input_buf and self._input_buf_len < 128:
self._input_buf[0] += buf
else:
@ -1838,60 +1865,45 @@ class Stream(BasicStream):
while self._receive_one(broker):
pass
def on_receive(self, broker):
"""Handle the next complete message on the stream. Raise
:class:`StreamError` on failure."""
_vv and IOLOG.debug('%r.on_receive()', self)
buf = self.receive_side.read()
if not buf:
return self.on_disconnect(broker)
self._internal_receive(broker, buf)
HEADER_FMT = '>hLLLLLL'
HEADER_LEN = struct.calcsize(HEADER_FMT)
HEADER_MAGIC = 0x4d49 # 'MI'
corrupt_msg = (
'Corruption detected: frame signature incorrect. This likely means '
'%s: Corruption detected: frame signature incorrect. This likely means'
' some external process is interfering with the connection. Received:'
'\n\n'
'%r'
)
def _receive_one(self, broker):
if self._input_buf_len < self.HEADER_LEN:
if self._input_buf_len < Message.HEADER_LEN:
return False
msg = Message()
msg.router = self._router
(magic, msg.dst_id, msg.src_id, msg.auth_id,
msg.handle, msg.reply_to, msg_len) = struct.unpack(
self.HEADER_FMT,
self._input_buf[0][:self.HEADER_LEN],
Message.HEADER_FMT,
self._input_buf[0][:Message.HEADER_LEN],
)
if magic != self.HEADER_MAGIC:
LOG.error(self.corrupt_msg, self._input_buf[0][:2048])
self.on_disconnect(broker)
if magic != Message.HEADER_MAGIC:
LOG.error(self.corrupt_msg, self.stream.name, self._input_buf[0][:2048])
self.stream.on_disconnect(broker)
return False
if msg_len > self._router.max_message_size:
LOG.error('Maximum message size exceeded (got %d, max %d)',
msg_len, self._router.max_message_size)
self.on_disconnect(broker)
self.stream.on_disconnect(broker)
return False
total_len = msg_len + self.HEADER_LEN
total_len = msg_len + Message.HEADER_LEN
if self._input_buf_len < total_len:
_vv and IOLOG.debug(
'%r: Input too short (want %d, got %d)',
self, msg_len, self._input_buf_len - self.HEADER_LEN
self, msg_len, self._input_buf_len - Message.HEADER_LEN
)
return False
start = self.HEADER_LEN
start = Message.HEADER_LEN
prev_start = start
remain = total_len
bits = []
@ -1906,7 +1918,7 @@ class Stream(BasicStream):
msg.data = b('').join(bits)
self._input_buf.appendleft(buf[prev_start+len(bit):])
self._input_buf_len -= total_len
self._router._async_route(msg, self)
self._router._async_route(msg, self.stream)
return True
def pending_bytes(self):
@ -1918,50 +1930,16 @@ class Stream(BasicStream):
For an accurate result, this method should be called from the Broker
thread, for example by using :meth:`Broker.defer_sync`.
"""
return self._output_buf_len
return self._writer._len
def on_transmit(self, broker):
"""Transmit buffered messages."""
_vv and IOLOG.debug('%r.on_transmit()', self)
if self._output_buf:
buf = self._output_buf.popleft()
written = self.transmit_side.write(buf)
if not written:
_v and LOG.debug('%r.on_transmit(): disconnection detected', self)
self.on_disconnect(broker)
return
elif written != len(buf):
self._output_buf.appendleft(BufferType(buf, written))
_vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written)
self._output_buf_len -= written
if not self._output_buf:
broker._stop_transmit(self)
self._writer.on_transmit(broker)
def _send(self, msg):
_vv and IOLOG.debug('%r._send(%r)', self, msg)
pkt = struct.pack(self.HEADER_FMT, self.HEADER_MAGIC, msg.dst_id,
msg.src_id, msg.auth_id, msg.handle,
msg.reply_to or 0, len(msg.data)) + msg.data
if not self._output_buf_len:
# Modifying epoll/Kqueue state is expensive, as are needless broker
# loops. Rather than wait for writeability, just write immediately,
# and fall back to the broker loop on error or full buffer.
try:
n = self.transmit_side.write(pkt)
if n:
if n == len(pkt):
return
pkt = pkt[n:]
except OSError:
pass
self._router.broker._start_transmit(self)
self._output_buf.append(pkt)
self._output_buf_len += len(pkt)
self._writer.write(msg.pack())
def send(self, msg):
"""Send `data` to `handle`, and tell the broker we have output. May
@ -1969,17 +1947,8 @@ class Stream(BasicStream):
self._router.broker.defer(self._send, msg)
def on_shutdown(self, broker):
"""Override BasicStream behaviour of immediately disconnecting."""
_v and LOG.debug('%r.on_shutdown(%r)', self, broker)
def accept(self, rfd, wfd):
# TODO: what is this os.dup for?
self.receive_side = Side(self, os.dup(rfd))
self.transmit_side = Side(self, os.dup(wfd))
def __repr__(self):
cls = type(self)
return "%s.%s('%s')" % (cls.__module__, cls.__name__, self.name)
"""Disable :class:`Protocol` immediate disconnect behaviour."""
_v and LOG.debug('%r: shutting down', self)
class Context(object):
@ -2005,18 +1974,17 @@ class Context(object):
:param str name:
Context name.
"""
name = None
remote_name = None
def __init__(self, router, context_id, name=None):
self.router = router
self.context_id = context_id
self.name = name
if name:
self.name = to_text(name)
def __reduce__(self):
name = self.name
if name and not isinstance(name, UnicodeType):
name = UnicodeType(name, 'utf-8')
return _unpickle_context, (self.context_id, name)
return _unpickle_context, (self.context_id, self.name)
def on_disconnect(self):
_v and LOG.debug('%r: disconnecting', self)
@ -2161,7 +2129,7 @@ class Poller(object):
self._wfds = {}
def __repr__(self):
return '%s(%#x)' % (type(self).__name__, id(self))
return '%s' % (type(self).__name__,)
def _update(self, fd):
"""
@ -2509,7 +2477,7 @@ class Latch(object):
)
class Waker(BasicStream):
class Waker(Protocol):
"""
:class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_.
Used to wake the multiplexer when another thread needs to modify its state
@ -2517,17 +2485,20 @@ class Waker(BasicStream):
.. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html
"""
read_size = 1
broker_ident = None
@classmethod
def build_stream(cls, broker):
stream = super(Waker, cls).build_stream(broker)
stream.accept(*pipe())
return stream
def __init__(self, broker):
self._broker = broker
self._lock = threading.Lock()
self._deferred = []
rfd, wfd = os.pipe()
self.receive_side = Side(self, rfd)
self.transmit_side = Side(self, wfd)
def __repr__(self):
return 'Waker(fd=%r/%r)' % (
self.stream.receive_side and self.stream.receive_side.fd,
@ -2545,7 +2516,7 @@ class Waker(BasicStream):
finally:
self._lock.release()
def on_receive(self, broker):
def on_receive(self, broker, buf):
"""
Drain the pipe and fire callbacks. Since :attr:`_deferred` is
synchronized, :meth:`defer` and :meth:`on_receive` can conspire to
@ -2554,7 +2525,6 @@ class Waker(BasicStream):
_vv and IOLOG.debug('%r.on_receive()', self)
self._lock.acquire()
try:
self.receive_side.read(1)
deferred = self._deferred
self._deferred = []
finally:
@ -2566,7 +2536,7 @@ class Waker(BasicStream):
except Exception:
LOG.exception('defer() crashed: %r(*%r, **%r)',
func, args, kwargs)
self._broker.shutdown()
broker.shutdown()
def _wake(self):
"""
@ -2574,7 +2544,7 @@ class Waker(BasicStream):
teardown, the FD may already be closed, so ignore EBADF.
"""
try:
self.transmit_side.write(b(' '))
self.stream.transmit_side.write(b(' '))
except OSError:
e = sys.exc_info()[1]
if e.args[0] != errno.EBADF:
@ -2601,7 +2571,8 @@ class Waker(BasicStream):
if self._broker._exitted:
raise Error(self.broker_shutdown_msg)
_vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd)
_vv and IOLOG.debug('%r.defer() [fd=%r]', self,
self.stream.transmit_side.fd)
self._lock.acquire()
try:
if not self._deferred:
@ -2611,53 +2582,45 @@ class Waker(BasicStream):
self._lock.release()
class IoLogger(BasicStream):
class IoLoggerProtocol(DelimitedProtocol):
"""
:class:`BasicStream` subclass that sets up redirection of a standard
UNIX file descriptor back into the Python :mod:`logging` package.
Handle redirection of standard IO into the :mod:`logging` package.
"""
_trailer = u''
def __init__(self, broker, name, dest_fd):
self._broker = broker
self._name = name
self._rsock, self._wsock = socket.socketpair()
os.dup2(self._wsock.fileno(), dest_fd)
set_cloexec(self._wsock.fileno())
@classmethod
def build_stream(cls, name, dest_fd):
"""
Even though the descriptor `dest_fd` will hold the opposite end of the
socket open, we must keep a separate dup() of it (i.e. wsock) in case
some code decides to overwrite `dest_fd` later, which would thus break
:meth:`on_shutdown`.
"""
rsock, wsock = socket.socketpair()
os.dup2(wsock.fileno(), dest_fd)
stream = super(IoLoggerProtocol, cls).build_stream(name)
stream.name = name
stream.accept(rsock, wsock)
return stream
def __init__(self, name):
self._log = logging.getLogger(name)
# #453: prevent accidental log initialization in a child creating a
# feedback loop.
self._log.propagate = False
self._log.handlers = logging.getLogger().handlers[:]
self.receive_side = Side(self, self._rsock.fileno())
self.transmit_side = Side(self, dest_fd, cloexec=False, blocking=True)
self._broker.start_receive(self)
def __repr__(self):
return '<IoLogger %s>' % (self._name,)
def on_shutdown(self, broker):
"""Shut down the write end of the logging socket."""
_v and LOG.debug('%r: shutting down', self)
if not IS_WSL:
# #333: WSL generates invalid readiness indication on shutdown()
self._wsock.shutdown(socket.SHUT_WR)
self._wsock.close()
self.transmit_side.close()
def on_receive(self, broker):
_vv and IOLOG.debug('%r.on_receive()', self)
buf = self.receive_side.read()
if not buf:
return self.on_disconnect(broker)
# #333: WSL generates invalid readiness indication on shutdown().
# This modifies the *kernel object* inherited by children, causing
# EPIPE on subsequent writes to any dupped FD in any process. The
# read side can then drain completely of prior buffered data.
self.stream.transmit_side.fp.shutdown(socket.SHUT_WR)
self.stream.transmit_side.close()
self._trailer = iter_split(
buf=self._trailer + buf.decode('latin1'),
delim='\n',
func=lambda s: self._log.info('%s', s)
)
def on_line_received(self, line):
self._log.info('%s', line.decode('utf-8', 'replace'))
class Router(object):
@ -3008,11 +2971,11 @@ class Router(object):
self, msg.src_id, in_stream, expect, msg)
return
if in_stream.auth_id is not None:
msg.auth_id = in_stream.auth_id
if in_stream.protocol.auth_id is not None:
msg.auth_id = in_stream.protocol.auth_id
# Maintain a set of IDs the source ever communicated with.
in_stream.egress_ids.add(msg.dst_id)
in_stream.protocol.egress_ids.add(msg.dst_id)
if msg.dst_id == mitogen.context_id:
return self._invoke(msg, in_stream)
@ -3027,12 +2990,13 @@ class Router(object):
return
if in_stream and self.unidirectional and not \
(in_stream.is_privileged or out_stream.is_privileged):
(in_stream.protocol.is_privileged or
out_stream.protocol.is_privileged):
self._maybe_send_dead(msg, self.unidirectional_msg,
in_stream.remote_id, out_stream.remote_id)
in_stream.protocol.remote_id, out_stream.protocol.remote_id)
return
out_stream._send(msg)
out_stream.protocol._send(msg)
def route(self, msg):
"""
@ -3074,11 +3038,11 @@ class Broker(object):
def __init__(self, poller_class=None, activate_compat=True):
self._alive = True
self._exitted = False
self._waker = Waker(self)
self._waker = Waker.build_stream(self)
#: Arrange for `func(\*args, \**kwargs)` to be executed on the broker
#: thread, or immediately if the current thread is the broker thread.
#: Safe to call from any thread.
self.defer = self._waker.defer
self.defer = self._waker.protocol.defer
self.poller = self.poller_class()
self.poller.start_receive(
self._waker.receive_side.fd,
@ -3112,7 +3076,7 @@ class Broker(object):
"""
_vv and IOLOG.debug('%r.start_receive(%r)', self, stream)
side = stream.receive_side
assert side and side.fd is not None
assert side and not side.closed
self.defer(self.poller.start_receive,
side.fd, (side, stream.on_receive))
@ -3133,7 +3097,7 @@ class Broker(object):
"""
_vv and IOLOG.debug('%r._start_transmit(%r)', self, stream)
side = stream.transmit_side
assert side and side.fd is not None
assert side and not side.closed
self.poller.start_transmit(side.fd, (side, stream.on_transmit))
def _stop_transmit(self, stream):
@ -3246,7 +3210,7 @@ class Broker(object):
:meth:`shutdown` is called.
"""
# For Python 2.4, no way to retrieve ident except on thread.
self._waker.broker_ident = thread.get_ident()
self._waker.protocol.broker_ident = thread.get_ident()
try:
while self._alive:
self._loop_once()
@ -3486,18 +3450,16 @@ class ExternalContext(object):
else:
self.parent = Context(self.router, parent_id, 'parent')
in_fd = self.config.get('in_fd', 100)
out_fd = self.config.get('out_fd', 1)
self.stream = Stream(self.router, parent_id)
in_fp = os.fdopen(os.dup(self.config.get('in_fd', 100)), 'rb', 0)
out_fp = os.fdopen(os.dup(self.config.get('out_fd', 1)), 'wb', 0)
self.stream = MitogenProtocol.build_stream(self.router, parent_id)
self.stream.accept(in_fp, out_fp)
self.stream.name = 'parent'
self.stream.accept(in_fd, out_fd)
self.stream.receive_side.keep_alive = False
listen(self.stream, 'disconnect', self._on_parent_disconnect)
listen(self.broker, 'exit', self._on_broker_exit)
os.close(in_fd)
def _reap_first_stage(self):
try:
os.wait() # Reap first stage.
@ -3584,7 +3546,7 @@ class ExternalContext(object):
try:
if os.isatty(2):
self.reserve_tty_fp = os.fdopen(os.dup(2), 'r+b', 0)
set_cloexec(self.reserve_tty_fp)
set_cloexec(self.reserve_tty_fp.fileno())
except OSError:
pass
@ -3600,8 +3562,12 @@ class ExternalContext(object):
sys.stdout.close()
self._nullify_stdio()
self.stdout_log = IoLogger(self.broker, 'stdout', 1)
self.stderr_log = IoLogger(self.broker, 'stderr', 2)
self.loggers = []
for name, fd in (('stdout', 1), ('stderr', 2)):
log = IoLoggerProtocol.build_stream(name, fd)
self.broker.start_receive(log)
self.loggers.append(log)
# Reopen with line buffering.
sys.stdout = os.fdopen(1, 'w', 1)
@ -3621,11 +3587,11 @@ class ExternalContext(object):
self.dispatcher = Dispatcher(self)
self.router.register(self.parent, self.stream)
self.router._setup_logging()
self.log_handler.uncork()
sys.executable = os.environ.pop('ARGV0', sys.executable)
_v and LOG.debug('Connected to context %s; my ID is %r',
self.parent, mitogen.context_id)
_v and LOG.debug('Parent is context %r (%s); my ID is %r',
self.parent.context_id, self.parent.name,
mitogen.context_id)
_v and LOG.debug('pid:%r ppid:%r uid:%r/%r, gid:%r/%r host:%r',
os.getpid(), os.getppid(), os.geteuid(),
os.getuid(), os.getegid(), os.getgid(),
@ -3633,6 +3599,9 @@ class ExternalContext(object):
_v and LOG.debug('Recovered sys.executable: %r', sys.executable)
self.broker._py24_25_compat()
if self.config.get('send_ec2', True):
self.stream.transmit_side.write(b('MITO002\n'))
self.log_handler.uncork()
self.dispatcher.run()
_v and LOG.debug('ExternalContext.main() normal exit')
except KeyboardInterrupt:

@ -29,6 +29,7 @@
# !mitogen: minify_safe
import logging
import re
import mitogen.core
import mitogen.parent
@ -37,77 +38,106 @@ from mitogen.core import b
LOG = logging.getLogger(__name__)
password_incorrect_msg = 'doas password is incorrect'
password_required_msg = 'doas password is required'
class PasswordError(mitogen.core.StreamError):
pass
class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False
username = 'root'
class Options(mitogen.parent.Options):
username = u'root'
password = None
doas_path = 'doas'
password_prompt = b('Password:')
password_prompt = u'Password:'
incorrect_prompts = (
b('doas: authentication failed'),
u'doas: authentication failed', # slicer69/doas
u'doas: Authorization failed', # openbsd/src
)
def construct(self, username=None, password=None, doas_path=None,
def __init__(self, username=None, password=None, doas_path=None,
password_prompt=None, incorrect_prompts=None, **kwargs):
super(Stream, self).construct(**kwargs)
super(Options, self).__init__(**kwargs)
if username is not None:
self.username = username
self.username = mitogen.core.to_text(username)
if password is not None:
self.password = password
self.password = mitogen.core.to_text(password)
if doas_path is not None:
self.doas_path = doas_path
if password_prompt is not None:
self.password_prompt = password_prompt.lower()
self.password_prompt = mitogen.core.to_text(password_prompt)
if incorrect_prompts is not None:
self.incorrect_prompts = map(str.lower, incorrect_prompts)
self.incorrect_prompts = [
mitogen.core.to_text(p)
for p in incorrect_prompts
]
def _get_name(self):
return u'doas.' + mitogen.core.to_text(self.username)
def get_boot_command(self):
bits = [self.doas_path, '-u', self.username, '--']
bits = bits + super(Stream, self).get_boot_command()
LOG.debug('doas command line: %r', bits)
return bits
class BootstrapProtocol(mitogen.parent.RegexProtocol):
password_sent = False
password_incorrect_msg = 'doas password is incorrect'
password_required_msg = 'doas password is required'
def setup_patterns(self, conn):
prompt_pattern = re.compile(
re.escape(conn.options.password_prompt).encode('utf-8'),
re.I
)
incorrect_prompt_pattern = re.compile(
u'|'.join(
re.escape(s)
for s in conn.options.incorrect_prompts
).encode('utf-8'),
re.I
)
def _connect_input_loop(self, it):
password_sent = False
for buf in it:
LOG.debug('%r: received %r', self, buf)
if buf.endswith(self.EC0_MARKER):
self._ec0_received()
self.PATTERNS = [
(incorrect_prompt_pattern, type(self)._on_incorrect_password),
]
self.PARTIAL_PATTERNS = [
(prompt_pattern, type(self)._on_password_prompt),
]
def _on_incorrect_password(self, line, match):
if self.password_sent:
self.stream.conn._fail_connection(
PasswordError(password_incorrect_msg)
)
def _on_password_prompt(self, line, match):
if self.stream.conn.options.password is None:
self.stream.conn._fail_connection(
PasswordError(password_required_msg)
)
return
if any(s in buf.lower() for s in self.incorrect_prompts):
if password_sent:
raise PasswordError(self.password_incorrect_msg)
elif self.password_prompt in buf.lower():
if self.password is None:
raise PasswordError(self.password_required_msg)
if password_sent:
raise PasswordError(self.password_incorrect_msg)
LOG.debug('sending password')
self.diag_stream.transmit_side.write(
mitogen.core.to_text(self.password + '\n').encode('utf-8')
if self.password_sent:
self.stream.conn._fail_connection(
PasswordError(password_incorrect_msg)
)
password_sent = True
raise mitogen.core.StreamError('bootstrap failed')
return
def _connect_bootstrap(self):
it = mitogen.parent.iter_read(
fds=[self.receive_side.fd, self.diag_stream.receive_side.fd],
deadline=self.connect_deadline,
LOG.debug('sending password')
self.stream.transmit_side.write(
(self.stream.conn.options.password + '\n').encode('utf-8')
)
try:
self._connect_input_loop(it)
finally:
it.close()
self.password_sent = True
class Connection(mitogen.parent.Connection):
options_class = Options
diag_protocol_class = BootstrapProtocol
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False
def _get_name(self):
return u'doas.' + self.options.username
def diag_stream_factory(self):
stream = super(Connection, self).diag_stream_factory()
stream.protocol.setup_patterns(self)
return stream
def get_boot_command(self):
bits = [self.options.doas_path, '-u', self.options.username, '--']
return bits + super(Connection, self).get_boot_command()

@ -37,45 +37,47 @@ import mitogen.parent
LOG = logging.getLogger(__name__)
class Stream(mitogen.parent.Stream):
child_is_immediate_subprocess = False
class Options(mitogen.parent.Options):
container = None
image = None
username = None
docker_path = 'docker'
# TODO: better way of capturing errors such as "No such container."
create_child_args = {
'merge_stdio': True
}
docker_path = u'docker'
def construct(self, container=None, image=None,
docker_path=None, username=None,
**kwargs):
def __init__(self, container=None, image=None, docker_path=None,
username=None, **kwargs):
super(Options, self).__init__(**kwargs)
assert container or image
super(Stream, self).construct(**kwargs)
if container:
self.container = container
self.container = mitogen.core.to_text(container)
if image:
self.image = image
self.image = mitogen.core.to_text(image)
if docker_path:
self.docker_path = docker_path
self.docker_path = mitogen.core.to_text(docker_path)
if username:
self.username = username
self.username = mitogen.core.to_text(username)
class Connection(mitogen.parent.Connection):
options_class = Options
child_is_immediate_subprocess = False
# TODO: better way of capturing errors such as "No such container."
create_child_args = {
'merge_stdio': True
}
def _get_name(self):
return u'docker.' + (self.container or self.image)
return u'docker.' + (self.options.container or self.options.image)
def get_boot_command(self):
args = ['--interactive']
if self.username:
args += ['--user=' + self.username]
if self.options.username:
args += ['--user=' + self.options.username]
bits = [self.docker_path]
if self.container:
bits += ['exec'] + args + [self.container]
elif self.image:
bits += ['run'] + args + ['--rm', self.image]
bits = [self.options.docker_path]
if self.options.container:
bits += ['exec'] + args + [self.options.container]
elif self.options.image:
bits += ['run'] + args + ['--rm', self.options.image]
return bits + super(Stream, self).get_boot_command()
return bits + super(Connection, self).get_boot_command()

@ -117,14 +117,12 @@ SSH_GETOPTS = (
_mitogen = None
class IoPump(mitogen.core.BasicStream):
class IoPump(mitogen.core.Protocol):
_output_buf = ''
_closed = False
def __init__(self, broker, stdin_fd, stdout_fd):
def __init__(self, broker):
self._broker = broker
self.receive_side = mitogen.core.Side(self, stdout_fd)
self.transmit_side = mitogen.core.Side(self, stdin_fd)
def write(self, s):
self._output_buf += s
@ -134,13 +132,13 @@ class IoPump(mitogen.core.BasicStream):
self._closed = True
# If local process hasn't exitted yet, ensure its write buffer is
# drained before lazily triggering disconnect in on_transmit.
if self.transmit_side.fd is not None:
if self.transmit_side.fp.fileno() is not None:
self._broker._start_transmit(self)
def on_shutdown(self, broker):
def on_shutdown(self, stream, broker):
self.close()
def on_transmit(self, broker):
def on_transmit(self, stream, broker):
written = self.transmit_side.write(self._output_buf)
IOLOG.debug('%r.on_transmit() -> len %r', self, written)
if written is None:
@ -153,8 +151,8 @@ class IoPump(mitogen.core.BasicStream):
if self._closed:
self.on_disconnect(broker)
def on_receive(self, broker):
s = self.receive_side.read()
def on_receive(self, stream, broker):
s = stream.receive_side.read()
IOLOG.debug('%r.on_receive() -> len %r', self, len(s))
if s:
mitogen.core.fire(self, 'receive', s)
@ -163,8 +161,8 @@ class IoPump(mitogen.core.BasicStream):
def __repr__(self):
return 'IoPump(%r, %r)' % (
self.receive_side.fd,
self.transmit_side.fd,
self.receive_side.fp.fileno(),
self.transmit_side.fp.fileno(),
)
@ -173,14 +171,15 @@ class Process(object):
Manages the lifetime and pipe connections of the SSH command running in the
slave.
"""
def __init__(self, router, stdin_fd, stdout_fd, proc=None):
def __init__(self, router, stdin_fp, stdout_fp, proc=None):
self.router = router
self.stdin_fd = stdin_fd
self.stdout_fd = stdout_fd
self.stdin_fp = stdin_fp
self.stdout_fp = stdout_fp
self.proc = proc
self.control_handle = router.add_handler(self._on_control)
self.stdin_handle = router.add_handler(self._on_stdin)
self.pump = IoPump(router.broker, stdin_fd, stdout_fd)
self.pump = IoPump.build_stream(router.broker)
self.pump.accept(stdin_fp, stdout_fp)
self.stdin = None
self.control = None
self.wake_event = threading.Event()
@ -193,7 +192,7 @@ class Process(object):
pmon.add(proc.pid, self._on_proc_exit)
def __repr__(self):
return 'Process(%r, %r)' % (self.stdin_fd, self.stdout_fd)
return 'Process(%r, %r)' % (self.stdin_fp, self.stdout_fp)
def _on_proc_exit(self, status):
LOG.debug('%r._on_proc_exit(%r)', self, status)
@ -202,12 +201,12 @@ class Process(object):
def _on_stdin(self, msg):
if msg.is_dead:
IOLOG.debug('%r._on_stdin() -> %r', self, data)
self.pump.close()
self.pump.protocol.close()
return
data = msg.unpickle()
IOLOG.debug('%r._on_stdin() -> len %d', self, len(data))
self.pump.write(data)
self.pump.protocol.write(data)
def _on_control(self, msg):
if not msg.is_dead:
@ -279,13 +278,7 @@ def _start_slave(src_id, cmdline, router):
stdout=subprocess.PIPE,
)
process = Process(
router,
proc.stdin.fileno(),
proc.stdout.fileno(),
proc,
)
process = Process(router, proc.stdin, proc.stdout, proc)
return process.control_handle, process.stdin_handle
@ -361,7 +354,9 @@ def _fakessh_main(dest_context_id, econtext):
LOG.debug('_fakessh_main: received control_handle=%r, stdin_handle=%r',
control_handle, stdin_handle)
process = Process(econtext.router, 1, 0)
process = Process(econtext.router,
stdin_fp=os.fdopen(1, 'w+b', 0),
stdout_fp=os.fdopen(0, 'r+b', 0))
process.start_master(
stdin=mitogen.core.Sender(dest, stdin_handle),
control=mitogen.core.Sender(dest, control_handle),
@ -427,7 +422,7 @@ def run(dest, router, args, deadline=None, econtext=None):
stream = mitogen.core.Stream(router, context_id)
stream.name = u'fakessh'
stream.accept(sock1.fileno(), sock1.fileno())
stream.accept(sock1, sock1)
router.register(fakessh, stream)
# Held in socket buffer until process is booted.

@ -28,6 +28,7 @@
# !mitogen: minify_safe
import errno
import logging
import os
import random
@ -119,32 +120,45 @@ def handle_child_crash():
os._exit(1)
class Stream(mitogen.parent.Stream):
child_is_immediate_subprocess = True
class Process(mitogen.parent.Process):
def poll(self):
try:
pid, status = os.waitpid(self.pid, os.WNOHANG)
except OSError:
e = sys.exc_info()[1]
if e.args[0] == errno.ECHILD:
LOG.warn('%r: waitpid(%r) produced ECHILD', self, self.pid)
return
raise
if not pid:
return
if os.WIFEXITED(status):
return os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
return -os.WTERMSIG(status)
elif os.WIFSTOPPED(status):
return -os.WSTOPSIG(status)
class Options(mitogen.parent.Options):
#: Reference to the importer, if any, recovered from the parent.
importer = None
#: User-supplied function for cleaning up child process state.
on_fork = None
python_version_msg = (
"The mitogen.fork method is not supported on Python versions "
"prior to 2.6, since those versions made no attempt to repair "
"critical interpreter state following a fork. Please use the "
"local() method instead."
)
def construct(self, old_router, max_message_size, on_fork=None,
debug=False, profiling=False, unidirectional=False,
on_start=None):
def __init__(self, old_router, max_message_size, on_fork=None, debug=False,
profiling=False, unidirectional=False, on_start=None,
name=None):
if not FORK_SUPPORTED:
raise Error(self.python_version_msg)
# fork method only supports a tiny subset of options.
super(Stream, self).construct(max_message_size=max_message_size,
debug=debug, profiling=profiling,
unidirectional=False)
super(Options, self).__init__(
max_message_size=max_message_size, debug=debug,
profiling=profiling, unidirectional=unidirectional, name=name,
)
self.on_fork = on_fork
self.on_start = on_start
@ -152,17 +166,26 @@ class Stream(mitogen.parent.Stream):
if isinstance(responder, mitogen.parent.ModuleForwarder):
self.importer = responder.importer
class Connection(mitogen.parent.Connection):
options_class = Options
child_is_immediate_subprocess = True
python_version_msg = (
"The mitogen.fork method is not supported on Python versions "
"prior to 2.6, since those versions made no attempt to repair "
"critical interpreter state following a fork. Please use the "
"local() method instead."
)
name_prefix = u'fork'
def start_child(self):
parentfp, childfp = mitogen.parent.create_socketpair()
self.pid = os.fork()
if self.pid:
pid = os.fork()
if pid:
childfp.close()
# Decouple the socket from the lifetime of the Python socket object.
fd = os.dup(parentfp.fileno())
parentfp.close()
return self.pid, fd, None
return Process(pid, parentfp)
else:
parentfp.close()
self._wrap_child_main(childfp)
@ -173,12 +196,24 @@ class Stream(mitogen.parent.Stream):
except BaseException:
handle_child_crash()
def get_econtext_config(self):
config = super(Connection, self).get_econtext_config()
config['core_src_fd'] = None
config['importer'] = self.options.importer
config['send_ec2'] = False
config['setup_package'] = False
if self.options.on_start:
config['on_start'] = self.options.on_start
return config
def _child_main(self, childfp):
on_fork()
if self.on_fork:
self.on_fork()
if self.options.on_fork:
self.options.on_fork()
mitogen.core.set_block(childfp.fileno())
childfp.send('MITO002\n')
# Expected by the ExternalContext.main().
os.dup2(childfp.fileno(), 1)
os.dup2(childfp.fileno(), 100)
@ -201,23 +236,12 @@ class Stream(mitogen.parent.Stream):
if childfp.fileno() not in (0, 1, 100):
childfp.close()
config = self.get_econtext_config()
config['core_src_fd'] = None
config['importer'] = self.importer
config['setup_package'] = False
if self.on_start:
config['on_start'] = self.on_start
try:
try:
mitogen.core.ExternalContext(config).main()
mitogen.core.ExternalContext(self.get_econtext_config()).main()
except Exception:
# TODO: report exception somehow.
os._exit(72)
finally:
# Don't trigger atexit handlers, they were copied from the parent.
os._exit(0)
def _connect_bootstrap(self):
# None required.
pass

@ -37,29 +37,34 @@ import mitogen.parent
LOG = logging.getLogger(__name__)
class Stream(mitogen.parent.Stream):
child_is_immediate_subprocess = False
create_child_args = {
'merge_stdio': True
}
class Options(mitogen.parent.Options):
container = None
username = None
jexec_path = '/usr/sbin/jexec'
jexec_path = u'/usr/sbin/jexec'
def construct(self, container, jexec_path=None, username=None, **kwargs):
super(Stream, self).construct(**kwargs)
self.container = container
self.username = username
def __init__(self, container, jexec_path=None, username=None, **kwargs):
super(Options, self).__init__(**kwargs)
self.container = mitogen.core.to_text(container)
if username:
self.username = mitogen.core.to_text(username)
if jexec_path:
self.jexec_path = jexec_path
class Connection(mitogen.parent.Connection):
options_class = Options
child_is_immediate_subprocess = False
create_child_args = {
'merge_stdio': True
}
def _get_name(self):
return u'jail.' + self.container
return u'jail.' + self.options.container
def get_boot_command(self):
bits = [self.jexec_path]
if self.username:
bits += ['-U', self.username]
bits += [self.container]
return bits + super(Stream, self).get_boot_command()
bits = [self.options.jexec_path]
if self.options.username:
bits += ['-U', self.options.username]
bits += [self.options.container]
return bits + super(Connection, self).get_boot_command()

@ -37,29 +37,36 @@ import mitogen.parent
LOG = logging.getLogger(__name__)
class Stream(mitogen.parent.Stream):
child_is_immediate_subprocess = True
class Options(mitogen.parent.Options):
pod = None
kubectl_path = 'kubectl'
kubectl_args = None
# TODO: better way of capturing errors such as "No such container."
create_child_args = {
'merge_stdio': True
}
def construct(self, pod, kubectl_path=None, kubectl_args=None, **kwargs):
super(Stream, self).construct(**kwargs)
def __init__(self, pod, kubectl_path=None, kubectl_args=None, **kwargs):
super(Options, self).__init__(**kwargs)
assert pod
self.pod = pod
if kubectl_path:
self.kubectl_path = kubectl_path
self.kubectl_args = kubectl_args or []
class Connection(mitogen.parent.Connection):
options_class = Options
child_is_immediate_subprocess = True
# TODO: better way of capturing errors such as "No such container."
create_child_args = {
'merge_stdio': True
}
def _get_name(self):
return u'kubectl.%s%s' % (self.pod, self.kubectl_args)
return u'kubectl.%s%s' % (self.options.pod, self.options.kubectl_args)
def get_boot_command(self):
bits = [self.kubectl_path] + self.kubectl_args + ['exec', '-it', self.pod]
return bits + ["--"] + super(Stream, self).get_boot_command()
bits = [
self.options.kubectl_path
] + self.options.kubectl_args + [
'exec', '-it', self.options.pod
]
return bits + ["--"] + super(Connection, self).get_boot_command()

@ -37,7 +37,20 @@ import mitogen.parent
LOG = logging.getLogger(__name__)
class Stream(mitogen.parent.Stream):
class Options(mitogen.parent.Options):
container = None
lxc_attach_path = 'lxc-attach'
def __init__(self, container, lxc_attach_path=None, **kwargs):
super(Options, self).__init__(**kwargs)
self.container = container
if lxc_attach_path:
self.lxc_attach_path = lxc_attach_path
class Connection(mitogen.parent.Connection):
options_class = Options
child_is_immediate_subprocess = False
create_child_args = {
# If lxc-attach finds any of stdin, stdout, stderr connected to a TTY,
@ -47,29 +60,20 @@ class Stream(mitogen.parent.Stream):
'merge_stdio': True
}
container = None
lxc_attach_path = 'lxc-attach'
eof_error_hint = (
'Note: many versions of LXC do not report program execution failure '
'meaningfully. Please check the host logs (/var/log) for more '
'information.'
)
def construct(self, container, lxc_attach_path=None, **kwargs):
super(Stream, self).construct(**kwargs)
self.container = container
if lxc_attach_path:
self.lxc_attach_path = lxc_attach_path
def _get_name(self):
return u'lxc.' + self.container
return u'lxc.' + self.options.container
def get_boot_command(self):
bits = [
self.lxc_attach_path,
self.options.lxc_attach_path,
'--clear-env',
'--name', self.container,
'--name', self.options.container,
'--',
]
return bits + super(Stream, self).get_boot_command()
return bits + super(Connection, self).get_boot_command()

@ -37,7 +37,21 @@ import mitogen.parent
LOG = logging.getLogger(__name__)
class Stream(mitogen.parent.Stream):
class Options(mitogen.parent.Options):
container = None
lxc_path = 'lxc'
python_path = 'python'
def __init__(self, container, lxc_path=None, **kwargs):
super(Options, self).__init__(**kwargs)
self.container = container
if lxc_path:
self.lxc_path = lxc_path
class Connection(mitogen.parent.Connection):
options_class = Options
child_is_immediate_subprocess = False
create_child_args = {
# If lxc finds any of stdin, stdout, stderr connected to a TTY, to
@ -47,31 +61,21 @@ class Stream(mitogen.parent.Stream):
'merge_stdio': True
}
container = None
lxc_path = 'lxc'
python_path = 'python'
eof_error_hint = (
'Note: many versions of LXC do not report program execution failure '
'meaningfully. Please check the host logs (/var/log) for more '
'information.'
)
def construct(self, container, lxc_path=None, **kwargs):
super(Stream, self).construct(**kwargs)
self.container = container
if lxc_path:
self.lxc_path = lxc_path
def _get_name(self):
return u'lxd.' + self.container
return u'lxd.' + self.options.container
def get_boot_command(self):
bits = [
self.lxc_path,
self.options.lxc_path,
'exec',
'--mode=noninteractive',
self.container,
self.options.container,
'--',
]
return bits + super(Stream, self).get_boot_command()
return bits + super(Connection, self).get_boot_command()

@ -531,14 +531,15 @@ class SysModulesMethod(FinderMethod):
return
if not isinstance(module, types.ModuleType):
LOG.debug('sys.modules[%r] absent or not a regular module',
fullname)
LOG.debug('%r: sys.modules[%r] absent or not a regular module',
self, fullname)
return
path = _py_filename(getattr(module, '__file__', ''))
if not path:
return
LOG.debug('%r: sys.modules[%r]: found %s', self, fullname, path)
is_pkg = hasattr(module, '__path__')
try:
source = inspect.getsource(module)
@ -920,17 +921,17 @@ class ModuleResponder(object):
return tup
def _send_load_module(self, stream, fullname):
if fullname not in stream.sent_modules:
if fullname not in stream.protocol.sent_modules:
tup = self._build_tuple(fullname)
msg = mitogen.core.Message.pickled(
tup,
dst_id=stream.remote_id,
dst_id=stream.protocol.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
LOG.debug('%s: sending %s (%.2f KiB) to %s',
self, fullname, len(msg.data) / 1024.0, stream.name)
self._router._async_route(msg)
stream.sent_modules.add(fullname)
stream.protocol.sent_modules.add(fullname)
if tup[2] is not None:
self.good_load_module_count += 1
self.good_load_module_size += len(msg.data)
@ -939,23 +940,23 @@ class ModuleResponder(object):
def _send_module_load_failed(self, stream, fullname):
self.bad_load_module_count += 1
stream.send(
stream.protocol.send(
mitogen.core.Message.pickled(
self._make_negative_response(fullname),
dst_id=stream.remote_id,
dst_id=stream.protocol.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
)
def _send_module_and_related(self, stream, fullname):
if fullname in stream.sent_modules:
if fullname in stream.protocol.sent_modules:
return
try:
tup = self._build_tuple(fullname)
for name in tup[4]: # related
parent, _, _ = str_partition(name, '.')
if parent != fullname and parent not in stream.sent_modules:
if parent != fullname and parent not in stream.protocol.sent_modules:
# Parent hasn't been sent, so don't load submodule yet.
continue
@ -976,7 +977,7 @@ class ModuleResponder(object):
fullname = msg.data.decode()
LOG.debug('%s requested module %s', stream.name, fullname)
self.get_module_count += 1
if fullname in stream.sent_modules:
if fullname in stream.protocol.sent_modules:
LOG.warning('_on_get_module(): dup request for %r from %r',
fullname, stream)
@ -987,12 +988,12 @@ class ModuleResponder(object):
self.get_module_secs += time.time() - t0
def _send_forward_module(self, stream, context, fullname):
if stream.remote_id != context.context_id:
if stream.protocol.remote_id != context.context_id:
stream.send(
mitogen.core.Message(
data=b('%s\x00%s' % (context.context_id, fullname)),
handle=mitogen.core.FORWARD_MODULE,
dst_id=stream.remote_id,
dst_id=stream.protocol.remote_id,
)
)

File diff suppressed because it is too large Load Diff

@ -485,7 +485,6 @@ class Pool(object):
)
thread.start()
self._threads.append(thread)
LOG.debug('%r: initialized', self)
def _py_24_25_compat(self):
@ -658,7 +657,7 @@ class PushFileService(Service):
def _forward(self, context, path):
stream = self.router.stream_by_id(context.context_id)
child = mitogen.core.Context(self.router, stream.remote_id)
child = mitogen.core.Context(self.router, stream.protocol.remote_id)
sent = self._sent_by_stream.setdefault(stream, set())
if path in sent:
if child.context_id != context.context_id:
@ -891,7 +890,7 @@ class FileService(Service):
# The IO loop pumps 128KiB chunks. An ideal message is a multiple of this,
# odd-sized messages waste one tiny write() per message on the trailer.
# Therefore subtract 10 bytes pickle overhead + 24 bytes header.
IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Stream.HEADER_LEN + (
IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Message.HEADER_LEN + (
len(
mitogen.core.Message.pickled(
mitogen.core.Blob(b(' ') * mitogen.core.CHUNK_SIZE)

@ -116,9 +116,15 @@ def get_machinectl_pid(path, name):
raise Error("could not find PID from machinectl output.\n%s", output)
class Stream(mitogen.parent.Stream):
child_is_immediate_subprocess = False
GET_LEADER_BY_KIND = {
'docker': ('docker_path', get_docker_pid),
'lxc': ('lxc_info_path', get_lxc_pid),
'lxd': ('lxc_path', get_lxd_pid),
'machinectl': ('machinectl_path', get_machinectl_pid),
}
class Options(mitogen.parent.Options):
container = None
username = 'root'
kind = None
@ -128,24 +134,17 @@ class Stream(mitogen.parent.Stream):
lxc_info_path = 'lxc-info'
machinectl_path = 'machinectl'
GET_LEADER_BY_KIND = {
'docker': ('docker_path', get_docker_pid),
'lxc': ('lxc_info_path', get_lxc_pid),
'lxd': ('lxc_path', get_lxd_pid),
'machinectl': ('machinectl_path', get_machinectl_pid),
}
def construct(self, container, kind, username=None, docker_path=None,
def __init__(self, container, kind, username=None, docker_path=None,
lxc_path=None, lxc_info_path=None, machinectl_path=None,
**kwargs):
super(Stream, self).construct(**kwargs)
if kind not in self.GET_LEADER_BY_KIND:
super(Options, self).__init__(**kwargs)
if kind not in GET_LEADER_BY_KIND:
raise Error('unsupported container kind: %r', kind)
self.container = container
self.container = mitogen.core.to_text(container)
self.kind = kind
if username:
self.username = username
self.username = mitogen.core.to_text(username)
if docker_path:
self.docker_path = docker_path
if lxc_path:
@ -155,6 +154,11 @@ class Stream(mitogen.parent.Stream):
if machinectl_path:
self.machinectl_path = machinectl_path
class Connection(mitogen.parent.Connection):
options_class = Options
child_is_immediate_subprocess = False
# Order matters. https://github.com/karelzak/util-linux/commit/854d0fe/
NS_ORDER = ('ipc', 'uts', 'net', 'pid', 'mnt', 'user')
@ -189,15 +193,15 @@ class Stream(mitogen.parent.Stream):
try:
os.setgroups([grent.gr_gid
for grent in grp.getgrall()
if self.username in grent.gr_mem])
pwent = pwd.getpwnam(self.username)
if self.options.username in grent.gr_mem])
pwent = pwd.getpwnam(self.options.username)
os.setreuid(pwent.pw_uid, pwent.pw_uid)
# shadow-4.4/libmisc/setupenv.c. Not done: MAIL, PATH
os.environ.update({
'HOME': pwent.pw_dir,
'SHELL': pwent.pw_shell or '/bin/sh',
'LOGNAME': self.username,
'USER': self.username,
'LOGNAME': self.options.username,
'USER': self.options.username,
})
if ((os.path.exists(pwent.pw_dir) and
os.access(pwent.pw_dir, os.X_OK))):
@ -217,7 +221,7 @@ class Stream(mitogen.parent.Stream):
# namespaces, meaning starting new threads in the exec'd program will
# fail. The solution is forking, so inject a /bin/sh call to achieve
# this.
argv = super(Stream, self).get_boot_command()
argv = super(Connection, self).get_boot_command()
# bash will exec() if a single command was specified and the shell has
# nothing left to do, so "; exit $?" gives bash a reason to live.
return ['/bin/sh', '-c', '%s; exit $?' % (mitogen.parent.Argv(argv),)]
@ -226,13 +230,12 @@ class Stream(mitogen.parent.Stream):
return mitogen.parent.create_child(args, preexec_fn=self.preexec_fn)
def _get_name(self):
return u'setns.' + self.container
return u'setns.' + self.options.container
def connect(self):
self.name = self._get_name()
attr, func = self.GET_LEADER_BY_KIND[self.kind]
tool_path = getattr(self, attr)
self.leader_pid = func(tool_path, self.container)
def connect(self, **kwargs):
attr, func = GET_LEADER_BY_KIND[self.options.kind]
tool_path = getattr(self.options, attr)
self.leader_pid = func(tool_path, self.options.container)
LOG.debug('Leader PID for %s container %r: %d',
self.kind, self.container, self.leader_pid)
super(Stream, self).connect()
self.options.kind, self.options.container, self.leader_pid)
return super(Connection, self).connect(**kwargs)

@ -29,7 +29,7 @@
# !mitogen: minify_safe
"""
Functionality to allow establishing new slave contexts over an SSH connection.
Construct new children via the OpenSSH client.
"""
import logging
@ -52,82 +52,122 @@ except NameError:
LOG = logging.getLogger('mitogen')
auth_incorrect_msg = 'SSH authentication is incorrect'
password_incorrect_msg = 'SSH password is incorrect'
password_required_msg = 'SSH password was requested, but none specified'
hostkey_config_msg = (
'SSH requested permission to accept unknown host key, but '
'check_host_keys=ignore. This is likely due to ssh_args= '
'conflicting with check_host_keys=. Please correct your '
'configuration.'
)
hostkey_failed_msg = (
'Host key checking is enabled, and SSH reported an unrecognized or '
'mismatching host key.'
)
# sshpass uses 'assword' because it doesn't lowercase the input.
PASSWORD_PROMPT = b('password')
HOSTKEY_REQ_PROMPT = b('are you sure you want to continue connecting (yes/no)?')
HOSTKEY_FAIL = b('host key verification failed.')
PASSWORD_PROMPT_PATTERN = re.compile(
b('password'),
re.I
)
HOSTKEY_REQ_PATTERN = re.compile(
b(r'are you sure you want to continue connecting \(yes/no\)\?'),
re.I
)
HOSTKEY_FAIL_PATTERN = re.compile(
b(r'host key verification failed\.'),
re.I
)
# [user@host: ] permission denied
PERMDENIED_RE = re.compile(
('(?:[^@]+@[^:]+: )?' # Absent in OpenSSH <7.5
'Permission denied').encode(),
PERMDENIED_PATTERN = re.compile(
b('(?:[^@]+@[^:]+: )?' # Absent in OpenSSH <7.5
'Permission denied'),
re.I
)
DEBUG_PATTERN = re.compile(b'^debug[123]:')
DEBUG_PREFIXES = (b('debug1:'), b('debug2:'), b('debug3:'))
class PasswordError(mitogen.core.StreamError):
pass
def filter_debug(stream, it):
"""
Read line chunks from it, either yielding them directly, or building up and
logging individual lines if they look like SSH debug output.
This contains the mess of dealing with both line-oriented input, and partial
lines such as the password prompt.
class HostKeyError(mitogen.core.StreamError):
pass
Yields `(line, partial)` tuples, where `line` is the line, `partial` is
:data:`True` if no terminating newline character was present and no more
data exists in the read buffer. Consuming code can use this to unreliably
detect the presence of an interactive prompt.
class SetupProtocol(mitogen.parent.RegexProtocol):
"""
This protocol is attached to stderr of the SSH client. It responds to
various interactive prompts as required.
"""
# The `partial` test is unreliable, but is only problematic when verbosity
# is enabled: it's possible for a combination of SSH banner, password
# prompt, verbose output, timing and OS buffering specifics to create a
# situation where an otherwise newline-terminated line appears to not be
# terminated, due to a partial read(). If something is broken when
# ssh_debug_level>0, this is the first place to look.
state = 'start_of_line'
buf = b('')
for chunk in it:
buf += chunk
while buf:
if state == 'start_of_line':
if len(buf) < 8:
# short read near buffer limit, block awaiting at least 8
# bytes so we can discern a debug line, or the minimum
# interesting token from above or the bootstrap
# ('password', 'MITO000\n').
break
elif any(buf.startswith(p) for p in DEBUG_PREFIXES):
state = 'in_debug'
password_sent = False
def _on_host_key_request(self, line, match):
if self.stream.conn.options.check_host_keys == 'accept':
LOG.debug('%s: accepting host key', self.stream.name)
self.stream.transmit_side.write(b('yes\n'))
return
# _host_key_prompt() should never be reached with ignore or enforce
# mode, SSH should have handled that. User's ssh_args= is conflicting
# with ours.
self.stream.conn._fail_connection(HostKeyError(hostkey_config_msg))
def _on_host_key_failed(self, line, match):
self.stream.conn._fail_connection(HostKeyError(hostkey_failed_msg))
def _on_permission_denied(self, line, match):
# issue #271: work around conflict with user shell reporting
# 'permission denied' e.g. during chdir($HOME) by only matching it at
# the start of the line.
if self.stream.conn.options.password is not None and \
self.password_sent:
self.stream.conn._fail_connection(
PasswordError(password_incorrect_msg)
)
elif PASSWORD_PROMPT_PATTERN.search(line) and \
self.stream.conn.options.password is None:
# Permission denied (password,pubkey)
self.stream.conn._fail_connection(
PasswordError(password_required_msg)
)
else:
state = 'in_plain'
elif state == 'in_debug':
if b('\n') not in buf:
break
line, _, buf = bytes_partition(buf, b('\n'))
LOG.debug('%s: %s', stream.name,
mitogen.core.to_text(line.rstrip()))
state = 'start_of_line'
elif state == 'in_plain':
line, nl, buf = bytes_partition(buf, b('\n'))
yield line + nl, not (nl or buf)
if nl:
state = 'start_of_line'
self.stream.conn._fail_connection(
PasswordError(auth_incorrect_msg)
)
def _on_password_prompt(self, line, match):
LOG.debug('%s: (password prompt): %s', self.stream.name, line)
if self.stream.conn.options.password is None:
self.stream.conn._fail(PasswordError(password_required_msg))
class PasswordError(mitogen.core.StreamError):
pass
self.stream.transmit_side.write(
(self.stream.conn.options.password + '\n').encode('utf-8')
)
self.password_sent = True
def _on_debug_line(self, line, match):
text = mitogen.core.to_text(line.rstrip())
LOG.debug('%s: %s', self.stream.name, text)
class HostKeyError(mitogen.core.StreamError):
pass
PATTERNS = [
(DEBUG_PATTERN, _on_debug_line),
(HOSTKEY_FAIL_PATTERN, _on_host_key_failed),
(PERMDENIED_PATTERN, _on_permission_denied),
]
PARTIAL_PATTERNS = [
(PASSWORD_PROMPT_PATTERN, _on_password_prompt),
(HOSTKEY_REQ_PATTERN, _on_host_key_request),
]
class Stream(mitogen.parent.Stream):
child_is_immediate_subprocess = False
class Options(mitogen.parent.Options):
#: Default to whatever is available as 'python' on the remote machine,
#: overriding sys.executable use.
python_path = 'python'
@ -141,19 +181,19 @@ class Stream(mitogen.parent.Stream):
hostname = None
username = None
port = None
identity_file = None
password = None
ssh_args = None
check_host_keys_msg = 'check_host_keys= must be set to accept, enforce or ignore'
def construct(self, hostname, username=None, ssh_path=None, port=None,
def __init__(self, hostname, username=None, ssh_path=None, port=None,
check_host_keys='enforce', password=None, identity_file=None,
compression=True, ssh_args=None, keepalive_enabled=True,
keepalive_count=3, keepalive_interval=15,
identities_only=True, ssh_debug_level=None, **kwargs):
super(Stream, self).construct(**kwargs)
super(Options, self).__init__(**kwargs)
if check_host_keys not in ('accept', 'enforce', 'ignore'):
raise ValueError(self.check_host_keys_msg)
@ -175,143 +215,81 @@ class Stream(mitogen.parent.Stream):
if ssh_debug_level:
self.ssh_debug_level = ssh_debug_level
self._init_create_child()
class Connection(mitogen.parent.Connection):
options_class = Options
diag_protocol_class = SetupProtocol
child_is_immediate_subprocess = False
def _get_name(self):
s = u'ssh.' + mitogen.core.to_text(self.options.hostname)
if self.options.port and self.options.port != 22:
s += u':%s' % (self.options.port,)
return s
def _requires_pty(self):
"""
Return :data:`True` if the configuration requires a PTY to be
allocated. This is only true if we must interactively accept host keys,
or type a password.
Return :data:`True` if a PTY to is required for this configuration,
because it must interactively accept host keys or type a password.
"""
return (self.check_host_keys == 'accept' or
self.password is not None)
return (
self.options.check_host_keys == 'accept' or
self.options.password is not None
)
def _init_create_child(self):
def create_child(self, **kwargs):
"""
Initialize the base class :attr:`create_child` and
:attr:`create_child_args` according to whether we need a PTY or not.
Avoid PTY use when possible to avoid a scaling limitation.
"""
if self._requires_pty():
self.create_child = mitogen.parent.hybrid_tty_create_child
return mitogen.parent.hybrid_tty_create_child(**kwargs)
else:
self.create_child = mitogen.parent.create_child
self.create_child_args = {
'stderr_pipe': True,
}
return mitogen.parent.create_child(stderr_pipe=True, **kwargs)
def get_boot_command(self):
bits = [self.ssh_path]
if self.ssh_debug_level:
bits += ['-' + ('v' * min(3, self.ssh_debug_level))]
bits = [self.options.ssh_path]
if self.options.ssh_debug_level:
bits += ['-' + ('v' * min(3, self.options.ssh_debug_level))]
else:
# issue #307: suppress any login banner, as it may contain the
# password prompt, and there is no robust way to tell the
# difference.
bits += ['-o', 'LogLevel ERROR']
if self.username:
bits += ['-l', self.username]
if self.port is not None:
bits += ['-p', str(self.port)]
if self.identities_only and (self.identity_file or self.password):
if self.options.username:
bits += ['-l', self.options.username]
if self.options.port is not None:
bits += ['-p', str(self.options.port)]
if self.options.identities_only and (self.options.identity_file or
self.options.password):
bits += ['-o', 'IdentitiesOnly yes']
if self.identity_file:
bits += ['-i', self.identity_file]
if self.compression:
if self.options.identity_file:
bits += ['-i', self.options.identity_file]
if self.options.compression:
bits += ['-o', 'Compression yes']
if self.keepalive_enabled:
if self.options.keepalive_enabled:
bits += [
'-o', 'ServerAliveInterval %s' % (self.keepalive_interval,),
'-o', 'ServerAliveCountMax %s' % (self.keepalive_count,),
'-o', 'ServerAliveInterval %s' % (
self.options.keepalive_interval,
),
'-o', 'ServerAliveCountMax %s' % (
self.options.keepalive_count,
),
]
if not self._requires_pty():
bits += ['-o', 'BatchMode yes']
if self.check_host_keys == 'enforce':
if self.options.check_host_keys == 'enforce':
bits += ['-o', 'StrictHostKeyChecking yes']
if self.check_host_keys == 'accept':
if self.options.check_host_keys == 'accept':
bits += ['-o', 'StrictHostKeyChecking ask']
elif self.check_host_keys == 'ignore':
elif self.options.check_host_keys == 'ignore':
bits += [
'-o', 'StrictHostKeyChecking no',
'-o', 'UserKnownHostsFile /dev/null',
'-o', 'GlobalKnownHostsFile /dev/null',
]
if self.ssh_args:
bits += self.ssh_args
bits.append(self.hostname)
base = super(Stream, self).get_boot_command()
if self.options.ssh_args:
bits += self.options.ssh_args
bits.append(self.options.hostname)
base = super(Connection, self).get_boot_command()
return bits + [shlex_quote(s).strip() for s in base]
def _get_name(self):
s = u'ssh.' + mitogen.core.to_text(self.hostname)
if self.port:
s += u':%s' % (self.port,)
return s
auth_incorrect_msg = 'SSH authentication is incorrect'
password_incorrect_msg = 'SSH password is incorrect'
password_required_msg = 'SSH password was requested, but none specified'
hostkey_config_msg = (
'SSH requested permission to accept unknown host key, but '
'check_host_keys=ignore. This is likely due to ssh_args= '
'conflicting with check_host_keys=. Please correct your '
'configuration.'
)
hostkey_failed_msg = (
'Host key checking is enabled, and SSH reported an unrecognized or '
'mismatching host key.'
)
def _host_key_prompt(self):
if self.check_host_keys == 'accept':
LOG.debug('%s: accepting host key', self.name)
self.diag_stream.transmit_side.write(b('yes\n'))
return
# _host_key_prompt() should never be reached with ignore or enforce
# mode, SSH should have handled that. User's ssh_args= is conflicting
# with ours.
raise HostKeyError(self.hostkey_config_msg)
def _connect_input_loop(self, it):
password_sent = False
for buf, partial in filter_debug(self, it):
LOG.debug('%s: stdout: %s', self.name, buf.rstrip())
if buf.endswith(self.EC0_MARKER):
self._ec0_received()
return
elif HOSTKEY_REQ_PROMPT in buf.lower():
self._host_key_prompt()
elif HOSTKEY_FAIL in buf.lower():
raise HostKeyError(self.hostkey_failed_msg)
elif PERMDENIED_RE.match(buf):
# issue #271: work around conflict with user shell reporting
# 'permission denied' e.g. during chdir($HOME) by only matching
# it at the start of the line.
if self.password is not None and password_sent:
raise PasswordError(self.password_incorrect_msg)
elif PASSWORD_PROMPT in buf and self.password is None:
# Permission denied (password,pubkey)
raise PasswordError(self.password_required_msg)
else:
raise PasswordError(self.auth_incorrect_msg)
elif partial and PASSWORD_PROMPT in buf.lower():
if self.password is None:
raise PasswordError(self.password_required_msg)
LOG.debug('%s: sending password', self.name)
self.diag_stream.transmit_side.write(
(self.password + '\n').encode()
)
password_sent = True
raise mitogen.core.StreamError('bootstrap failed')
def _connect_bootstrap(self):
fds = [self.receive_side.fd]
if self.diag_stream is not None:
fds.append(self.diag_stream.receive_side.fd)
it = mitogen.parent.iter_read(fds=fds, deadline=self.connect_deadline)
try:
self._connect_input_loop(it)
finally:
it.close()

@ -29,6 +29,7 @@
# !mitogen: minify_safe
import logging
import re
import mitogen.core
import mitogen.parent
@ -42,87 +43,120 @@ except NameError:
LOG = logging.getLogger(__name__)
password_incorrect_msg = 'su password is incorrect'
password_required_msg = 'su password is required'
class PasswordError(mitogen.core.StreamError):
pass
class Stream(mitogen.parent.Stream):
# TODO: BSD su cannot handle stdin being a socketpair, but it does let the
# child inherit fds from the parent. So we can still pass a socketpair in
# for hybrid_tty_create_child(), there just needs to be either a shell
# snippet or bootstrap support for fixing things up afterwards.
create_child = staticmethod(mitogen.parent.tty_create_child)
child_is_immediate_subprocess = False
class SetupBootstrapProtocol(mitogen.parent.BootstrapProtocol):
password_sent = False
def setup_patterns(self, conn):
"""
su options cause the regexes used to vary. This is a mess, requires
reworking.
"""
incorrect_pattern = re.compile(
mitogen.core.b('|').join(
re.escape(s.encode('utf-8'))
for s in conn.options.incorrect_prompts
),
re.I
)
prompt_pattern = re.compile(
re.escape(
conn.options.password_prompt.encode('utf-8')
),
re.I
)
self.PATTERNS = mitogen.parent.BootstrapProtocol.PATTERNS + [
(incorrect_pattern, type(self)._on_password_incorrect),
]
self.PARTIAL_PATTERNS = mitogen.parent.BootstrapProtocol.PARTIAL_PATTERNS + [
(prompt_pattern, type(self)._on_password_prompt),
]
#: Once connected, points to the corresponding DiagLogStream, allowing it to
#: be disconnected at the same time this stream is being torn down.
def _on_password_prompt(self, line, match):
LOG.debug('%r: (password prompt): %r',
self.stream.name, line.decode('utf-8', 'replace'))
if self.stream.conn.options.password is None:
self.stream.conn._fail_connection(
PasswordError(password_required_msg)
)
return
if self.password_sent:
self.stream.conn._fail_connection(
PasswordError(password_incorrect_msg)
)
return
self.stream.transmit_side.write(
(self.stream.conn.options.password + '\n').encode('utf-8')
)
self.password_sent = True
username = 'root'
def _on_password_incorrect(self, line, match):
if self.password_sent:
self.stream.conn._fail_connection(
PasswordError(password_incorrect_msg)
)
class Options(mitogen.parent.Options):
username = u'root'
password = None
su_path = 'su'
password_prompt = b('password:')
password_prompt = u'password:'
incorrect_prompts = (
b('su: sorry'), # BSD
b('su: authentication failure'), # Linux
b('su: incorrect password'), # CentOS 6
b('authentication is denied'), # AIX
u'su: sorry', # BSD
u'su: authentication failure', # Linux
u'su: incorrect password', # CentOS 6
u'authentication is denied', # AIX
)
def construct(self, username=None, password=None, su_path=None,
def __init__(self, username=None, password=None, su_path=None,
password_prompt=None, incorrect_prompts=None, **kwargs):
super(Stream, self).construct(**kwargs)
super(Options, self).__init__(**kwargs)
if username is not None:
self.username = username
self.username = mitogen.core.to_text(username)
if password is not None:
self.password = password
self.password = mitogen.core.to_text(password)
if su_path is not None:
self.su_path = su_path
if password_prompt is not None:
self.password_prompt = password_prompt.lower()
self.password_prompt = password_prompt
if incorrect_prompts is not None:
self.incorrect_prompts = map(str.lower, incorrect_prompts)
self.incorrect_prompts = [
mitogen.core.to_text(p)
for p in incorrect_prompts
]
def _get_name(self):
return u'su.' + mitogen.core.to_text(self.username)
def get_boot_command(self):
argv = mitogen.parent.Argv(super(Stream, self).get_boot_command())
return [self.su_path, self.username, '-c', str(argv)]
class Connection(mitogen.parent.Connection):
options_class = Options
stream_protocol_class = SetupBootstrapProtocol
password_incorrect_msg = 'su password is incorrect'
password_required_msg = 'su password is required'
def _connect_input_loop(self, it):
password_sent = False
# TODO: BSD su cannot handle stdin being a socketpair, but it does let the
# child inherit fds from the parent. So we can still pass a socketpair in
# for hybrid_tty_create_child(), there just needs to be either a shell
# snippet or bootstrap support for fixing things up afterwards.
create_child = staticmethod(mitogen.parent.tty_create_child)
child_is_immediate_subprocess = False
for buf in it:
LOG.debug('%r: received %r', self, buf)
if buf.endswith(self.EC0_MARKER):
self._ec0_received()
return
if any(s in buf.lower() for s in self.incorrect_prompts):
if password_sent:
raise PasswordError(self.password_incorrect_msg)
elif self.password_prompt in buf.lower():
if self.password is None:
raise PasswordError(self.password_required_msg)
if password_sent:
raise PasswordError(self.password_incorrect_msg)
LOG.debug('sending password')
self.transmit_side.write(
mitogen.core.to_text(self.password + '\n').encode('utf-8')
)
password_sent = True
def _get_name(self):
return u'su.' + self.options.username
raise mitogen.core.StreamError('bootstrap failed')
def stream_factory(self):
stream = super(Connection, self).stream_factory()
stream.protocol.setup_patterns(self)
return stream
def _connect_bootstrap(self):
it = mitogen.parent.iter_read(
fds=[self.receive_side.fd],
deadline=self.connect_deadline,
)
try:
self._connect_input_loop(it)
finally:
it.close()
def get_boot_command(self):
argv = mitogen.parent.Argv(super(Connection, self).get_boot_command())
return [self.options.su_path, self.options.username, '-c', str(argv)]

@ -40,6 +40,9 @@ from mitogen.core import b
LOG = logging.getLogger(__name__)
password_incorrect_msg = 'sudo password is incorrect'
password_required_msg = 'sudo password is required'
# These are base64-encoded UTF-8 as our existing minifier/module server
# struggles with Unicode Python source in some (forgotten) circumstances.
PASSWORD_PROMPTS = [
@ -99,14 +102,13 @@ PASSWORD_PROMPTS = [
PASSWORD_PROMPT_RE = re.compile(
u'|'.join(
base64.b64decode(s).decode('utf-8')
mitogen.core.b('|').join(
base64.b64decode(s)
for s in PASSWORD_PROMPTS
),
re.I
)
)
PASSWORD_PROMPT = b('password')
SUDO_OPTIONS = [
#(False, 'bool', '--askpass', '-A')
#(False, 'str', '--auth-type', '-a')
@ -181,10 +183,7 @@ def option(default, *args):
return default
class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False
class Options(mitogen.parent.Options):
sudo_path = 'sudo'
username = 'root'
password = None
@ -195,15 +194,16 @@ class Stream(mitogen.parent.Stream):
selinux_role = None
selinux_type = None
def construct(self, username=None, sudo_path=None, password=None,
def __init__(self, username=None, sudo_path=None, password=None,
preserve_env=None, set_home=None, sudo_args=None,
login=None, selinux_role=None, selinux_type=None, **kwargs):
super(Stream, self).construct(**kwargs)
super(Options, self).__init__(**kwargs)
opts = parse_sudo_flags(sudo_args or [])
self.username = option(self.username, username, opts.user)
self.sudo_path = option(self.sudo_path, sudo_path)
self.password = password or None
if password:
self.password = mitogen.core.to_text(password)
self.preserve_env = option(self.preserve_env,
preserve_env, opts.preserve_env)
self.set_home = option(self.set_home, set_home, opts.set_home)
@ -211,67 +211,61 @@ class Stream(mitogen.parent.Stream):
self.selinux_role = option(self.selinux_role, selinux_role, opts.role)
self.selinux_type = option(self.selinux_type, selinux_type, opts.type)
class SetupProtocol(mitogen.parent.RegexProtocol):
password_sent = False
def _on_password_prompt(self, line, match):
LOG.debug('%s: (password prompt): %s',
self.stream.name, line.decode('utf-8', 'replace'))
if self.stream.conn.options.password is None:
self.stream.conn._fail_connection(
PasswordError(password_required_msg)
)
return
if self.password_sent:
self.stream.conn._fail_connection(
PasswordError(password_incorrect_msg)
)
return
self.stream.transmit_side.write(
(self.stream.conn.options.password + '\n').encode('utf-8')
)
self.password_sent = True
PARTIAL_PATTERNS = [
(PASSWORD_PROMPT_RE, _on_password_prompt),
]
class Connection(mitogen.parent.Connection):
diag_protocol_class = SetupProtocol
options_class = Options
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False
def _get_name(self):
return u'sudo.' + mitogen.core.to_text(self.username)
return u'sudo.' + mitogen.core.to_text(self.options.username)
def get_boot_command(self):
# Note: sudo did not introduce long-format option processing until July
# 2013, so even though we parse long-format options, supply short-form
# to the sudo command.
bits = [self.sudo_path, '-u', self.username]
if self.preserve_env:
bits = [self.options.sudo_path, '-u', self.options.username]
if self.options.preserve_env:
bits += ['-E']
if self.set_home:
if self.options.set_home:
bits += ['-H']
if self.login:
if self.options.login:
bits += ['-i']
if self.selinux_role:
bits += ['-r', self.selinux_role]
if self.selinux_type:
bits += ['-t', self.selinux_type]
if self.options.selinux_role:
bits += ['-r', self.options.selinux_role]
if self.options.selinux_type:
bits += ['-t', self.options.selinux_type]
bits = bits + ['--'] + super(Stream, self).get_boot_command()
bits = bits + ['--'] + super(Connection, self).get_boot_command()
LOG.debug('sudo command line: %r', bits)
return bits
password_incorrect_msg = 'sudo password is incorrect'
password_required_msg = 'sudo password is required'
def _connect_input_loop(self, it):
password_sent = False
for buf in it:
LOG.debug('%s: received %r', self.name, buf)
if buf.endswith(self.EC0_MARKER):
self._ec0_received()
return
match = PASSWORD_PROMPT_RE.search(buf.decode('utf-8').lower())
if match is not None:
LOG.debug('%s: matched password prompt %r',
self.name, match.group(0))
if self.password is None:
raise PasswordError(self.password_required_msg)
if password_sent:
raise PasswordError(self.password_incorrect_msg)
self.diag_stream.transmit_side.write(
(mitogen.core.to_text(self.password) + '\n').encode('utf-8')
)
password_sent = True
raise mitogen.core.StreamError('bootstrap failed')
def _connect_bootstrap(self):
fds = [self.receive_side.fd]
if self.diag_stream is not None:
fds.append(self.diag_stream.receive_side.fd)
it = mitogen.parent.iter_read(
fds=fds,
deadline=self.connect_deadline,
)
try:
self._connect_input_loop(it)
finally:
it.close()

@ -65,9 +65,38 @@ def make_socket_path():
return tempfile.mktemp(prefix='mitogen_unix_', suffix='.sock')
class Listener(mitogen.core.BasicStream):
class ListenerStream(mitogen.core.Stream):
def on_receive(self, broker):
sock, _ = self.receive_side.fp.accept()
try:
self.protocol.on_accept_client(sock)
except:
sock.close()
raise
class Listener(mitogen.core.Protocol):
stream_class = ListenerStream
keep_alive = True
@classmethod
def build_stream(cls, router, path=None, backlog=100):
if not path:
path = make_socket_path()
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
if os.path.exists(path) and is_path_dead(path):
LOG.debug('%r: deleting stale %r', self, path)
os.unlink(path)
sock.bind(path)
os.chmod(path, int('0600', 8))
sock.listen(backlog)
stream = super(Listener, cls).build_stream(router, path)
stream.accept(sock, sock)
router.broker.start_receive(stream)
return stream
def __repr__(self):
return '%s.%s(%r)' % (
__name__,
@ -75,20 +104,9 @@ class Listener(mitogen.core.BasicStream):
self.path,
)
def __init__(self, router, path=None, backlog=100):
def __init__(self, router, path):
self._router = router
self.path = path or make_socket_path()
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
if os.path.exists(self.path) and is_path_dead(self.path):
LOG.debug('%r: deleting stale %r', self, self.path)
os.unlink(self.path)
self._sock.bind(self.path)
os.chmod(self.path, int('0600', 8))
self._sock.listen(backlog)
self.receive_side = mitogen.core.Side(self, self._sock.fileno())
router.broker.start_receive(self)
self.path = path
def _unlink_socket(self):
try:
@ -102,10 +120,9 @@ class Listener(mitogen.core.BasicStream):
def on_shutdown(self, broker):
broker.stop_receive(self)
self._unlink_socket()
self._sock.close()
self.receive_side.closed = True
self.receive_side.close()
def _accept_client(self, sock):
def on_accept_client(self, sock):
sock.setblocking(True)
try:
pid, = struct.unpack('>L', sock.recv(4))
@ -115,12 +132,6 @@ class Listener(mitogen.core.BasicStream):
return
context_id = self._router.id_allocator.allocate()
context = mitogen.parent.Context(self._router, context_id)
stream = mitogen.core.Stream(self._router, context_id)
stream.name = u'unix_client.%d' % (pid,)
stream.auth_id = mitogen.context_id
stream.is_privileged = True
try:
sock.send(struct.pack('>LLL', context_id, mitogen.context_id,
os.getpid()))
@ -129,21 +140,22 @@ class Listener(mitogen.core.BasicStream):
self, pid, sys.exc_info()[1])
return
context = mitogen.parent.Context(self._router, context_id)
stream = mitogen.core.MitogenProtocol.build_stream(
router=self._router,
remote_id=context_id,
)
stream.name = u'unix_client.%d' % (pid,)
stream.protocol.auth_id = mitogen.context_id
stream.protocol.is_privileged = True
side = mitogen.core.Side(stream, sock)
stream.receive_side = side
stream.transmit_side = side
LOG.debug('%r: accepted %r', self, stream)
stream.accept(sock.fileno(), sock.fileno())
self._router.register(context, stream)
def on_receive(self, broker):
sock, _ = self._sock.accept()
try:
self._accept_client(sock)
finally:
sock.close()
def connect(path, broker=None):
LOG.debug('unix.connect(path=%r)', path)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
def _connect(path, broker, sock):
sock.connect(path)
sock.send(struct.pack('>L', os.getpid()))
mitogen.context_id, remote_id, pid = struct.unpack('>LLL', sock.recv(12))
@ -154,15 +166,24 @@ def connect(path, broker=None):
mitogen.context_id, remote_id)
router = mitogen.master.Router(broker=broker)
stream = mitogen.core.Stream(router, remote_id)
stream.accept(sock.fileno(), sock.fileno())
stream = mitogen.core.MitogenProtocol.build_stream(router, remote_id)
side = mitogen.core.Side(stream, sock)
stream.transmit_side = side
stream.receive_side = side
stream.name = u'unix_listener.%d' % (pid,)
context = mitogen.parent.Context(router, remote_id)
router.register(context, stream)
mitogen.core.listen(router.broker, 'shutdown',
lambda: router.disconnect_stream(stream))
sock.close()
context = mitogen.parent.Context(router, remote_id)
router.register(context, stream)
return router, context
def connect(path, broker=None):
LOG.debug('unix.connect(path=%r)', path)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
return _connect(path, broker, sock)
except:
sock.close()
raise

@ -19,15 +19,17 @@ import mitogen.sudo
router = mitogen.master.Router()
context = mitogen.parent.Context(router, 0)
stream = mitogen.ssh.Stream(router, 0, max_message_size=0, hostname='foo')
options = mitogen.ssh.Options(max_message_size=0, hostname='foo')
conn = mitogen.ssh.Connection(options, router)
conn.context_id = 123
print('SSH command size: %s' % (len(' '.join(stream.get_boot_command())),))
print('SSH command size: %s' % (len(' '.join(conn.get_boot_command())),))
print('Preamble size: %s (%.2fKiB)' % (
len(stream.get_preamble()),
len(stream.get_preamble()) / 1024.0,
len(conn.get_preamble()),
len(conn.get_preamble()) / 1024.0,
))
if '--dump' in sys.argv:
print(zlib.decompress(stream.get_preamble()))
print(zlib.decompress(conn.get_preamble()))
exit()

@ -1,13 +0,0 @@
#!/usr/bin/env python
# I produce text every 100ms, for testing mitogen.core.iter_read()
import sys
import time
i = 0
while True:
i += 1
sys.stdout.write(str(i))
sys.stdout.flush()
time.sleep(0.1)

@ -1,9 +0,0 @@
#!/usr/bin/env python
# I consume 65535 bytes every 10ms, for testing mitogen.core.write_all()
import os
import time
while True:
os.read(0, 65535)
time.sleep(0.01)

@ -21,7 +21,7 @@ class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(argv[1], 'exec')
self.assertEquals(argv[2], '--interactive')
self.assertEquals(argv[3], 'container_name')
self.assertEquals(argv[4], stream.python_path)
self.assertEquals(argv[4], stream.conn.options.python_path)
if __name__ == '__main__':

@ -19,8 +19,10 @@ class CommandLineTest(testlib.RouterMixin, testlib.TestCase):
# * 3.x starting 2.7
def test_valid_syntax(self):
stream = mitogen.parent.Stream(self.router, 0, max_message_size=123)
args = stream.get_boot_command()
options = mitogen.parent.Options(max_message_size=123)
conn = mitogen.parent.Connection(options, self.router)
conn.context = mitogen.core.Context(None, 123)
args = conn.get_boot_command()
# Executing the boot command will print "EC0" and expect to read from
# stdin, which will fail because it's pointing at /dev/null, causing
@ -38,7 +40,8 @@ class CommandLineTest(testlib.RouterMixin, testlib.TestCase):
)
stdout, stderr = proc.communicate()
self.assertEquals(0, proc.returncode)
self.assertEquals(mitogen.parent.Stream.EC0_MARKER, stdout)
self.assertEquals(stdout,
mitogen.parent.BootstrapProtocol.EC0_MARKER+'\n')
self.assertIn(b("Error -5 while decompressing data"), stderr)
finally:
fp.close()

@ -38,7 +38,7 @@ class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
lxc_attach_path='true',
)
)
self.assertTrue(str(e).endswith(mitogen.lxc.Stream.eof_error_hint))
self.assertTrue(str(e).endswith(mitogen.lxc.Connection.eof_error_hint))
if __name__ == '__main__':

@ -30,7 +30,7 @@ class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
lxc_path='true',
)
)
self.assertTrue(str(e).endswith(mitogen.lxd.Stream.eof_error_hint))
self.assertTrue(str(e).endswith(mitogen.lxd.Connection.eof_error_hint))
if __name__ == '__main__':

@ -0,0 +1,34 @@
import unittest2
import mock
import mitogen.core
import testlib
class ReceiveOneTest(testlib.TestCase):
klass = mitogen.core.MitogenProtocol
def test_corruption(self):
broker = mock.Mock()
router = mock.Mock()
stream = mock.Mock()
protocol = self.klass(router, 1)
protocol.stream = stream
junk = mitogen.core.b('x') * mitogen.core.Message.HEADER_LEN
capture = testlib.LogCapturer()
capture.start()
protocol.on_receive(broker, junk)
capture.stop()
self.assertEquals(1, stream.on_disconnect.call_count)
expect = self.klass.corrupt_msg % (stream.name, junk)
self.assertTrue(expect in capture.raw())
if __name__ == '__main__':
unittest2.main()

@ -49,7 +49,7 @@ def wait_for_empty_output_queue(sync_recv, context):
while True:
# Now wait for the RPC to exit the output queue.
stream = router.stream_by_id(context.context_id)
if broker.defer_sync(lambda: stream.pending_bytes()) == 0:
if broker.defer_sync(lambda: stream.protocol.pending_bytes()) == 0:
return
time.sleep(0.1)
@ -69,35 +69,17 @@ class GetDefaultRemoteNameTest(testlib.TestCase):
self.assertEquals("ECORP_Administrator@box:123", self.func())
class WstatusToStrTest(testlib.TestCase):
func = staticmethod(mitogen.parent.wstatus_to_str)
class ReturncodeToStrTest(testlib.TestCase):
func = staticmethod(mitogen.parent.returncode_to_str)
def test_return_zero(self):
pid = os.fork()
if not pid:
os._exit(0)
(pid, status), _ = mitogen.core.io_op(os.waitpid, pid, 0)
self.assertEquals(self.func(status),
'exited with return code 0')
self.assertEquals(self.func(0), 'exited with return code 0')
def test_return_one(self):
pid = os.fork()
if not pid:
os._exit(1)
(pid, status), _ = mitogen.core.io_op(os.waitpid, pid, 0)
self.assertEquals(
self.func(status),
'exited with return code 1'
)
self.assertEquals(self.func(1), 'exited with return code 1')
def test_sigkill(self):
pid = os.fork()
if not pid:
time.sleep(600)
os.kill(pid, signal.SIGKILL)
(pid, status), _ = mitogen.core.io_op(os.waitpid, pid, 0)
self.assertEquals(
self.func(status),
self.assertEquals(self.func(-signal.SIGKILL),
'exited due to signal %s (SIGKILL)' % (int(signal.SIGKILL),)
)
@ -107,20 +89,20 @@ class WstatusToStrTest(testlib.TestCase):
class ReapChildTest(testlib.RouterMixin, testlib.TestCase):
def test_connect_timeout(self):
# Ensure the child process is reaped if the connection times out.
stream = mitogen.parent.Stream(
router=self.router,
remote_id=1234,
options = mitogen.parent.Options(
old_router=self.router,
max_message_size=self.router.max_message_size,
python_path=testlib.data_path('python_never_responds.py'),
connect_timeout=0.5,
)
conn = mitogen.parent.Connection(options, router=self.router)
self.assertRaises(mitogen.core.TimeoutError,
lambda: stream.connect()
lambda: conn.connect(context=mitogen.core.Context(None, 1234))
)
wait_for_child(stream.pid)
wait_for_child(conn.proc.pid)
e = self.assertRaises(OSError,
lambda: os.kill(stream.pid, 0)
lambda: os.kill(conn.proc.pid, 0)
)
self.assertEquals(e.args[0], errno.ESRCH)
@ -133,7 +115,7 @@ class StreamErrorTest(testlib.RouterMixin, testlib.TestCase):
connect_timeout=3,
)
)
prefix = "EOF on stream; last 300 bytes received: "
prefix = mitogen.parent.Connection.eof_error_msg
self.assertTrue(e.args[0].startswith(prefix))
def test_via_eof(self):
@ -142,12 +124,12 @@ class StreamErrorTest(testlib.RouterMixin, testlib.TestCase):
e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.local(
via=local,
python_path='true',
python_path='echo',
connect_timeout=3,
)
)
s = "EOF on stream; last 300 bytes received: "
self.assertTrue(s in e.args[0])
expect = mitogen.parent.Connection.eof_error_msg
self.assertTrue(expect in e.args[0])
def test_direct_enoent(self):
e = self.assertRaises(mitogen.core.StreamError,
@ -185,11 +167,15 @@ class OpenPtyTest(testlib.TestCase):
func = staticmethod(mitogen.parent.openpty)
def test_pty_returned(self):
master_fd, slave_fd = self.func()
self.assertTrue(isinstance(master_fd, int))
self.assertTrue(isinstance(slave_fd, int))
os.close(master_fd)
os.close(slave_fd)
master_fp, slave_fp = self.func()
try:
self.assertTrue(master_fp.isatty())
self.assertTrue(isinstance(master_fp, file))
self.assertTrue(slave_fp.isatty())
self.assertTrue(isinstance(slave_fp, file))
finally:
master_fp.close()
slave_fp.close()
@mock.patch('os.openpty')
def test_max_reached(self, openpty):
@ -204,20 +190,20 @@ class OpenPtyTest(testlib.TestCase):
@mock.patch('os.openpty')
def test_broken_linux_fallback(self, openpty):
openpty.side_effect = OSError(errno.EPERM)
master_fd, slave_fd = self.func()
master_fp, slave_fp = self.func()
try:
st = os.fstat(master_fd)
st = os.fstat(master_fp.fileno())
self.assertEquals(5, os.major(st.st_rdev))
flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
flags = fcntl.fcntl(master_fp.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
st = os.fstat(slave_fd)
st = os.fstat(slave_fp.fileno())
self.assertEquals(136, os.major(st.st_rdev))
flags = fcntl.fcntl(slave_fd, fcntl.F_GETFL)
flags = fcntl.fcntl(slave_fp.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
finally:
os.close(master_fd)
os.close(slave_fd)
master_fp.close()
slave_fp.close()
class TtyCreateChildTest(testlib.TestCase):
@ -235,125 +221,21 @@ class TtyCreateChildTest(testlib.TestCase):
# read a password.
tf = tempfile.NamedTemporaryFile()
try:
pid, fd, _ = self.func([
proc = self.func([
'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
])
deadline = time.time() + 5.0
for line in mitogen.parent.iter_read([fd], deadline):
self.assertEquals(mitogen.core.b('hi\n'), line)
break
waited_pid, status = os.waitpid(pid, 0)
self.assertEquals(pid, waited_pid)
mitogen.core.set_block(proc.stdio_fp.fileno())
self.assertEquals(mitogen.core.b('hi\n'), proc.stdio_fp.read())
waited_pid, status = os.waitpid(proc.pid, 0)
self.assertEquals(proc.pid, waited_pid)
self.assertEquals(0, status)
self.assertEquals(mitogen.core.b(''), tf.read())
os.close(fd)
proc.stdio_fp.close()
finally:
tf.close()
class IterReadTest(testlib.TestCase):
func = staticmethod(mitogen.parent.iter_read)
def make_proc(self):
# I produce text every 100ms.
args = [testlib.data_path('iter_read_generator.py')]
proc = subprocess.Popen(args, stdout=subprocess.PIPE)
mitogen.core.set_nonblock(proc.stdout.fileno())
return proc
def test_no_deadline(self):
proc = self.make_proc()
try:
reader = self.func([proc.stdout.fileno()])
for i, chunk in enumerate(reader):
self.assertEqual(1+i, int(chunk))
if i > 2:
break
finally:
Popen__terminate(proc)
proc.stdout.close()
def test_deadline_exceeded_before_call(self):
proc = self.make_proc()
reader = self.func([proc.stdout.fileno()], 0)
try:
got = []
try:
for chunk in reader:
got.append(chunk)
assert 0, 'TimeoutError not raised'
except mitogen.core.TimeoutError:
self.assertEqual(len(got), 0)
finally:
Popen__terminate(proc)
proc.stdout.close()
def test_deadline_exceeded_during_call(self):
proc = self.make_proc()
deadline = time.time() + 0.4
reader = self.func([proc.stdout.fileno()], deadline)
try:
got = []
try:
for chunk in reader:
if time.time() > (deadline + 1.0):
assert 0, 'TimeoutError not raised'
got.append(chunk)
except mitogen.core.TimeoutError:
# Give a little wiggle room in case of imperfect scheduling.
# Ideal number should be 9.
self.assertLess(deadline, time.time())
self.assertLess(1, len(got))
self.assertLess(len(got), 20)
finally:
Popen__terminate(proc)
proc.stdout.close()
class WriteAllTest(testlib.TestCase):
func = staticmethod(mitogen.parent.write_all)
def make_proc(self):
args = [testlib.data_path('write_all_consumer.py')]
proc = subprocess.Popen(args, stdin=subprocess.PIPE)
mitogen.core.set_nonblock(proc.stdin.fileno())
return proc
ten_ms_chunk = (mitogen.core.b('x') * 65535)
def test_no_deadline(self):
proc = self.make_proc()
try:
self.func(proc.stdin.fileno(), self.ten_ms_chunk)
finally:
Popen__terminate(proc)
proc.stdin.close()
def test_deadline_exceeded_before_call(self):
proc = self.make_proc()
try:
self.assertRaises(mitogen.core.TimeoutError, (
lambda: self.func(proc.stdin.fileno(), self.ten_ms_chunk, 0)
))
finally:
Popen__terminate(proc)
proc.stdin.close()
def test_deadline_exceeded_during_call(self):
proc = self.make_proc()
try:
deadline = time.time() + 0.1 # 100ms deadline
self.assertRaises(mitogen.core.TimeoutError, (
lambda: self.func(proc.stdin.fileno(),
self.ten_ms_chunk * 100, # 1s of data
deadline)
))
finally:
Popen__terminate(proc)
proc.stdin.close()
class DisconnectTest(testlib.RouterMixin, testlib.TestCase):
def test_child_disconnected(self):
# Easy mode: process notices its own directly connected child is
@ -394,7 +276,7 @@ class DisconnectTest(testlib.RouterMixin, testlib.TestCase):
c2 = self.router.local()
# Let c1 call functions in c2.
self.router.stream_by_id(c1.context_id).auth_id = mitogen.context_id
self.router.stream_by_id(c1.context_id).protocol.auth_id = mitogen.context_id
c1.call(mitogen.parent.upgrade_router)
sync_recv = mitogen.core.Receiver(self.router)
@ -412,14 +294,14 @@ class DisconnectTest(testlib.RouterMixin, testlib.TestCase):
def test_far_sibling_disconnected(self):
# God mode: child of child notices child of child of parent has
# disconnected.
c1 = self.router.local()
c11 = self.router.local(via=c1)
c1 = self.router.local(name='c1')
c11 = self.router.local(name='c11', via=c1)
c2 = self.router.local()
c22 = self.router.local(via=c2)
c2 = self.router.local(name='c2')
c22 = self.router.local(name='c22', via=c2)
# Let c1 call functions in c2.
self.router.stream_by_id(c1.context_id).auth_id = mitogen.context_id
self.router.stream_by_id(c1.context_id).protocol.auth_id = mitogen.context_id
c11.call(mitogen.parent.upgrade_router)
sync_recv = mitogen.core.Receiver(self.router)

@ -42,8 +42,8 @@ class SockMixin(object):
self.l2_sock, self.r2_sock = socket.socketpair()
self.l2 = self.l2_sock.fileno()
self.r2 = self.r2_sock.fileno()
for fd in self.l1, self.r1, self.l2, self.r2:
mitogen.core.set_nonblock(fd)
for fp in self.l1, self.r1, self.l2, self.r2:
mitogen.core.set_nonblock(fp)
def fill(self, fd):
"""Make `fd` unwriteable."""
@ -354,17 +354,17 @@ class FileClosedMixin(PollerMixin, SockMixin):
class TtyHangupMixin(PollerMixin):
def test_tty_hangup_detected(self):
# bug in initial select.poll() implementation failed to detect POLLHUP.
master_fd, slave_fd = mitogen.parent.openpty()
master_fp, slave_fp = mitogen.parent.openpty()
try:
self.p.start_receive(master_fd)
self.p.start_receive(master_fp.fileno())
self.assertEquals([], list(self.p.poll(0)))
os.close(slave_fd)
slave_fd = None
self.assertEquals([master_fd], list(self.p.poll(0)))
slave_fp.close()
slave_fp = None
self.assertEquals([master_fp.fileno()], list(self.p.poll(0)))
finally:
if slave_fd is not None:
os.close(slave_fd)
os.close(master_fd)
if slave_fp is not None:
slave_fp.close()
master_fp.close()
class DistinctDataMixin(PollerMixin, SockMixin):

@ -105,7 +105,7 @@ class BrokenModulesTest(testlib.TestCase):
# unavailable. Should never happen in the real world.
stream = mock.Mock()
stream.sent_modules = set()
stream.protocol.sent_modules = set()
router = mock.Mock()
router.stream_by_id = lambda n: stream
@ -143,7 +143,7 @@ class BrokenModulesTest(testlib.TestCase):
import six_brokenpkg
stream = mock.Mock()
stream.sent_modules = set()
stream.protocol.sent_modules = set()
router = mock.Mock()
router.stream_by_id = lambda n: stream

@ -171,7 +171,7 @@ class CrashTest(testlib.BrokerMixin, testlib.TestCase):
self.assertTrue(sem.get().is_dead)
# Ensure it was logged.
expect = '_broker_main() crashed'
expect = 'broker crashed'
self.assertTrue(expect in log.stop())
self.broker.join()
@ -364,8 +364,8 @@ class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase):
# treated like a parent.
l1 = self.router.local()
l1s = self.router.stream_by_id(l1.context_id)
l1s.auth_id = mitogen.context_id
l1s.is_privileged = True
l1s.protocol.auth_id = mitogen.context_id
l1s.protocol.is_privileged = True
l2 = self.router.local()
e = self.assertRaises(mitogen.core.CallError,
@ -378,12 +378,21 @@ class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase):
class EgressIdsTest(testlib.RouterMixin, testlib.TestCase):
def test_egress_ids_populated(self):
# Ensure Stream.egress_ids is populated on message reception.
c1 = self.router.local()
stream = self.router.stream_by_id(c1.context_id)
self.assertEquals(set(), stream.egress_ids)
c1 = self.router.local(name='c1')
c2 = self.router.local(name='c2')
c1.call(time.sleep, 0)
self.assertEquals(set([mitogen.context_id]), stream.egress_ids)
c1s = self.router.stream_by_id(c1.context_id)
try:
c1.call(ping_context, c2)
except mitogen.core.CallError:
# Fails because siblings cant call funcs in each other, but this
# causes messages to be sent.
pass
self.assertEquals(c1s.protocol.egress_ids, set([
mitogen.context_id,
c2.context_id,
]))
if __name__ == '__main__':

@ -44,8 +44,8 @@ class ActivationTest(testlib.RouterMixin, testlib.TestCase):
self.assertTrue(isinstance(id_, int))
def test_sibling_cannot_activate_framework(self):
l1 = self.router.local()
l2 = self.router.local()
l1 = self.router.local(name='l1')
l2 = self.router.local(name='l2')
exc = self.assertRaises(mitogen.core.CallError,
lambda: l2.call(call_service_in, l1, MyService2.name(), 'get_id'))
self.assertTrue(mitogen.core.Router.refused_msg in exc.args[0])

@ -42,8 +42,6 @@ class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
class SshTest(testlib.DockerMixin, testlib.TestCase):
stream_class = mitogen.ssh.Stream
def test_debug_decoding(self):
# ensure filter_debug_logs() decodes the logged string.
capture = testlib.LogCapturer()
@ -93,7 +91,7 @@ class SshTest(testlib.DockerMixin, testlib.TestCase):
except mitogen.ssh.PasswordError:
e = sys.exc_info()[1]
self.assertEqual(e.args[0], self.stream_class.password_required_msg)
self.assertEqual(e.args[0], mitogen.ssh.password_required_msg)
def test_password_incorrect(self):
try:
@ -105,7 +103,7 @@ class SshTest(testlib.DockerMixin, testlib.TestCase):
except mitogen.ssh.PasswordError:
e = sys.exc_info()[1]
self.assertEqual(e.args[0], self.stream_class.password_incorrect_msg)
self.assertEqual(e.args[0], mitogen.ssh.password_incorrect_msg)
def test_password_specified(self):
context = self.docker_ssh(
@ -127,7 +125,7 @@ class SshTest(testlib.DockerMixin, testlib.TestCase):
except mitogen.ssh.PasswordError:
e = sys.exc_info()[1]
self.assertEqual(e.args[0], self.stream_class.password_required_msg)
self.assertEqual(e.args[0], mitogen.ssh.password_required_msg)
def test_pubkey_specified(self):
context = self.docker_ssh(
@ -150,7 +148,7 @@ class SshTest(testlib.DockerMixin, testlib.TestCase):
check_host_keys='enforce',
)
)
self.assertEquals(e.args[0], mitogen.ssh.Stream.hostkey_failed_msg)
self.assertEquals(e.args[0], mitogen.ssh.hostkey_failed_msg)
finally:
fp.close()
@ -184,8 +182,6 @@ class SshTest(testlib.DockerMixin, testlib.TestCase):
class BannerTest(testlib.DockerMixin, testlib.TestCase):
# Verify the ability to disambiguate random spam appearing in the SSHd's
# login banner from a legitimate password prompt.
stream_class = mitogen.ssh.Stream
def test_verbose_enabled(self):
context = self.docker_ssh(
username='mitogen__has_sudo',
@ -210,8 +206,6 @@ class StubPermissionDeniedTest(StubSshMixin, testlib.TestCase):
class StubCheckHostKeysTest(StubSshMixin, testlib.TestCase):
stream_class = mitogen.ssh.Stream
def test_check_host_keys_accept(self):
# required=true, host_key_checking=accept
context = self.stub_ssh(STUBSSH_MODE='ask', check_host_keys='accept')

@ -1,33 +0,0 @@
import unittest2
import mock
import mitogen.core
import testlib
class ReceiveOneTest(testlib.TestCase):
klass = mitogen.core.Stream
def test_corruption(self):
broker = mock.Mock()
router = mock.Mock()
stream = self.klass(router, 1)
junk = mitogen.core.b('x') * stream.HEADER_LEN
stream._input_buf = [junk]
stream._input_buf_len = len(junk)
capture = testlib.LogCapturer()
capture.start()
ret = stream._receive_one(broker)
#self.assertEquals(1, broker.stop_receive.mock_calls)
capture.stop()
self.assertFalse(ret)
self.assertTrue((self.klass.corrupt_msg % (junk,)) in capture.raw())
if __name__ == '__main__':
unittest2.main()

@ -1,9 +1,9 @@
import getpass
import os
import mitogen
import mitogen.lxd
import mitogen.parent
import mitogen.su
import unittest2
@ -21,12 +21,41 @@ class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
argv = eval(context.call(os.getenv, 'ORIGINAL_ARGV'))
return context, argv
def test_basic(self):
context, argv = self.run_su()
self.assertEquals(argv[1], 'root')
self.assertEquals(argv[2], '-c')
class SuTest(testlib.DockerMixin, testlib.TestCase):
def test_password_required(self):
ssh = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
)
e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.su(via=ssh)
)
self.assertTrue(mitogen.su.password_required_msg in str(e))
def test_password_incorrect(self):
ssh = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
)
e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.su(via=ssh, password='x')
)
self.assertTrue(mitogen.su.password_incorrect_msg in str(e))
def test_password_okay(self):
ssh = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
)
context = self.router.su(via=ssh, password='rootpassword')
self.assertEquals('root', context.call(getpass.getuser))
if __name__ == '__main__':
unittest2.main()

@ -2,8 +2,7 @@
import os
import mitogen
import mitogen.lxd
import mitogen.parent
import mitogen.sudo
import unittest2
@ -79,7 +78,7 @@ class NonEnglishPromptTest(testlib.DockerMixin, testlib.TestCase):
e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.sudo(via=ssh)
)
self.assertTrue(mitogen.sudo.Stream.password_required_msg in str(e))
self.assertTrue(mitogen.sudo.password_required_msg in str(e))
def test_password_incorrect(self):
ssh = self.docker_ssh(
@ -91,7 +90,7 @@ class NonEnglishPromptTest(testlib.DockerMixin, testlib.TestCase):
e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.sudo(via=ssh, password='x')
)
self.assertTrue(mitogen.sudo.Stream.password_incorrect_msg in str(e))
self.assertTrue(mitogen.sudo.password_incorrect_msg in str(e))
def test_password_okay(self):
ssh = self.docker_ssh(
@ -103,7 +102,7 @@ class NonEnglishPromptTest(testlib.DockerMixin, testlib.TestCase):
e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.sudo(via=ssh, password='rootpassword')
)
self.assertTrue(mitogen.sudo.Stream.password_incorrect_msg in str(e))
self.assertTrue(mitogen.sudo.password_incorrect_msg in str(e))
if __name__ == '__main__':

@ -67,12 +67,12 @@ class ListenerTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.unix.Listener
def test_constructor_basic(self):
listener = self.klass(router=self.router)
listener = self.klass.build_stream(router=self.router)
capture = testlib.LogCapturer()
capture.start()
try:
self.assertFalse(mitogen.unix.is_path_dead(listener.path))
os.unlink(listener.path)
self.assertFalse(mitogen.unix.is_path_dead(listener.protocol.path))
os.unlink(listener.protocol.path)
# ensure we catch 0 byte read error log message
self.broker.shutdown()
self.broker.join()
@ -96,12 +96,14 @@ class ClientTest(testlib.TestCase):
def _test_simple_client(self, path):
router, context = self._try_connect(path)
try:
self.assertEquals(0, context.context_id)
self.assertEquals(1, mitogen.context_id)
self.assertEquals(0, mitogen.parent_id)
resp = context.call_service(service_name=MyService, method_name='ping')
self.assertEquals(mitogen.context_id, resp['src_id'])
self.assertEquals(0, resp['auth_id'])
finally:
router.broker.shutdown()
router.broker.join()
os.unlink(path)
@ -112,7 +114,7 @@ class ClientTest(testlib.TestCase):
latch = mitogen.core.Latch()
try:
try:
listener = cls.klass(path=path, router=router)
listener = cls.klass.build_stream(path=path, router=router)
pool = mitogen.service.Pool(router=router, services=[
MyService(latch=latch, router=router),
])

Loading…
Cancel
Save