Refactor Stream, introduce quasi-asynchronous connect, much more

Split Stream into many, many classes

  * mitogen.parent.Connection: Handles connection setup logic only.
    * Maintain references to stdout and stderr streams.
    * Manages TimerList timer to cancel connection attempt after
      deadline
    * Blocking setup code replaced by async equivalents running on the
      broker

  * mitogen.parent.Options: Tracks connection-specific options. This
    keeps the connection class small, but more importantly, it is
    generic to the future desire to build and execute command lines
    without starting a full connection.

  * mitogen.core.Protocol: Handles program behaviour relating to events
    on a stream. Protocol performs no IO of its own, instead deferring
    it to Stream and Side. This makes testing much easier, and means
    libssh can reimplement Stream and Side to reuse MitogenProtocol

  * mitogen.core.MitogenProtocol: Guts of the old Mitogen stream
    implementtion

  * mitogen.core.BufferedWriter: Guts of the old Mitogen buffered
    transmit implementation, made generic

  * mitogen.core.DelineatedProtocol: Guts of the old IoLogger, knows how
    to split up input and pass it on to a
    on_line_received()/on_partial_line_received() callback.

  * mitogen.parent.BootstrapProtocol: Asynchronous equivalent of the old
    blocking connect code. Waits for various prompts (MITO001 etc) and
    writes the bootstrap using a BufferedWriter. On success, switches
    the stream to MitogenProtocol.

  * mitogen.core.Message: move encoding parts of MitogenProtocol out to
    Message (where it belongs) and write a bunch of new tests for
    pickling.

  * The bizarre Stream.construct() is gone now, Option.__init__ is its
    own constructor. Should fix many LGTM errors.

* Update all connection methods:  Every connection method is updated to
  use async logic, defining protocols as required to handle interactive
  prompts like in SSH or su. Add new real integration tests for at least
  doas and su.

* Eliminate manual fd management: File descriptors are trapped in file
  objects at their point of origin, and Side is updated to use file
  objects rather than raw descriptors. This eliminates a whole class of
  bugs where unrelated FDs could be closed by the wrong component. Now
  an FD's open/closed status is fused to it everywhere in the library.

* Halve file descriptor usage: now FD open/close state is tracked by
  its file object, we don't need to duplicate FDs everywhere so that
  receive/transmit side can be closed independently. Instead both sides
  back on to the same file object. Closes #26, Closes #470.

* Remove most uses of dup/dup2: Closes #256. File descriptors are
  trapped in a common file object and shared among classes. The
  remaining few uses for dup/dup2 are as close to minimal as possible.

* Introduce mitogen.parent.Process: uniform interface for subprocesses
  created either via mitogen.fork or the subprocess module. Remove all
  the crap where we steal a pid from subprocess guts. Now we use
  subprocess to manage its processes as it should be. Closes #169 by
  using the new Timers facility to poll for a slow-to-exit subprocess.

* Fix su password race: Closes #363. DelineatedProtocol naturally
  retries partially received lines, preventing the cause of the original
  race.

* Delete old blocking IO utility functions
  iter_read()/write_all()/discard_until().

Closes #26
Closes #147
Closes #169
Closes #256
Closes #363
Closes #419
Closes #470
pull/607/head
David Wilson 6 years ago
parent 37beb3a5c5
commit 8d1b01d8ef

@ -1374,10 +1374,12 @@ class Importer(object):
if not present: if not present:
funcs = self._callbacks.get(fullname) funcs = self._callbacks.get(fullname)
if funcs is not None: if funcs is not None:
_v and LOG.debug('_request_module(%r): in flight', fullname) _v and LOG.debug('%s: existing request for %s in flight',
self, fullname)
funcs.append(callback) funcs.append(callback)
else: else:
_v and LOG.debug('_request_module(%r): new request', fullname) _v and LOG.debug('%s: requesting %s from parent',
self, fullname)
self._callbacks[fullname] = [callback] self._callbacks[fullname] = [callback]
self._context.send( self._context.send(
Message(data=b(fullname), handle=GET_MODULE) Message(data=b(fullname), handle=GET_MODULE)
@ -1493,6 +1495,80 @@ class LogHandler(logging.Handler):
self.local.in_emit = False self.local.in_emit = False
class Stream(object):
#: A :class:`Side` representing the stream's receive file descriptor.
receive_side = None
#: A :class:`Side` representing the stream's transmit file descriptor.
transmit_side = None
#: A :class:`Protocol` representing the protocol active on the stream.
protocol = None
#: In parents, the :class:`mitogen.parent.Connection` instance.
conn = None
name = u'default'
def set_protocol(self, protocol):
self.protocol = protocol
self.protocol.stream = self
def accept(self, rfp, wfp):
self.receive_side = Side(self, rfp)
self.transmit_side = Side(self, wfp)
def __repr__(self):
return "<Stream %s>" % (self.name,)
def on_receive(self, broker):
"""
Called by :class:`Broker` when the stream's :attr:`receive_side` has
been marked readable using :meth:`Broker.start_receive` and the broker
has detected the associated file descriptor is ready for reading.
Subclasses must implement this if :meth:`Broker.start_receive` is ever
called on them, and the method must call :meth:`on_disconect` if
reading produces an empty string.
"""
buf = self.receive_side.read(self.protocol.read_size)
if not buf:
LOG.debug('%r: empty read, disconnecting', self)
return self.on_disconnect(broker)
self.protocol.on_receive(broker, buf)
def on_transmit(self, broker):
"""
Called by :class:`Broker` when the stream's :attr:`transmit_side`
has been marked writeable using :meth:`Broker._start_transmit` and
the broker has detected the associated file descriptor is ready for
writing.
Subclasses must implement this if :meth:`Broker._start_transmit` is
ever called on them.
"""
self.protocol.on_transmit(broker)
def on_shutdown(self, broker):
"""
Called by :meth:`Broker.shutdown` to allow the stream time to
gracefully shutdown. The base implementation simply called
:meth:`on_disconnect`.
"""
fire(self, 'shutdown')
self.protocol.on_shutdown(broker)
def on_disconnect(self, broker):
"""
Called by :class:`Broker` to force disconnect the stream. The base
implementation simply closes :attr:`receive_side` and
:attr:`transmit_side` and unregisters the stream from the broker.
"""
fire(self, 'disconnect')
self.protocol.on_disconnect(broker)
class Protocol(object): class Protocol(object):
""" """
Implement the program behaviour associated with activity on a Implement the program behaviour associated with activity on a
@ -1506,11 +1582,13 @@ class Protocol(object):
provided by :class:`Stream` and :class:`Side`, allowing the underlying IO provided by :class:`Stream` and :class:`Side`, allowing the underlying IO
implementation to be replaced without modifying behavioural logic. implementation to be replaced without modifying behavioural logic.
""" """
stream_class = Stream
stream = None stream = None
read_size = CHUNK_SIZE
@classmethod @classmethod
def build_stream(cls, *args, **kwargs): def build_stream(cls, *args, **kwargs):
stream = Stream() stream = cls.stream_class()
stream.set_protocol(cls(*args, **kwargs)) stream.set_protocol(cls(*args, **kwargs))
return stream return stream
@ -1547,24 +1625,30 @@ class DelimitedProtocol(Protocol):
increasingly complete message. When a complete message is finally received, increasingly complete message. When a complete message is finally received,
:meth:`on_line_received` will be called once for it before the buffer is :meth:`on_line_received` will be called once for it before the buffer is
discarded. discarded.
If :func:`on_line_received` returns :data:`False`, remaining data is passed
unprocessed to the stream's current protocol's :meth:`on_receive`. This
allows switching from line-oriented to binary while the input buffer
contains both kinds of data.
""" """
#: The delimiter. Defaults to newline. #: The delimiter. Defaults to newline.
delimiter = b('\n') delimiter = b('\n')
_trailer = b('') _trailer = b('')
def on_receive(self, broker): def on_receive(self, broker, buf):
IOLOG.debug('%r.on_receive()', self) IOLOG.debug('%r.on_receive()', self)
buf = self.stream.receive_side.read() self._trailer, cont = mitogen.core.iter_split(
if not buf:
return self.stream.on_disconnect(broker)
self._trailer = mitogen.core.iter_split(
buf=self._trailer + buf, buf=self._trailer + buf,
delim=self.delimiter, delim=self.delimiter,
func=self.on_line_received, func=self.on_line_received,
) )
if self._trailer: if self._trailer:
if cont:
self.on_partial_line_received(self._trailer) self.on_partial_line_received(self._trailer)
else:
assert self.stream.protocol is not self
self.stream.protocol.on_receive(broker, self._trailer)
def on_line_received(self, line): def on_line_received(self, line):
pass pass
@ -1656,23 +1740,24 @@ class Side(object):
enabled using :func:`fcntl.fcntl`. enabled using :func:`fcntl.fcntl`.
""" """
_fork_refs = weakref.WeakValueDictionary() _fork_refs = weakref.WeakValueDictionary()
closed = False
def __init__(self, stream, fd, cloexec=True, keep_alive=True, blocking=False): def __init__(self, stream, fp, cloexec=True, keep_alive=True, blocking=False):
#: The :class:`Stream` for which this is a read or write side. #: The :class:`Stream` for which this is a read or write side.
self.stream = stream self.stream = stream
#: Integer file descriptor to perform IO on, or :data:`None` if #: Integer file descriptor to perform IO on, or :data:`None` if
#: :meth:`close` has been called. #: :meth:`close` has been called.
self.fd = fd self.fp = fp
self.closed = False self.fd = fp.fileno()
#: If :data:`True`, causes presence of this side in #: If :data:`True`, causes presence of this side in
#: :class:`Broker`'s active reader set to defer shutdown until the #: :class:`Broker`'s active reader set to defer shutdown until the
#: side is disconnected. #: side is disconnected.
self.keep_alive = keep_alive self.keep_alive = keep_alive
self._fork_refs[id(self)] = self self._fork_refs[id(self)] = self
if cloexec: if cloexec:
set_cloexec(fd) set_cloexec(self.fd)
if not blocking: if not blocking:
set_nonblock(fd) set_nonblock(self.fd)
def __repr__(self): def __repr__(self):
return '<Side of %r fd %s>' % (self.stream, self.fd) return '<Side of %r fd %s>' % (self.stream, self.fd)
@ -1692,7 +1777,7 @@ class Side(object):
_vv and IOLOG.debug('%r.close()', self) _vv and IOLOG.debug('%r.close()', self)
if not self.closed: if not self.closed:
self.closed = True self.closed = True
os.close(self.fd) self.fp.close()
def read(self, n=CHUNK_SIZE): def read(self, n=CHUNK_SIZE):
""" """
@ -1728,9 +1813,8 @@ class Side(object):
Number of bytes written, or :data:`None` if disconnection was Number of bytes written, or :data:`None` if disconnection was
detected. detected.
""" """
if self.closed or self.fd is None: if self.closed:
# Refuse to touch the handle after closed, it may have been reused # Don't touch the handle after close, it may be reused elsewhere.
# by another thread.
return None return None
written, disconnected = io_op(os.write, self.fd, s) written, disconnected = io_op(os.write, self.fd, s)
@ -1740,67 +1824,10 @@ class Side(object):
return written return written
class BasicStream(object): class MitogenProtocol(Protocol):
#: A :class:`Side` representing the stream's receive file descriptor.
receive_side = None
#: A :class:`Side` representing the stream's transmit file descriptor.
transmit_side = None
def on_receive(self, broker):
"""
Called by :class:`Broker` when the stream's :attr:`receive_side` has
been marked readable using :meth:`Broker.start_receive` and the broker
has detected the associated file descriptor is ready for reading.
Subclasses must implement this if :meth:`Broker.start_receive` is ever
called on them, and the method must call :meth:`on_disconect` if
reading produces an empty string.
"""
pass
def on_transmit(self, broker):
"""
Called by :class:`Broker` when the stream's :attr:`transmit_side`
has been marked writeable using :meth:`Broker._start_transmit` and
the broker has detected the associated file descriptor is ready for
writing.
Subclasses must implement this if :meth:`Broker._start_transmit` is
ever called on them.
"""
pass
def on_shutdown(self, broker):
""" """
Called by :meth:`Broker.shutdown` to allow the stream time to :class:`Protocol` implementing mitogen's :ref:`stream protocol
gracefully shutdown. The base implementation simply called <stream-protocol>`.
:meth:`on_disconnect`.
"""
_v and LOG.debug('%r.on_shutdown()', self)
fire(self, 'shutdown')
self.on_disconnect(broker)
def on_disconnect(self, broker):
"""
Called by :class:`Broker` to force disconnect the stream. The base
implementation simply closes :attr:`receive_side` and
:attr:`transmit_side` and unregisters the stream from the broker.
"""
LOG.debug('%r.on_disconnect()', self)
if self.receive_side:
broker.stop_receive(self)
self.receive_side.close()
if self.transmit_side:
broker._stop_transmit(self)
self.transmit_side.close()
fire(self, 'disconnect')
class Stream(BasicStream):
"""
:class:`BasicStream` subclass implementing mitogen's :ref:`stream
protocol <stream-protocol>`.
""" """
#: If not :data:`None`, :class:`Router` stamps this into #: If not :data:`None`, :class:`Router` stamps this into
#: :attr:`Message.auth_id` of every message received on this stream. #: :attr:`Message.auth_id` of every message received on this stream.
@ -1811,24 +1838,24 @@ class Stream(BasicStream):
#: :data:`mitogen.parent_ids`. #: :data:`mitogen.parent_ids`.
is_privileged = False is_privileged = False
def __init__(self, router, remote_id, **kwargs): def __init__(self, router, remote_id):
self._router = router self._router = router
self.remote_id = remote_id self.remote_id = remote_id
self.name = u'default'
self.sent_modules = set(['mitogen', 'mitogen.core']) self.sent_modules = set(['mitogen', 'mitogen.core'])
self.construct(**kwargs)
self._input_buf = collections.deque() self._input_buf = collections.deque()
self._output_buf = collections.deque()
self._input_buf_len = 0 self._input_buf_len = 0
self._output_buf_len = 0 self._writer = BufferedWriter(router.broker, self)
#: Routing records the dst_id of every message arriving from this #: Routing records the dst_id of every message arriving from this
#: stream. Any arriving DEL_ROUTE is rebroadcast for any such ID. #: stream. Any arriving DEL_ROUTE is rebroadcast for any such ID.
self.egress_ids = set() self.egress_ids = set()
def construct(self): def on_receive(self, broker, buf):
pass """
Handle the next complete message on the stream. Raise
def _internal_receive(self, broker, buf): :class:`StreamError` on failure.
"""
_vv and IOLOG.debug('%r.on_receive()', self)
if self._input_buf and self._input_buf_len < 128: if self._input_buf and self._input_buf_len < 128:
self._input_buf[0] += buf self._input_buf[0] += buf
else: else:
@ -1838,60 +1865,45 @@ class Stream(BasicStream):
while self._receive_one(broker): while self._receive_one(broker):
pass pass
def on_receive(self, broker):
"""Handle the next complete message on the stream. Raise
:class:`StreamError` on failure."""
_vv and IOLOG.debug('%r.on_receive()', self)
buf = self.receive_side.read()
if not buf:
return self.on_disconnect(broker)
self._internal_receive(broker, buf)
HEADER_FMT = '>hLLLLLL'
HEADER_LEN = struct.calcsize(HEADER_FMT)
HEADER_MAGIC = 0x4d49 # 'MI'
corrupt_msg = ( corrupt_msg = (
'Corruption detected: frame signature incorrect. This likely means ' '%s: Corruption detected: frame signature incorrect. This likely means'
'some external process is interfering with the connection. Received:' ' some external process is interfering with the connection. Received:'
'\n\n' '\n\n'
'%r' '%r'
) )
def _receive_one(self, broker): def _receive_one(self, broker):
if self._input_buf_len < self.HEADER_LEN: if self._input_buf_len < Message.HEADER_LEN:
return False return False
msg = Message() msg = Message()
msg.router = self._router msg.router = self._router
(magic, msg.dst_id, msg.src_id, msg.auth_id, (magic, msg.dst_id, msg.src_id, msg.auth_id,
msg.handle, msg.reply_to, msg_len) = struct.unpack( msg.handle, msg.reply_to, msg_len) = struct.unpack(
self.HEADER_FMT, Message.HEADER_FMT,
self._input_buf[0][:self.HEADER_LEN], self._input_buf[0][:Message.HEADER_LEN],
) )
if magic != self.HEADER_MAGIC: if magic != Message.HEADER_MAGIC:
LOG.error(self.corrupt_msg, self._input_buf[0][:2048]) LOG.error(self.corrupt_msg, self.stream.name, self._input_buf[0][:2048])
self.on_disconnect(broker) self.stream.on_disconnect(broker)
return False return False
if msg_len > self._router.max_message_size: if msg_len > self._router.max_message_size:
LOG.error('Maximum message size exceeded (got %d, max %d)', LOG.error('Maximum message size exceeded (got %d, max %d)',
msg_len, self._router.max_message_size) msg_len, self._router.max_message_size)
self.on_disconnect(broker) self.stream.on_disconnect(broker)
return False return False
total_len = msg_len + self.HEADER_LEN total_len = msg_len + Message.HEADER_LEN
if self._input_buf_len < total_len: if self._input_buf_len < total_len:
_vv and IOLOG.debug( _vv and IOLOG.debug(
'%r: Input too short (want %d, got %d)', '%r: Input too short (want %d, got %d)',
self, msg_len, self._input_buf_len - self.HEADER_LEN self, msg_len, self._input_buf_len - Message.HEADER_LEN
) )
return False return False
start = self.HEADER_LEN start = Message.HEADER_LEN
prev_start = start prev_start = start
remain = total_len remain = total_len
bits = [] bits = []
@ -1906,7 +1918,7 @@ class Stream(BasicStream):
msg.data = b('').join(bits) msg.data = b('').join(bits)
self._input_buf.appendleft(buf[prev_start+len(bit):]) self._input_buf.appendleft(buf[prev_start+len(bit):])
self._input_buf_len -= total_len self._input_buf_len -= total_len
self._router._async_route(msg, self) self._router._async_route(msg, self.stream)
return True return True
def pending_bytes(self): def pending_bytes(self):
@ -1918,50 +1930,16 @@ class Stream(BasicStream):
For an accurate result, this method should be called from the Broker For an accurate result, this method should be called from the Broker
thread, for example by using :meth:`Broker.defer_sync`. thread, for example by using :meth:`Broker.defer_sync`.
""" """
return self._output_buf_len return self._writer._len
def on_transmit(self, broker): def on_transmit(self, broker):
"""Transmit buffered messages.""" """Transmit buffered messages."""
_vv and IOLOG.debug('%r.on_transmit()', self) _vv and IOLOG.debug('%r.on_transmit()', self)
self._writer.on_transmit(broker)
if self._output_buf:
buf = self._output_buf.popleft()
written = self.transmit_side.write(buf)
if not written:
_v and LOG.debug('%r.on_transmit(): disconnection detected', self)
self.on_disconnect(broker)
return
elif written != len(buf):
self._output_buf.appendleft(BufferType(buf, written))
_vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written)
self._output_buf_len -= written
if not self._output_buf:
broker._stop_transmit(self)
def _send(self, msg): def _send(self, msg):
_vv and IOLOG.debug('%r._send(%r)', self, msg) _vv and IOLOG.debug('%r._send(%r)', self, msg)
pkt = struct.pack(self.HEADER_FMT, self.HEADER_MAGIC, msg.dst_id, self._writer.write(msg.pack())
msg.src_id, msg.auth_id, msg.handle,
msg.reply_to or 0, len(msg.data)) + msg.data
if not self._output_buf_len:
# Modifying epoll/Kqueue state is expensive, as are needless broker
# loops. Rather than wait for writeability, just write immediately,
# and fall back to the broker loop on error or full buffer.
try:
n = self.transmit_side.write(pkt)
if n:
if n == len(pkt):
return
pkt = pkt[n:]
except OSError:
pass
self._router.broker._start_transmit(self)
self._output_buf.append(pkt)
self._output_buf_len += len(pkt)
def send(self, msg): def send(self, msg):
"""Send `data` to `handle`, and tell the broker we have output. May """Send `data` to `handle`, and tell the broker we have output. May
@ -1969,17 +1947,8 @@ class Stream(BasicStream):
self._router.broker.defer(self._send, msg) self._router.broker.defer(self._send, msg)
def on_shutdown(self, broker): def on_shutdown(self, broker):
"""Override BasicStream behaviour of immediately disconnecting.""" """Disable :class:`Protocol` immediate disconnect behaviour."""
_v and LOG.debug('%r.on_shutdown(%r)', self, broker) _v and LOG.debug('%r: shutting down', self)
def accept(self, rfd, wfd):
# TODO: what is this os.dup for?
self.receive_side = Side(self, os.dup(rfd))
self.transmit_side = Side(self, os.dup(wfd))
def __repr__(self):
cls = type(self)
return "%s.%s('%s')" % (cls.__module__, cls.__name__, self.name)
class Context(object): class Context(object):
@ -2005,18 +1974,17 @@ class Context(object):
:param str name: :param str name:
Context name. Context name.
""" """
name = None
remote_name = None remote_name = None
def __init__(self, router, context_id, name=None): def __init__(self, router, context_id, name=None):
self.router = router self.router = router
self.context_id = context_id self.context_id = context_id
self.name = name if name:
self.name = to_text(name)
def __reduce__(self): def __reduce__(self):
name = self.name return _unpickle_context, (self.context_id, self.name)
if name and not isinstance(name, UnicodeType):
name = UnicodeType(name, 'utf-8')
return _unpickle_context, (self.context_id, name)
def on_disconnect(self): def on_disconnect(self):
_v and LOG.debug('%r: disconnecting', self) _v and LOG.debug('%r: disconnecting', self)
@ -2161,7 +2129,7 @@ class Poller(object):
self._wfds = {} self._wfds = {}
def __repr__(self): def __repr__(self):
return '%s(%#x)' % (type(self).__name__, id(self)) return '%s' % (type(self).__name__,)
def _update(self, fd): def _update(self, fd):
""" """
@ -2509,7 +2477,7 @@ class Latch(object):
) )
class Waker(BasicStream): class Waker(Protocol):
""" """
:class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_. :class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_.
Used to wake the multiplexer when another thread needs to modify its state Used to wake the multiplexer when another thread needs to modify its state
@ -2517,17 +2485,20 @@ class Waker(BasicStream):
.. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html .. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html
""" """
read_size = 1
broker_ident = None broker_ident = None
@classmethod
def build_stream(cls, broker):
stream = super(Waker, cls).build_stream(broker)
stream.accept(*pipe())
return stream
def __init__(self, broker): def __init__(self, broker):
self._broker = broker self._broker = broker
self._lock = threading.Lock() self._lock = threading.Lock()
self._deferred = [] self._deferred = []
rfd, wfd = os.pipe()
self.receive_side = Side(self, rfd)
self.transmit_side = Side(self, wfd)
def __repr__(self): def __repr__(self):
return 'Waker(fd=%r/%r)' % ( return 'Waker(fd=%r/%r)' % (
self.stream.receive_side and self.stream.receive_side.fd, self.stream.receive_side and self.stream.receive_side.fd,
@ -2545,7 +2516,7 @@ class Waker(BasicStream):
finally: finally:
self._lock.release() self._lock.release()
def on_receive(self, broker): def on_receive(self, broker, buf):
""" """
Drain the pipe and fire callbacks. Since :attr:`_deferred` is Drain the pipe and fire callbacks. Since :attr:`_deferred` is
synchronized, :meth:`defer` and :meth:`on_receive` can conspire to synchronized, :meth:`defer` and :meth:`on_receive` can conspire to
@ -2554,7 +2525,6 @@ class Waker(BasicStream):
_vv and IOLOG.debug('%r.on_receive()', self) _vv and IOLOG.debug('%r.on_receive()', self)
self._lock.acquire() self._lock.acquire()
try: try:
self.receive_side.read(1)
deferred = self._deferred deferred = self._deferred
self._deferred = [] self._deferred = []
finally: finally:
@ -2566,7 +2536,7 @@ class Waker(BasicStream):
except Exception: except Exception:
LOG.exception('defer() crashed: %r(*%r, **%r)', LOG.exception('defer() crashed: %r(*%r, **%r)',
func, args, kwargs) func, args, kwargs)
self._broker.shutdown() broker.shutdown()
def _wake(self): def _wake(self):
""" """
@ -2574,7 +2544,7 @@ class Waker(BasicStream):
teardown, the FD may already be closed, so ignore EBADF. teardown, the FD may already be closed, so ignore EBADF.
""" """
try: try:
self.transmit_side.write(b(' ')) self.stream.transmit_side.write(b(' '))
except OSError: except OSError:
e = sys.exc_info()[1] e = sys.exc_info()[1]
if e.args[0] != errno.EBADF: if e.args[0] != errno.EBADF:
@ -2601,7 +2571,8 @@ class Waker(BasicStream):
if self._broker._exitted: if self._broker._exitted:
raise Error(self.broker_shutdown_msg) raise Error(self.broker_shutdown_msg)
_vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd) _vv and IOLOG.debug('%r.defer() [fd=%r]', self,
self.stream.transmit_side.fd)
self._lock.acquire() self._lock.acquire()
try: try:
if not self._deferred: if not self._deferred:
@ -2611,53 +2582,45 @@ class Waker(BasicStream):
self._lock.release() self._lock.release()
class IoLogger(BasicStream): class IoLoggerProtocol(DelimitedProtocol):
""" """
:class:`BasicStream` subclass that sets up redirection of a standard Handle redirection of standard IO into the :mod:`logging` package.
UNIX file descriptor back into the Python :mod:`logging` package.
""" """
_trailer = u'' @classmethod
def build_stream(cls, name, dest_fd):
def __init__(self, broker, name, dest_fd): """
self._broker = broker Even though the descriptor `dest_fd` will hold the opposite end of the
self._name = name socket open, we must keep a separate dup() of it (i.e. wsock) in case
self._rsock, self._wsock = socket.socketpair() some code decides to overwrite `dest_fd` later, which would thus break
os.dup2(self._wsock.fileno(), dest_fd) :meth:`on_shutdown`.
set_cloexec(self._wsock.fileno()) """
rsock, wsock = socket.socketpair()
os.dup2(wsock.fileno(), dest_fd)
stream = super(IoLoggerProtocol, cls).build_stream(name)
stream.name = name
stream.accept(rsock, wsock)
return stream
def __init__(self, name):
self._log = logging.getLogger(name) self._log = logging.getLogger(name)
# #453: prevent accidental log initialization in a child creating a # #453: prevent accidental log initialization in a child creating a
# feedback loop. # feedback loop.
self._log.propagate = False self._log.propagate = False
self._log.handlers = logging.getLogger().handlers[:] self._log.handlers = logging.getLogger().handlers[:]
self.receive_side = Side(self, self._rsock.fileno())
self.transmit_side = Side(self, dest_fd, cloexec=False, blocking=True)
self._broker.start_receive(self)
def __repr__(self):
return '<IoLogger %s>' % (self._name,)
def on_shutdown(self, broker): def on_shutdown(self, broker):
"""Shut down the write end of the logging socket.""" """Shut down the write end of the logging socket."""
_v and LOG.debug('%r: shutting down', self) _v and LOG.debug('%r: shutting down', self)
if not IS_WSL: if not IS_WSL:
# #333: WSL generates invalid readiness indication on shutdown() # #333: WSL generates invalid readiness indication on shutdown().
self._wsock.shutdown(socket.SHUT_WR) # This modifies the *kernel object* inherited by children, causing
self._wsock.close() # EPIPE on subsequent writes to any dupped FD in any process. The
self.transmit_side.close() # read side can then drain completely of prior buffered data.
self.stream.transmit_side.fp.shutdown(socket.SHUT_WR)
def on_receive(self, broker): self.stream.transmit_side.close()
_vv and IOLOG.debug('%r.on_receive()', self)
buf = self.receive_side.read()
if not buf:
return self.on_disconnect(broker)
self._trailer = iter_split( def on_line_received(self, line):
buf=self._trailer + buf.decode('latin1'), self._log.info('%s', line.decode('utf-8', 'replace'))
delim='\n',
func=lambda s: self._log.info('%s', s)
)
class Router(object): class Router(object):
@ -3008,11 +2971,11 @@ class Router(object):
self, msg.src_id, in_stream, expect, msg) self, msg.src_id, in_stream, expect, msg)
return return
if in_stream.auth_id is not None: if in_stream.protocol.auth_id is not None:
msg.auth_id = in_stream.auth_id msg.auth_id = in_stream.protocol.auth_id
# Maintain a set of IDs the source ever communicated with. # Maintain a set of IDs the source ever communicated with.
in_stream.egress_ids.add(msg.dst_id) in_stream.protocol.egress_ids.add(msg.dst_id)
if msg.dst_id == mitogen.context_id: if msg.dst_id == mitogen.context_id:
return self._invoke(msg, in_stream) return self._invoke(msg, in_stream)
@ -3027,12 +2990,13 @@ class Router(object):
return return
if in_stream and self.unidirectional and not \ if in_stream and self.unidirectional and not \
(in_stream.is_privileged or out_stream.is_privileged): (in_stream.protocol.is_privileged or
out_stream.protocol.is_privileged):
self._maybe_send_dead(msg, self.unidirectional_msg, self._maybe_send_dead(msg, self.unidirectional_msg,
in_stream.remote_id, out_stream.remote_id) in_stream.protocol.remote_id, out_stream.protocol.remote_id)
return return
out_stream._send(msg) out_stream.protocol._send(msg)
def route(self, msg): def route(self, msg):
""" """
@ -3074,11 +3038,11 @@ class Broker(object):
def __init__(self, poller_class=None, activate_compat=True): def __init__(self, poller_class=None, activate_compat=True):
self._alive = True self._alive = True
self._exitted = False self._exitted = False
self._waker = Waker(self) self._waker = Waker.build_stream(self)
#: Arrange for `func(\*args, \**kwargs)` to be executed on the broker #: Arrange for `func(\*args, \**kwargs)` to be executed on the broker
#: thread, or immediately if the current thread is the broker thread. #: thread, or immediately if the current thread is the broker thread.
#: Safe to call from any thread. #: Safe to call from any thread.
self.defer = self._waker.defer self.defer = self._waker.protocol.defer
self.poller = self.poller_class() self.poller = self.poller_class()
self.poller.start_receive( self.poller.start_receive(
self._waker.receive_side.fd, self._waker.receive_side.fd,
@ -3112,7 +3076,7 @@ class Broker(object):
""" """
_vv and IOLOG.debug('%r.start_receive(%r)', self, stream) _vv and IOLOG.debug('%r.start_receive(%r)', self, stream)
side = stream.receive_side side = stream.receive_side
assert side and side.fd is not None assert side and not side.closed
self.defer(self.poller.start_receive, self.defer(self.poller.start_receive,
side.fd, (side, stream.on_receive)) side.fd, (side, stream.on_receive))
@ -3133,7 +3097,7 @@ class Broker(object):
""" """
_vv and IOLOG.debug('%r._start_transmit(%r)', self, stream) _vv and IOLOG.debug('%r._start_transmit(%r)', self, stream)
side = stream.transmit_side side = stream.transmit_side
assert side and side.fd is not None assert side and not side.closed
self.poller.start_transmit(side.fd, (side, stream.on_transmit)) self.poller.start_transmit(side.fd, (side, stream.on_transmit))
def _stop_transmit(self, stream): def _stop_transmit(self, stream):
@ -3246,7 +3210,7 @@ class Broker(object):
:meth:`shutdown` is called. :meth:`shutdown` is called.
""" """
# For Python 2.4, no way to retrieve ident except on thread. # For Python 2.4, no way to retrieve ident except on thread.
self._waker.broker_ident = thread.get_ident() self._waker.protocol.broker_ident = thread.get_ident()
try: try:
while self._alive: while self._alive:
self._loop_once() self._loop_once()
@ -3486,18 +3450,16 @@ class ExternalContext(object):
else: else:
self.parent = Context(self.router, parent_id, 'parent') self.parent = Context(self.router, parent_id, 'parent')
in_fd = self.config.get('in_fd', 100) in_fp = os.fdopen(os.dup(self.config.get('in_fd', 100)), 'rb', 0)
out_fd = self.config.get('out_fd', 1) out_fp = os.fdopen(os.dup(self.config.get('out_fd', 1)), 'wb', 0)
self.stream = Stream(self.router, parent_id) self.stream = MitogenProtocol.build_stream(self.router, parent_id)
self.stream.accept(in_fp, out_fp)
self.stream.name = 'parent' self.stream.name = 'parent'
self.stream.accept(in_fd, out_fd)
self.stream.receive_side.keep_alive = False self.stream.receive_side.keep_alive = False
listen(self.stream, 'disconnect', self._on_parent_disconnect) listen(self.stream, 'disconnect', self._on_parent_disconnect)
listen(self.broker, 'exit', self._on_broker_exit) listen(self.broker, 'exit', self._on_broker_exit)
os.close(in_fd)
def _reap_first_stage(self): def _reap_first_stage(self):
try: try:
os.wait() # Reap first stage. os.wait() # Reap first stage.
@ -3584,7 +3546,7 @@ class ExternalContext(object):
try: try:
if os.isatty(2): if os.isatty(2):
self.reserve_tty_fp = os.fdopen(os.dup(2), 'r+b', 0) self.reserve_tty_fp = os.fdopen(os.dup(2), 'r+b', 0)
set_cloexec(self.reserve_tty_fp) set_cloexec(self.reserve_tty_fp.fileno())
except OSError: except OSError:
pass pass
@ -3600,8 +3562,12 @@ class ExternalContext(object):
sys.stdout.close() sys.stdout.close()
self._nullify_stdio() self._nullify_stdio()
self.stdout_log = IoLogger(self.broker, 'stdout', 1) self.loggers = []
self.stderr_log = IoLogger(self.broker, 'stderr', 2) for name, fd in (('stdout', 1), ('stderr', 2)):
log = IoLoggerProtocol.build_stream(name, fd)
self.broker.start_receive(log)
self.loggers.append(log)
# Reopen with line buffering. # Reopen with line buffering.
sys.stdout = os.fdopen(1, 'w', 1) sys.stdout = os.fdopen(1, 'w', 1)
@ -3621,11 +3587,11 @@ class ExternalContext(object):
self.dispatcher = Dispatcher(self) self.dispatcher = Dispatcher(self)
self.router.register(self.parent, self.stream) self.router.register(self.parent, self.stream)
self.router._setup_logging() self.router._setup_logging()
self.log_handler.uncork()
sys.executable = os.environ.pop('ARGV0', sys.executable) sys.executable = os.environ.pop('ARGV0', sys.executable)
_v and LOG.debug('Connected to context %s; my ID is %r', _v and LOG.debug('Parent is context %r (%s); my ID is %r',
self.parent, mitogen.context_id) self.parent.context_id, self.parent.name,
mitogen.context_id)
_v and LOG.debug('pid:%r ppid:%r uid:%r/%r, gid:%r/%r host:%r', _v and LOG.debug('pid:%r ppid:%r uid:%r/%r, gid:%r/%r host:%r',
os.getpid(), os.getppid(), os.geteuid(), os.getpid(), os.getppid(), os.geteuid(),
os.getuid(), os.getegid(), os.getgid(), os.getuid(), os.getegid(), os.getgid(),
@ -3633,6 +3599,9 @@ class ExternalContext(object):
_v and LOG.debug('Recovered sys.executable: %r', sys.executable) _v and LOG.debug('Recovered sys.executable: %r', sys.executable)
self.broker._py24_25_compat() self.broker._py24_25_compat()
if self.config.get('send_ec2', True):
self.stream.transmit_side.write(b('MITO002\n'))
self.log_handler.uncork()
self.dispatcher.run() self.dispatcher.run()
_v and LOG.debug('ExternalContext.main() normal exit') _v and LOG.debug('ExternalContext.main() normal exit')
except KeyboardInterrupt: except KeyboardInterrupt:

@ -29,6 +29,7 @@
# !mitogen: minify_safe # !mitogen: minify_safe
import logging import logging
import re
import mitogen.core import mitogen.core
import mitogen.parent import mitogen.parent
@ -37,77 +38,106 @@ from mitogen.core import b
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
password_incorrect_msg = 'doas password is incorrect'
password_required_msg = 'doas password is required'
class PasswordError(mitogen.core.StreamError): class PasswordError(mitogen.core.StreamError):
pass pass
class Stream(mitogen.parent.Stream): class Options(mitogen.parent.Options):
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child) username = u'root'
child_is_immediate_subprocess = False
username = 'root'
password = None password = None
doas_path = 'doas' doas_path = 'doas'
password_prompt = b('Password:') password_prompt = u'Password:'
incorrect_prompts = ( incorrect_prompts = (
b('doas: authentication failed'), u'doas: authentication failed', # slicer69/doas
u'doas: Authorization failed', # openbsd/src
) )
def construct(self, username=None, password=None, doas_path=None, def __init__(self, username=None, password=None, doas_path=None,
password_prompt=None, incorrect_prompts=None, **kwargs): password_prompt=None, incorrect_prompts=None, **kwargs):
super(Stream, self).construct(**kwargs) super(Options, self).__init__(**kwargs)
if username is not None: if username is not None:
self.username = username self.username = mitogen.core.to_text(username)
if password is not None: if password is not None:
self.password = password self.password = mitogen.core.to_text(password)
if doas_path is not None: if doas_path is not None:
self.doas_path = doas_path self.doas_path = doas_path
if password_prompt is not None: if password_prompt is not None:
self.password_prompt = password_prompt.lower() self.password_prompt = mitogen.core.to_text(password_prompt)
if incorrect_prompts is not None: if incorrect_prompts is not None:
self.incorrect_prompts = map(str.lower, incorrect_prompts) self.incorrect_prompts = [
mitogen.core.to_text(p)
for p in incorrect_prompts
]
def _get_name(self):
return u'doas.' + mitogen.core.to_text(self.username)
def get_boot_command(self): class BootstrapProtocol(mitogen.parent.RegexProtocol):
bits = [self.doas_path, '-u', self.username, '--'] password_sent = False
bits = bits + super(Stream, self).get_boot_command()
LOG.debug('doas command line: %r', bits)
return bits
password_incorrect_msg = 'doas password is incorrect' def setup_patterns(self, conn):
password_required_msg = 'doas password is required' prompt_pattern = re.compile(
re.escape(conn.options.password_prompt).encode('utf-8'),
re.I
)
incorrect_prompt_pattern = re.compile(
u'|'.join(
re.escape(s)
for s in conn.options.incorrect_prompts
).encode('utf-8'),
re.I
)
def _connect_input_loop(self, it): self.PATTERNS = [
password_sent = False (incorrect_prompt_pattern, type(self)._on_incorrect_password),
for buf in it: ]
LOG.debug('%r: received %r', self, buf) self.PARTIAL_PATTERNS = [
if buf.endswith(self.EC0_MARKER): (prompt_pattern, type(self)._on_password_prompt),
self._ec0_received() ]
def _on_incorrect_password(self, line, match):
if self.password_sent:
self.stream.conn._fail_connection(
PasswordError(password_incorrect_msg)
)
def _on_password_prompt(self, line, match):
if self.stream.conn.options.password is None:
self.stream.conn._fail_connection(
PasswordError(password_required_msg)
)
return return
if any(s in buf.lower() for s in self.incorrect_prompts):
if password_sent: if self.password_sent:
raise PasswordError(self.password_incorrect_msg) self.stream.conn._fail_connection(
elif self.password_prompt in buf.lower(): PasswordError(password_incorrect_msg)
if self.password is None:
raise PasswordError(self.password_required_msg)
if password_sent:
raise PasswordError(self.password_incorrect_msg)
LOG.debug('sending password')
self.diag_stream.transmit_side.write(
mitogen.core.to_text(self.password + '\n').encode('utf-8')
) )
password_sent = True return
raise mitogen.core.StreamError('bootstrap failed')
def _connect_bootstrap(self): LOG.debug('sending password')
it = mitogen.parent.iter_read( self.stream.transmit_side.write(
fds=[self.receive_side.fd, self.diag_stream.receive_side.fd], (self.stream.conn.options.password + '\n').encode('utf-8')
deadline=self.connect_deadline,
) )
try: self.password_sent = True
self._connect_input_loop(it)
finally:
it.close() class Connection(mitogen.parent.Connection):
options_class = Options
diag_protocol_class = BootstrapProtocol
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False
def _get_name(self):
return u'doas.' + self.options.username
def diag_stream_factory(self):
stream = super(Connection, self).diag_stream_factory()
stream.protocol.setup_patterns(self)
return stream
def get_boot_command(self):
bits = [self.options.doas_path, '-u', self.options.username, '--']
return bits + super(Connection, self).get_boot_command()

@ -37,45 +37,47 @@ import mitogen.parent
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class Stream(mitogen.parent.Stream): class Options(mitogen.parent.Options):
child_is_immediate_subprocess = False
container = None container = None
image = None image = None
username = None username = None
docker_path = 'docker' docker_path = u'docker'
# TODO: better way of capturing errors such as "No such container."
create_child_args = {
'merge_stdio': True
}
def construct(self, container=None, image=None, def __init__(self, container=None, image=None, docker_path=None,
docker_path=None, username=None, username=None, **kwargs):
**kwargs): super(Options, self).__init__(**kwargs)
assert container or image assert container or image
super(Stream, self).construct(**kwargs)
if container: if container:
self.container = container self.container = mitogen.core.to_text(container)
if image: if image:
self.image = image self.image = mitogen.core.to_text(image)
if docker_path: if docker_path:
self.docker_path = docker_path self.docker_path = mitogen.core.to_text(docker_path)
if username: if username:
self.username = username self.username = mitogen.core.to_text(username)
class Connection(mitogen.parent.Connection):
options_class = Options
child_is_immediate_subprocess = False
# TODO: better way of capturing errors such as "No such container."
create_child_args = {
'merge_stdio': True
}
def _get_name(self): def _get_name(self):
return u'docker.' + (self.container or self.image) return u'docker.' + (self.options.container or self.options.image)
def get_boot_command(self): def get_boot_command(self):
args = ['--interactive'] args = ['--interactive']
if self.username: if self.options.username:
args += ['--user=' + self.username] args += ['--user=' + self.options.username]
bits = [self.docker_path] bits = [self.options.docker_path]
if self.container: if self.options.container:
bits += ['exec'] + args + [self.container] bits += ['exec'] + args + [self.options.container]
elif self.image: elif self.options.image:
bits += ['run'] + args + ['--rm', self.image] bits += ['run'] + args + ['--rm', self.options.image]
return bits + super(Stream, self).get_boot_command() return bits + super(Connection, self).get_boot_command()

@ -117,14 +117,12 @@ SSH_GETOPTS = (
_mitogen = None _mitogen = None
class IoPump(mitogen.core.BasicStream): class IoPump(mitogen.core.Protocol):
_output_buf = '' _output_buf = ''
_closed = False _closed = False
def __init__(self, broker, stdin_fd, stdout_fd): def __init__(self, broker):
self._broker = broker self._broker = broker
self.receive_side = mitogen.core.Side(self, stdout_fd)
self.transmit_side = mitogen.core.Side(self, stdin_fd)
def write(self, s): def write(self, s):
self._output_buf += s self._output_buf += s
@ -134,13 +132,13 @@ class IoPump(mitogen.core.BasicStream):
self._closed = True self._closed = True
# If local process hasn't exitted yet, ensure its write buffer is # If local process hasn't exitted yet, ensure its write buffer is
# drained before lazily triggering disconnect in on_transmit. # drained before lazily triggering disconnect in on_transmit.
if self.transmit_side.fd is not None: if self.transmit_side.fp.fileno() is not None:
self._broker._start_transmit(self) self._broker._start_transmit(self)
def on_shutdown(self, broker): def on_shutdown(self, stream, broker):
self.close() self.close()
def on_transmit(self, broker): def on_transmit(self, stream, broker):
written = self.transmit_side.write(self._output_buf) written = self.transmit_side.write(self._output_buf)
IOLOG.debug('%r.on_transmit() -> len %r', self, written) IOLOG.debug('%r.on_transmit() -> len %r', self, written)
if written is None: if written is None:
@ -153,8 +151,8 @@ class IoPump(mitogen.core.BasicStream):
if self._closed: if self._closed:
self.on_disconnect(broker) self.on_disconnect(broker)
def on_receive(self, broker): def on_receive(self, stream, broker):
s = self.receive_side.read() s = stream.receive_side.read()
IOLOG.debug('%r.on_receive() -> len %r', self, len(s)) IOLOG.debug('%r.on_receive() -> len %r', self, len(s))
if s: if s:
mitogen.core.fire(self, 'receive', s) mitogen.core.fire(self, 'receive', s)
@ -163,8 +161,8 @@ class IoPump(mitogen.core.BasicStream):
def __repr__(self): def __repr__(self):
return 'IoPump(%r, %r)' % ( return 'IoPump(%r, %r)' % (
self.receive_side.fd, self.receive_side.fp.fileno(),
self.transmit_side.fd, self.transmit_side.fp.fileno(),
) )
@ -173,14 +171,15 @@ class Process(object):
Manages the lifetime and pipe connections of the SSH command running in the Manages the lifetime and pipe connections of the SSH command running in the
slave. slave.
""" """
def __init__(self, router, stdin_fd, stdout_fd, proc=None): def __init__(self, router, stdin_fp, stdout_fp, proc=None):
self.router = router self.router = router
self.stdin_fd = stdin_fd self.stdin_fp = stdin_fp
self.stdout_fd = stdout_fd self.stdout_fp = stdout_fp
self.proc = proc self.proc = proc
self.control_handle = router.add_handler(self._on_control) self.control_handle = router.add_handler(self._on_control)
self.stdin_handle = router.add_handler(self._on_stdin) self.stdin_handle = router.add_handler(self._on_stdin)
self.pump = IoPump(router.broker, stdin_fd, stdout_fd) self.pump = IoPump.build_stream(router.broker)
self.pump.accept(stdin_fp, stdout_fp)
self.stdin = None self.stdin = None
self.control = None self.control = None
self.wake_event = threading.Event() self.wake_event = threading.Event()
@ -193,7 +192,7 @@ class Process(object):
pmon.add(proc.pid, self._on_proc_exit) pmon.add(proc.pid, self._on_proc_exit)
def __repr__(self): def __repr__(self):
return 'Process(%r, %r)' % (self.stdin_fd, self.stdout_fd) return 'Process(%r, %r)' % (self.stdin_fp, self.stdout_fp)
def _on_proc_exit(self, status): def _on_proc_exit(self, status):
LOG.debug('%r._on_proc_exit(%r)', self, status) LOG.debug('%r._on_proc_exit(%r)', self, status)
@ -202,12 +201,12 @@ class Process(object):
def _on_stdin(self, msg): def _on_stdin(self, msg):
if msg.is_dead: if msg.is_dead:
IOLOG.debug('%r._on_stdin() -> %r', self, data) IOLOG.debug('%r._on_stdin() -> %r', self, data)
self.pump.close() self.pump.protocol.close()
return return
data = msg.unpickle() data = msg.unpickle()
IOLOG.debug('%r._on_stdin() -> len %d', self, len(data)) IOLOG.debug('%r._on_stdin() -> len %d', self, len(data))
self.pump.write(data) self.pump.protocol.write(data)
def _on_control(self, msg): def _on_control(self, msg):
if not msg.is_dead: if not msg.is_dead:
@ -279,13 +278,7 @@ def _start_slave(src_id, cmdline, router):
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
) )
process = Process( process = Process(router, proc.stdin, proc.stdout, proc)
router,
proc.stdin.fileno(),
proc.stdout.fileno(),
proc,
)
return process.control_handle, process.stdin_handle return process.control_handle, process.stdin_handle
@ -361,7 +354,9 @@ def _fakessh_main(dest_context_id, econtext):
LOG.debug('_fakessh_main: received control_handle=%r, stdin_handle=%r', LOG.debug('_fakessh_main: received control_handle=%r, stdin_handle=%r',
control_handle, stdin_handle) control_handle, stdin_handle)
process = Process(econtext.router, 1, 0) process = Process(econtext.router,
stdin_fp=os.fdopen(1, 'w+b', 0),
stdout_fp=os.fdopen(0, 'r+b', 0))
process.start_master( process.start_master(
stdin=mitogen.core.Sender(dest, stdin_handle), stdin=mitogen.core.Sender(dest, stdin_handle),
control=mitogen.core.Sender(dest, control_handle), control=mitogen.core.Sender(dest, control_handle),
@ -427,7 +422,7 @@ def run(dest, router, args, deadline=None, econtext=None):
stream = mitogen.core.Stream(router, context_id) stream = mitogen.core.Stream(router, context_id)
stream.name = u'fakessh' stream.name = u'fakessh'
stream.accept(sock1.fileno(), sock1.fileno()) stream.accept(sock1, sock1)
router.register(fakessh, stream) router.register(fakessh, stream)
# Held in socket buffer until process is booted. # Held in socket buffer until process is booted.

@ -28,6 +28,7 @@
# !mitogen: minify_safe # !mitogen: minify_safe
import errno
import logging import logging
import os import os
import random import random
@ -119,32 +120,45 @@ def handle_child_crash():
os._exit(1) os._exit(1)
class Stream(mitogen.parent.Stream): class Process(mitogen.parent.Process):
child_is_immediate_subprocess = True def poll(self):
try:
pid, status = os.waitpid(self.pid, os.WNOHANG)
except OSError:
e = sys.exc_info()[1]
if e.args[0] == errno.ECHILD:
LOG.warn('%r: waitpid(%r) produced ECHILD', self, self.pid)
return
raise
if not pid:
return
if os.WIFEXITED(status):
return os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
return -os.WTERMSIG(status)
elif os.WIFSTOPPED(status):
return -os.WSTOPSIG(status)
class Options(mitogen.parent.Options):
#: Reference to the importer, if any, recovered from the parent. #: Reference to the importer, if any, recovered from the parent.
importer = None importer = None
#: User-supplied function for cleaning up child process state. #: User-supplied function for cleaning up child process state.
on_fork = None on_fork = None
python_version_msg = ( def __init__(self, old_router, max_message_size, on_fork=None, debug=False,
"The mitogen.fork method is not supported on Python versions " profiling=False, unidirectional=False, on_start=None,
"prior to 2.6, since those versions made no attempt to repair " name=None):
"critical interpreter state following a fork. Please use the "
"local() method instead."
)
def construct(self, old_router, max_message_size, on_fork=None,
debug=False, profiling=False, unidirectional=False,
on_start=None):
if not FORK_SUPPORTED: if not FORK_SUPPORTED:
raise Error(self.python_version_msg) raise Error(self.python_version_msg)
# fork method only supports a tiny subset of options. # fork method only supports a tiny subset of options.
super(Stream, self).construct(max_message_size=max_message_size, super(Options, self).__init__(
debug=debug, profiling=profiling, max_message_size=max_message_size, debug=debug,
unidirectional=False) profiling=profiling, unidirectional=unidirectional, name=name,
)
self.on_fork = on_fork self.on_fork = on_fork
self.on_start = on_start self.on_start = on_start
@ -152,17 +166,26 @@ class Stream(mitogen.parent.Stream):
if isinstance(responder, mitogen.parent.ModuleForwarder): if isinstance(responder, mitogen.parent.ModuleForwarder):
self.importer = responder.importer self.importer = responder.importer
class Connection(mitogen.parent.Connection):
options_class = Options
child_is_immediate_subprocess = True
python_version_msg = (
"The mitogen.fork method is not supported on Python versions "
"prior to 2.6, since those versions made no attempt to repair "
"critical interpreter state following a fork. Please use the "
"local() method instead."
)
name_prefix = u'fork' name_prefix = u'fork'
def start_child(self): def start_child(self):
parentfp, childfp = mitogen.parent.create_socketpair() parentfp, childfp = mitogen.parent.create_socketpair()
self.pid = os.fork() pid = os.fork()
if self.pid: if pid:
childfp.close() childfp.close()
# Decouple the socket from the lifetime of the Python socket object. return Process(pid, parentfp)
fd = os.dup(parentfp.fileno())
parentfp.close()
return self.pid, fd, None
else: else:
parentfp.close() parentfp.close()
self._wrap_child_main(childfp) self._wrap_child_main(childfp)
@ -173,12 +196,24 @@ class Stream(mitogen.parent.Stream):
except BaseException: except BaseException:
handle_child_crash() handle_child_crash()
def get_econtext_config(self):
config = super(Connection, self).get_econtext_config()
config['core_src_fd'] = None
config['importer'] = self.options.importer
config['send_ec2'] = False
config['setup_package'] = False
if self.options.on_start:
config['on_start'] = self.options.on_start
return config
def _child_main(self, childfp): def _child_main(self, childfp):
on_fork() on_fork()
if self.on_fork: if self.options.on_fork:
self.on_fork() self.options.on_fork()
mitogen.core.set_block(childfp.fileno()) mitogen.core.set_block(childfp.fileno())
childfp.send('MITO002\n')
# Expected by the ExternalContext.main(). # Expected by the ExternalContext.main().
os.dup2(childfp.fileno(), 1) os.dup2(childfp.fileno(), 1)
os.dup2(childfp.fileno(), 100) os.dup2(childfp.fileno(), 100)
@ -201,23 +236,12 @@ class Stream(mitogen.parent.Stream):
if childfp.fileno() not in (0, 1, 100): if childfp.fileno() not in (0, 1, 100):
childfp.close() childfp.close()
config = self.get_econtext_config()
config['core_src_fd'] = None
config['importer'] = self.importer
config['setup_package'] = False
if self.on_start:
config['on_start'] = self.on_start
try: try:
try: try:
mitogen.core.ExternalContext(config).main() mitogen.core.ExternalContext(self.get_econtext_config()).main()
except Exception: except Exception:
# TODO: report exception somehow. # TODO: report exception somehow.
os._exit(72) os._exit(72)
finally: finally:
# Don't trigger atexit handlers, they were copied from the parent. # Don't trigger atexit handlers, they were copied from the parent.
os._exit(0) os._exit(0)
def _connect_bootstrap(self):
# None required.
pass

@ -37,29 +37,34 @@ import mitogen.parent
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class Stream(mitogen.parent.Stream): class Options(mitogen.parent.Options):
child_is_immediate_subprocess = False
create_child_args = {
'merge_stdio': True
}
container = None container = None
username = None username = None
jexec_path = '/usr/sbin/jexec' jexec_path = u'/usr/sbin/jexec'
def construct(self, container, jexec_path=None, username=None, **kwargs): def __init__(self, container, jexec_path=None, username=None, **kwargs):
super(Stream, self).construct(**kwargs) super(Options, self).__init__(**kwargs)
self.container = container self.container = mitogen.core.to_text(container)
self.username = username if username:
self.username = mitogen.core.to_text(username)
if jexec_path: if jexec_path:
self.jexec_path = jexec_path self.jexec_path = jexec_path
class Connection(mitogen.parent.Connection):
options_class = Options
child_is_immediate_subprocess = False
create_child_args = {
'merge_stdio': True
}
def _get_name(self): def _get_name(self):
return u'jail.' + self.container return u'jail.' + self.options.container
def get_boot_command(self): def get_boot_command(self):
bits = [self.jexec_path] bits = [self.options.jexec_path]
if self.username: if self.options.username:
bits += ['-U', self.username] bits += ['-U', self.options.username]
bits += [self.container] bits += [self.options.container]
return bits + super(Stream, self).get_boot_command() return bits + super(Connection, self).get_boot_command()

@ -37,29 +37,36 @@ import mitogen.parent
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class Stream(mitogen.parent.Stream): class Options(mitogen.parent.Options):
child_is_immediate_subprocess = True
pod = None pod = None
kubectl_path = 'kubectl' kubectl_path = 'kubectl'
kubectl_args = None kubectl_args = None
# TODO: better way of capturing errors such as "No such container." def __init__(self, pod, kubectl_path=None, kubectl_args=None, **kwargs):
create_child_args = { super(Options, self).__init__(**kwargs)
'merge_stdio': True
}
def construct(self, pod, kubectl_path=None, kubectl_args=None, **kwargs):
super(Stream, self).construct(**kwargs)
assert pod assert pod
self.pod = pod self.pod = pod
if kubectl_path: if kubectl_path:
self.kubectl_path = kubectl_path self.kubectl_path = kubectl_path
self.kubectl_args = kubectl_args or [] self.kubectl_args = kubectl_args or []
class Connection(mitogen.parent.Connection):
options_class = Options
child_is_immediate_subprocess = True
# TODO: better way of capturing errors such as "No such container."
create_child_args = {
'merge_stdio': True
}
def _get_name(self): def _get_name(self):
return u'kubectl.%s%s' % (self.pod, self.kubectl_args) return u'kubectl.%s%s' % (self.options.pod, self.options.kubectl_args)
def get_boot_command(self): def get_boot_command(self):
bits = [self.kubectl_path] + self.kubectl_args + ['exec', '-it', self.pod] bits = [
return bits + ["--"] + super(Stream, self).get_boot_command() self.options.kubectl_path
] + self.options.kubectl_args + [
'exec', '-it', self.options.pod
]
return bits + ["--"] + super(Connection, self).get_boot_command()

@ -37,7 +37,20 @@ import mitogen.parent
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class Stream(mitogen.parent.Stream): class Options(mitogen.parent.Options):
container = None
lxc_attach_path = 'lxc-attach'
def __init__(self, container, lxc_attach_path=None, **kwargs):
super(Options, self).__init__(**kwargs)
self.container = container
if lxc_attach_path:
self.lxc_attach_path = lxc_attach_path
class Connection(mitogen.parent.Connection):
options_class = Options
child_is_immediate_subprocess = False child_is_immediate_subprocess = False
create_child_args = { create_child_args = {
# If lxc-attach finds any of stdin, stdout, stderr connected to a TTY, # If lxc-attach finds any of stdin, stdout, stderr connected to a TTY,
@ -47,29 +60,20 @@ class Stream(mitogen.parent.Stream):
'merge_stdio': True 'merge_stdio': True
} }
container = None
lxc_attach_path = 'lxc-attach'
eof_error_hint = ( eof_error_hint = (
'Note: many versions of LXC do not report program execution failure ' 'Note: many versions of LXC do not report program execution failure '
'meaningfully. Please check the host logs (/var/log) for more ' 'meaningfully. Please check the host logs (/var/log) for more '
'information.' 'information.'
) )
def construct(self, container, lxc_attach_path=None, **kwargs):
super(Stream, self).construct(**kwargs)
self.container = container
if lxc_attach_path:
self.lxc_attach_path = lxc_attach_path
def _get_name(self): def _get_name(self):
return u'lxc.' + self.container return u'lxc.' + self.options.container
def get_boot_command(self): def get_boot_command(self):
bits = [ bits = [
self.lxc_attach_path, self.options.lxc_attach_path,
'--clear-env', '--clear-env',
'--name', self.container, '--name', self.options.container,
'--', '--',
] ]
return bits + super(Stream, self).get_boot_command() return bits + super(Connection, self).get_boot_command()

@ -37,7 +37,21 @@ import mitogen.parent
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class Stream(mitogen.parent.Stream): class Options(mitogen.parent.Options):
container = None
lxc_path = 'lxc'
python_path = 'python'
def __init__(self, container, lxc_path=None, **kwargs):
super(Options, self).__init__(**kwargs)
self.container = container
if lxc_path:
self.lxc_path = lxc_path
class Connection(mitogen.parent.Connection):
options_class = Options
child_is_immediate_subprocess = False child_is_immediate_subprocess = False
create_child_args = { create_child_args = {
# If lxc finds any of stdin, stdout, stderr connected to a TTY, to # If lxc finds any of stdin, stdout, stderr connected to a TTY, to
@ -47,31 +61,21 @@ class Stream(mitogen.parent.Stream):
'merge_stdio': True 'merge_stdio': True
} }
container = None
lxc_path = 'lxc'
python_path = 'python'
eof_error_hint = ( eof_error_hint = (
'Note: many versions of LXC do not report program execution failure ' 'Note: many versions of LXC do not report program execution failure '
'meaningfully. Please check the host logs (/var/log) for more ' 'meaningfully. Please check the host logs (/var/log) for more '
'information.' 'information.'
) )
def construct(self, container, lxc_path=None, **kwargs):
super(Stream, self).construct(**kwargs)
self.container = container
if lxc_path:
self.lxc_path = lxc_path
def _get_name(self): def _get_name(self):
return u'lxd.' + self.container return u'lxd.' + self.options.container
def get_boot_command(self): def get_boot_command(self):
bits = [ bits = [
self.lxc_path, self.options.lxc_path,
'exec', 'exec',
'--mode=noninteractive', '--mode=noninteractive',
self.container, self.options.container,
'--', '--',
] ]
return bits + super(Stream, self).get_boot_command() return bits + super(Connection, self).get_boot_command()

@ -531,14 +531,15 @@ class SysModulesMethod(FinderMethod):
return return
if not isinstance(module, types.ModuleType): if not isinstance(module, types.ModuleType):
LOG.debug('sys.modules[%r] absent or not a regular module', LOG.debug('%r: sys.modules[%r] absent or not a regular module',
fullname) self, fullname)
return return
path = _py_filename(getattr(module, '__file__', '')) path = _py_filename(getattr(module, '__file__', ''))
if not path: if not path:
return return
LOG.debug('%r: sys.modules[%r]: found %s', self, fullname, path)
is_pkg = hasattr(module, '__path__') is_pkg = hasattr(module, '__path__')
try: try:
source = inspect.getsource(module) source = inspect.getsource(module)
@ -920,17 +921,17 @@ class ModuleResponder(object):
return tup return tup
def _send_load_module(self, stream, fullname): def _send_load_module(self, stream, fullname):
if fullname not in stream.sent_modules: if fullname not in stream.protocol.sent_modules:
tup = self._build_tuple(fullname) tup = self._build_tuple(fullname)
msg = mitogen.core.Message.pickled( msg = mitogen.core.Message.pickled(
tup, tup,
dst_id=stream.remote_id, dst_id=stream.protocol.remote_id,
handle=mitogen.core.LOAD_MODULE, handle=mitogen.core.LOAD_MODULE,
) )
LOG.debug('%s: sending %s (%.2f KiB) to %s', LOG.debug('%s: sending %s (%.2f KiB) to %s',
self, fullname, len(msg.data) / 1024.0, stream.name) self, fullname, len(msg.data) / 1024.0, stream.name)
self._router._async_route(msg) self._router._async_route(msg)
stream.sent_modules.add(fullname) stream.protocol.sent_modules.add(fullname)
if tup[2] is not None: if tup[2] is not None:
self.good_load_module_count += 1 self.good_load_module_count += 1
self.good_load_module_size += len(msg.data) self.good_load_module_size += len(msg.data)
@ -939,23 +940,23 @@ class ModuleResponder(object):
def _send_module_load_failed(self, stream, fullname): def _send_module_load_failed(self, stream, fullname):
self.bad_load_module_count += 1 self.bad_load_module_count += 1
stream.send( stream.protocol.send(
mitogen.core.Message.pickled( mitogen.core.Message.pickled(
self._make_negative_response(fullname), self._make_negative_response(fullname),
dst_id=stream.remote_id, dst_id=stream.protocol.remote_id,
handle=mitogen.core.LOAD_MODULE, handle=mitogen.core.LOAD_MODULE,
) )
) )
def _send_module_and_related(self, stream, fullname): def _send_module_and_related(self, stream, fullname):
if fullname in stream.sent_modules: if fullname in stream.protocol.sent_modules:
return return
try: try:
tup = self._build_tuple(fullname) tup = self._build_tuple(fullname)
for name in tup[4]: # related for name in tup[4]: # related
parent, _, _ = str_partition(name, '.') parent, _, _ = str_partition(name, '.')
if parent != fullname and parent not in stream.sent_modules: if parent != fullname and parent not in stream.protocol.sent_modules:
# Parent hasn't been sent, so don't load submodule yet. # Parent hasn't been sent, so don't load submodule yet.
continue continue
@ -976,7 +977,7 @@ class ModuleResponder(object):
fullname = msg.data.decode() fullname = msg.data.decode()
LOG.debug('%s requested module %s', stream.name, fullname) LOG.debug('%s requested module %s', stream.name, fullname)
self.get_module_count += 1 self.get_module_count += 1
if fullname in stream.sent_modules: if fullname in stream.protocol.sent_modules:
LOG.warning('_on_get_module(): dup request for %r from %r', LOG.warning('_on_get_module(): dup request for %r from %r',
fullname, stream) fullname, stream)
@ -987,12 +988,12 @@ class ModuleResponder(object):
self.get_module_secs += time.time() - t0 self.get_module_secs += time.time() - t0
def _send_forward_module(self, stream, context, fullname): def _send_forward_module(self, stream, context, fullname):
if stream.remote_id != context.context_id: if stream.protocol.remote_id != context.context_id:
stream.send( stream.send(
mitogen.core.Message( mitogen.core.Message(
data=b('%s\x00%s' % (context.context_id, fullname)), data=b('%s\x00%s' % (context.context_id, fullname)),
handle=mitogen.core.FORWARD_MODULE, handle=mitogen.core.FORWARD_MODULE,
dst_id=stream.remote_id, dst_id=stream.protocol.remote_id,
) )
) )

File diff suppressed because it is too large Load Diff

@ -485,7 +485,6 @@ class Pool(object):
) )
thread.start() thread.start()
self._threads.append(thread) self._threads.append(thread)
LOG.debug('%r: initialized', self) LOG.debug('%r: initialized', self)
def _py_24_25_compat(self): def _py_24_25_compat(self):
@ -658,7 +657,7 @@ class PushFileService(Service):
def _forward(self, context, path): def _forward(self, context, path):
stream = self.router.stream_by_id(context.context_id) stream = self.router.stream_by_id(context.context_id)
child = mitogen.core.Context(self.router, stream.remote_id) child = mitogen.core.Context(self.router, stream.protocol.remote_id)
sent = self._sent_by_stream.setdefault(stream, set()) sent = self._sent_by_stream.setdefault(stream, set())
if path in sent: if path in sent:
if child.context_id != context.context_id: if child.context_id != context.context_id:
@ -891,7 +890,7 @@ class FileService(Service):
# The IO loop pumps 128KiB chunks. An ideal message is a multiple of this, # The IO loop pumps 128KiB chunks. An ideal message is a multiple of this,
# odd-sized messages waste one tiny write() per message on the trailer. # odd-sized messages waste one tiny write() per message on the trailer.
# Therefore subtract 10 bytes pickle overhead + 24 bytes header. # Therefore subtract 10 bytes pickle overhead + 24 bytes header.
IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Stream.HEADER_LEN + ( IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Message.HEADER_LEN + (
len( len(
mitogen.core.Message.pickled( mitogen.core.Message.pickled(
mitogen.core.Blob(b(' ') * mitogen.core.CHUNK_SIZE) mitogen.core.Blob(b(' ') * mitogen.core.CHUNK_SIZE)

@ -116,9 +116,15 @@ def get_machinectl_pid(path, name):
raise Error("could not find PID from machinectl output.\n%s", output) raise Error("could not find PID from machinectl output.\n%s", output)
class Stream(mitogen.parent.Stream): GET_LEADER_BY_KIND = {
child_is_immediate_subprocess = False 'docker': ('docker_path', get_docker_pid),
'lxc': ('lxc_info_path', get_lxc_pid),
'lxd': ('lxc_path', get_lxd_pid),
'machinectl': ('machinectl_path', get_machinectl_pid),
}
class Options(mitogen.parent.Options):
container = None container = None
username = 'root' username = 'root'
kind = None kind = None
@ -128,24 +134,17 @@ class Stream(mitogen.parent.Stream):
lxc_info_path = 'lxc-info' lxc_info_path = 'lxc-info'
machinectl_path = 'machinectl' machinectl_path = 'machinectl'
GET_LEADER_BY_KIND = { def __init__(self, container, kind, username=None, docker_path=None,
'docker': ('docker_path', get_docker_pid),
'lxc': ('lxc_info_path', get_lxc_pid),
'lxd': ('lxc_path', get_lxd_pid),
'machinectl': ('machinectl_path', get_machinectl_pid),
}
def construct(self, container, kind, username=None, docker_path=None,
lxc_path=None, lxc_info_path=None, machinectl_path=None, lxc_path=None, lxc_info_path=None, machinectl_path=None,
**kwargs): **kwargs):
super(Stream, self).construct(**kwargs) super(Options, self).__init__(**kwargs)
if kind not in self.GET_LEADER_BY_KIND: if kind not in GET_LEADER_BY_KIND:
raise Error('unsupported container kind: %r', kind) raise Error('unsupported container kind: %r', kind)
self.container = container self.container = mitogen.core.to_text(container)
self.kind = kind self.kind = kind
if username: if username:
self.username = username self.username = mitogen.core.to_text(username)
if docker_path: if docker_path:
self.docker_path = docker_path self.docker_path = docker_path
if lxc_path: if lxc_path:
@ -155,6 +154,11 @@ class Stream(mitogen.parent.Stream):
if machinectl_path: if machinectl_path:
self.machinectl_path = machinectl_path self.machinectl_path = machinectl_path
class Connection(mitogen.parent.Connection):
options_class = Options
child_is_immediate_subprocess = False
# Order matters. https://github.com/karelzak/util-linux/commit/854d0fe/ # Order matters. https://github.com/karelzak/util-linux/commit/854d0fe/
NS_ORDER = ('ipc', 'uts', 'net', 'pid', 'mnt', 'user') NS_ORDER = ('ipc', 'uts', 'net', 'pid', 'mnt', 'user')
@ -189,15 +193,15 @@ class Stream(mitogen.parent.Stream):
try: try:
os.setgroups([grent.gr_gid os.setgroups([grent.gr_gid
for grent in grp.getgrall() for grent in grp.getgrall()
if self.username in grent.gr_mem]) if self.options.username in grent.gr_mem])
pwent = pwd.getpwnam(self.username) pwent = pwd.getpwnam(self.options.username)
os.setreuid(pwent.pw_uid, pwent.pw_uid) os.setreuid(pwent.pw_uid, pwent.pw_uid)
# shadow-4.4/libmisc/setupenv.c. Not done: MAIL, PATH # shadow-4.4/libmisc/setupenv.c. Not done: MAIL, PATH
os.environ.update({ os.environ.update({
'HOME': pwent.pw_dir, 'HOME': pwent.pw_dir,
'SHELL': pwent.pw_shell or '/bin/sh', 'SHELL': pwent.pw_shell or '/bin/sh',
'LOGNAME': self.username, 'LOGNAME': self.options.username,
'USER': self.username, 'USER': self.options.username,
}) })
if ((os.path.exists(pwent.pw_dir) and if ((os.path.exists(pwent.pw_dir) and
os.access(pwent.pw_dir, os.X_OK))): os.access(pwent.pw_dir, os.X_OK))):
@ -217,7 +221,7 @@ class Stream(mitogen.parent.Stream):
# namespaces, meaning starting new threads in the exec'd program will # namespaces, meaning starting new threads in the exec'd program will
# fail. The solution is forking, so inject a /bin/sh call to achieve # fail. The solution is forking, so inject a /bin/sh call to achieve
# this. # this.
argv = super(Stream, self).get_boot_command() argv = super(Connection, self).get_boot_command()
# bash will exec() if a single command was specified and the shell has # bash will exec() if a single command was specified and the shell has
# nothing left to do, so "; exit $?" gives bash a reason to live. # nothing left to do, so "; exit $?" gives bash a reason to live.
return ['/bin/sh', '-c', '%s; exit $?' % (mitogen.parent.Argv(argv),)] return ['/bin/sh', '-c', '%s; exit $?' % (mitogen.parent.Argv(argv),)]
@ -226,13 +230,12 @@ class Stream(mitogen.parent.Stream):
return mitogen.parent.create_child(args, preexec_fn=self.preexec_fn) return mitogen.parent.create_child(args, preexec_fn=self.preexec_fn)
def _get_name(self): def _get_name(self):
return u'setns.' + self.container return u'setns.' + self.options.container
def connect(self): def connect(self, **kwargs):
self.name = self._get_name() attr, func = GET_LEADER_BY_KIND[self.options.kind]
attr, func = self.GET_LEADER_BY_KIND[self.kind] tool_path = getattr(self.options, attr)
tool_path = getattr(self, attr) self.leader_pid = func(tool_path, self.options.container)
self.leader_pid = func(tool_path, self.container)
LOG.debug('Leader PID for %s container %r: %d', LOG.debug('Leader PID for %s container %r: %d',
self.kind, self.container, self.leader_pid) self.options.kind, self.options.container, self.leader_pid)
super(Stream, self).connect() return super(Connection, self).connect(**kwargs)

@ -29,7 +29,7 @@
# !mitogen: minify_safe # !mitogen: minify_safe
""" """
Functionality to allow establishing new slave contexts over an SSH connection. Construct new children via the OpenSSH client.
""" """
import logging import logging
@ -52,82 +52,122 @@ except NameError:
LOG = logging.getLogger('mitogen') LOG = logging.getLogger('mitogen')
auth_incorrect_msg = 'SSH authentication is incorrect'
password_incorrect_msg = 'SSH password is incorrect'
password_required_msg = 'SSH password was requested, but none specified'
hostkey_config_msg = (
'SSH requested permission to accept unknown host key, but '
'check_host_keys=ignore. This is likely due to ssh_args= '
'conflicting with check_host_keys=. Please correct your '
'configuration.'
)
hostkey_failed_msg = (
'Host key checking is enabled, and SSH reported an unrecognized or '
'mismatching host key.'
)
# sshpass uses 'assword' because it doesn't lowercase the input. # sshpass uses 'assword' because it doesn't lowercase the input.
PASSWORD_PROMPT = b('password') PASSWORD_PROMPT_PATTERN = re.compile(
HOSTKEY_REQ_PROMPT = b('are you sure you want to continue connecting (yes/no)?') b('password'),
HOSTKEY_FAIL = b('host key verification failed.') re.I
)
HOSTKEY_REQ_PATTERN = re.compile(
b(r'are you sure you want to continue connecting \(yes/no\)\?'),
re.I
)
HOSTKEY_FAIL_PATTERN = re.compile(
b(r'host key verification failed\.'),
re.I
)
# [user@host: ] permission denied # [user@host: ] permission denied
PERMDENIED_RE = re.compile( PERMDENIED_PATTERN = re.compile(
('(?:[^@]+@[^:]+: )?' # Absent in OpenSSH <7.5 b('(?:[^@]+@[^:]+: )?' # Absent in OpenSSH <7.5
'Permission denied').encode(), 'Permission denied'),
re.I re.I
) )
DEBUG_PATTERN = re.compile(b'^debug[123]:')
DEBUG_PREFIXES = (b('debug1:'), b('debug2:'), b('debug3:'))
class PasswordError(mitogen.core.StreamError):
pass
def filter_debug(stream, it):
"""
Read line chunks from it, either yielding them directly, or building up and
logging individual lines if they look like SSH debug output.
This contains the mess of dealing with both line-oriented input, and partial class HostKeyError(mitogen.core.StreamError):
lines such as the password prompt. pass
Yields `(line, partial)` tuples, where `line` is the line, `partial` is class SetupProtocol(mitogen.parent.RegexProtocol):
:data:`True` if no terminating newline character was present and no more """
data exists in the read buffer. Consuming code can use this to unreliably This protocol is attached to stderr of the SSH client. It responds to
detect the presence of an interactive prompt. various interactive prompts as required.
""" """
# The `partial` test is unreliable, but is only problematic when verbosity password_sent = False
# is enabled: it's possible for a combination of SSH banner, password
# prompt, verbose output, timing and OS buffering specifics to create a def _on_host_key_request(self, line, match):
# situation where an otherwise newline-terminated line appears to not be if self.stream.conn.options.check_host_keys == 'accept':
# terminated, due to a partial read(). If something is broken when LOG.debug('%s: accepting host key', self.stream.name)
# ssh_debug_level>0, this is the first place to look. self.stream.transmit_side.write(b('yes\n'))
state = 'start_of_line' return
buf = b('')
for chunk in it: # _host_key_prompt() should never be reached with ignore or enforce
buf += chunk # mode, SSH should have handled that. User's ssh_args= is conflicting
while buf: # with ours.
if state == 'start_of_line': self.stream.conn._fail_connection(HostKeyError(hostkey_config_msg))
if len(buf) < 8:
# short read near buffer limit, block awaiting at least 8 def _on_host_key_failed(self, line, match):
# bytes so we can discern a debug line, or the minimum self.stream.conn._fail_connection(HostKeyError(hostkey_failed_msg))
# interesting token from above or the bootstrap
# ('password', 'MITO000\n'). def _on_permission_denied(self, line, match):
break # issue #271: work around conflict with user shell reporting
elif any(buf.startswith(p) for p in DEBUG_PREFIXES): # 'permission denied' e.g. during chdir($HOME) by only matching it at
state = 'in_debug' # the start of the line.
if self.stream.conn.options.password is not None and \
self.password_sent:
self.stream.conn._fail_connection(
PasswordError(password_incorrect_msg)
)
elif PASSWORD_PROMPT_PATTERN.search(line) and \
self.stream.conn.options.password is None:
# Permission denied (password,pubkey)
self.stream.conn._fail_connection(
PasswordError(password_required_msg)
)
else: else:
state = 'in_plain' self.stream.conn._fail_connection(
elif state == 'in_debug': PasswordError(auth_incorrect_msg)
if b('\n') not in buf: )
break
line, _, buf = bytes_partition(buf, b('\n'))
LOG.debug('%s: %s', stream.name,
mitogen.core.to_text(line.rstrip()))
state = 'start_of_line'
elif state == 'in_plain':
line, nl, buf = bytes_partition(buf, b('\n'))
yield line + nl, not (nl or buf)
if nl:
state = 'start_of_line'
def _on_password_prompt(self, line, match):
LOG.debug('%s: (password prompt): %s', self.stream.name, line)
if self.stream.conn.options.password is None:
self.stream.conn._fail(PasswordError(password_required_msg))
class PasswordError(mitogen.core.StreamError): self.stream.transmit_side.write(
pass (self.stream.conn.options.password + '\n').encode('utf-8')
)
self.password_sent = True
def _on_debug_line(self, line, match):
text = mitogen.core.to_text(line.rstrip())
LOG.debug('%s: %s', self.stream.name, text)
class HostKeyError(mitogen.core.StreamError): PATTERNS = [
pass (DEBUG_PATTERN, _on_debug_line),
(HOSTKEY_FAIL_PATTERN, _on_host_key_failed),
(PERMDENIED_PATTERN, _on_permission_denied),
]
PARTIAL_PATTERNS = [
(PASSWORD_PROMPT_PATTERN, _on_password_prompt),
(HOSTKEY_REQ_PATTERN, _on_host_key_request),
]
class Stream(mitogen.parent.Stream):
child_is_immediate_subprocess = False
class Options(mitogen.parent.Options):
#: Default to whatever is available as 'python' on the remote machine, #: Default to whatever is available as 'python' on the remote machine,
#: overriding sys.executable use. #: overriding sys.executable use.
python_path = 'python' python_path = 'python'
@ -141,19 +181,19 @@ class Stream(mitogen.parent.Stream):
hostname = None hostname = None
username = None username = None
port = None port = None
identity_file = None identity_file = None
password = None password = None
ssh_args = None ssh_args = None
check_host_keys_msg = 'check_host_keys= must be set to accept, enforce or ignore' check_host_keys_msg = 'check_host_keys= must be set to accept, enforce or ignore'
def construct(self, hostname, username=None, ssh_path=None, port=None, def __init__(self, hostname, username=None, ssh_path=None, port=None,
check_host_keys='enforce', password=None, identity_file=None, check_host_keys='enforce', password=None, identity_file=None,
compression=True, ssh_args=None, keepalive_enabled=True, compression=True, ssh_args=None, keepalive_enabled=True,
keepalive_count=3, keepalive_interval=15, keepalive_count=3, keepalive_interval=15,
identities_only=True, ssh_debug_level=None, **kwargs): identities_only=True, ssh_debug_level=None, **kwargs):
super(Stream, self).construct(**kwargs) super(Options, self).__init__(**kwargs)
if check_host_keys not in ('accept', 'enforce', 'ignore'): if check_host_keys not in ('accept', 'enforce', 'ignore'):
raise ValueError(self.check_host_keys_msg) raise ValueError(self.check_host_keys_msg)
@ -175,143 +215,81 @@ class Stream(mitogen.parent.Stream):
if ssh_debug_level: if ssh_debug_level:
self.ssh_debug_level = ssh_debug_level self.ssh_debug_level = ssh_debug_level
self._init_create_child()
class Connection(mitogen.parent.Connection):
options_class = Options
diag_protocol_class = SetupProtocol
child_is_immediate_subprocess = False
def _get_name(self):
s = u'ssh.' + mitogen.core.to_text(self.options.hostname)
if self.options.port and self.options.port != 22:
s += u':%s' % (self.options.port,)
return s
def _requires_pty(self): def _requires_pty(self):
""" """
Return :data:`True` if the configuration requires a PTY to be Return :data:`True` if a PTY to is required for this configuration,
allocated. This is only true if we must interactively accept host keys, because it must interactively accept host keys or type a password.
or type a password.
""" """
return (self.check_host_keys == 'accept' or return (
self.password is not None) self.options.check_host_keys == 'accept' or
self.options.password is not None
)
def _init_create_child(self): def create_child(self, **kwargs):
""" """
Initialize the base class :attr:`create_child` and Avoid PTY use when possible to avoid a scaling limitation.
:attr:`create_child_args` according to whether we need a PTY or not.
""" """
if self._requires_pty(): if self._requires_pty():
self.create_child = mitogen.parent.hybrid_tty_create_child return mitogen.parent.hybrid_tty_create_child(**kwargs)
else: else:
self.create_child = mitogen.parent.create_child return mitogen.parent.create_child(stderr_pipe=True, **kwargs)
self.create_child_args = {
'stderr_pipe': True,
}
def get_boot_command(self): def get_boot_command(self):
bits = [self.ssh_path] bits = [self.options.ssh_path]
if self.ssh_debug_level: if self.options.ssh_debug_level:
bits += ['-' + ('v' * min(3, self.ssh_debug_level))] bits += ['-' + ('v' * min(3, self.options.ssh_debug_level))]
else: else:
# issue #307: suppress any login banner, as it may contain the # issue #307: suppress any login banner, as it may contain the
# password prompt, and there is no robust way to tell the # password prompt, and there is no robust way to tell the
# difference. # difference.
bits += ['-o', 'LogLevel ERROR'] bits += ['-o', 'LogLevel ERROR']
if self.username: if self.options.username:
bits += ['-l', self.username] bits += ['-l', self.options.username]
if self.port is not None: if self.options.port is not None:
bits += ['-p', str(self.port)] bits += ['-p', str(self.options.port)]
if self.identities_only and (self.identity_file or self.password): if self.options.identities_only and (self.options.identity_file or
self.options.password):
bits += ['-o', 'IdentitiesOnly yes'] bits += ['-o', 'IdentitiesOnly yes']
if self.identity_file: if self.options.identity_file:
bits += ['-i', self.identity_file] bits += ['-i', self.options.identity_file]
if self.compression: if self.options.compression:
bits += ['-o', 'Compression yes'] bits += ['-o', 'Compression yes']
if self.keepalive_enabled: if self.options.keepalive_enabled:
bits += [ bits += [
'-o', 'ServerAliveInterval %s' % (self.keepalive_interval,), '-o', 'ServerAliveInterval %s' % (
'-o', 'ServerAliveCountMax %s' % (self.keepalive_count,), self.options.keepalive_interval,
),
'-o', 'ServerAliveCountMax %s' % (
self.options.keepalive_count,
),
] ]
if not self._requires_pty(): if not self._requires_pty():
bits += ['-o', 'BatchMode yes'] bits += ['-o', 'BatchMode yes']
if self.check_host_keys == 'enforce': if self.options.check_host_keys == 'enforce':
bits += ['-o', 'StrictHostKeyChecking yes'] bits += ['-o', 'StrictHostKeyChecking yes']
if self.check_host_keys == 'accept': if self.options.check_host_keys == 'accept':
bits += ['-o', 'StrictHostKeyChecking ask'] bits += ['-o', 'StrictHostKeyChecking ask']
elif self.check_host_keys == 'ignore': elif self.options.check_host_keys == 'ignore':
bits += [ bits += [
'-o', 'StrictHostKeyChecking no', '-o', 'StrictHostKeyChecking no',
'-o', 'UserKnownHostsFile /dev/null', '-o', 'UserKnownHostsFile /dev/null',
'-o', 'GlobalKnownHostsFile /dev/null', '-o', 'GlobalKnownHostsFile /dev/null',
] ]
if self.ssh_args: if self.options.ssh_args:
bits += self.ssh_args bits += self.options.ssh_args
bits.append(self.hostname) bits.append(self.options.hostname)
base = super(Stream, self).get_boot_command() base = super(Connection, self).get_boot_command()
return bits + [shlex_quote(s).strip() for s in base] return bits + [shlex_quote(s).strip() for s in base]
def _get_name(self):
s = u'ssh.' + mitogen.core.to_text(self.hostname)
if self.port:
s += u':%s' % (self.port,)
return s
auth_incorrect_msg = 'SSH authentication is incorrect'
password_incorrect_msg = 'SSH password is incorrect'
password_required_msg = 'SSH password was requested, but none specified'
hostkey_config_msg = (
'SSH requested permission to accept unknown host key, but '
'check_host_keys=ignore. This is likely due to ssh_args= '
'conflicting with check_host_keys=. Please correct your '
'configuration.'
)
hostkey_failed_msg = (
'Host key checking is enabled, and SSH reported an unrecognized or '
'mismatching host key.'
)
def _host_key_prompt(self):
if self.check_host_keys == 'accept':
LOG.debug('%s: accepting host key', self.name)
self.diag_stream.transmit_side.write(b('yes\n'))
return
# _host_key_prompt() should never be reached with ignore or enforce
# mode, SSH should have handled that. User's ssh_args= is conflicting
# with ours.
raise HostKeyError(self.hostkey_config_msg)
def _connect_input_loop(self, it):
password_sent = False
for buf, partial in filter_debug(self, it):
LOG.debug('%s: stdout: %s', self.name, buf.rstrip())
if buf.endswith(self.EC0_MARKER):
self._ec0_received()
return
elif HOSTKEY_REQ_PROMPT in buf.lower():
self._host_key_prompt()
elif HOSTKEY_FAIL in buf.lower():
raise HostKeyError(self.hostkey_failed_msg)
elif PERMDENIED_RE.match(buf):
# issue #271: work around conflict with user shell reporting
# 'permission denied' e.g. during chdir($HOME) by only matching
# it at the start of the line.
if self.password is not None and password_sent:
raise PasswordError(self.password_incorrect_msg)
elif PASSWORD_PROMPT in buf and self.password is None:
# Permission denied (password,pubkey)
raise PasswordError(self.password_required_msg)
else:
raise PasswordError(self.auth_incorrect_msg)
elif partial and PASSWORD_PROMPT in buf.lower():
if self.password is None:
raise PasswordError(self.password_required_msg)
LOG.debug('%s: sending password', self.name)
self.diag_stream.transmit_side.write(
(self.password + '\n').encode()
)
password_sent = True
raise mitogen.core.StreamError('bootstrap failed')
def _connect_bootstrap(self):
fds = [self.receive_side.fd]
if self.diag_stream is not None:
fds.append(self.diag_stream.receive_side.fd)
it = mitogen.parent.iter_read(fds=fds, deadline=self.connect_deadline)
try:
self._connect_input_loop(it)
finally:
it.close()

@ -29,6 +29,7 @@
# !mitogen: minify_safe # !mitogen: minify_safe
import logging import logging
import re
import mitogen.core import mitogen.core
import mitogen.parent import mitogen.parent
@ -42,87 +43,120 @@ except NameError:
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
password_incorrect_msg = 'su password is incorrect'
password_required_msg = 'su password is required'
class PasswordError(mitogen.core.StreamError): class PasswordError(mitogen.core.StreamError):
pass pass
class Stream(mitogen.parent.Stream): class SetupBootstrapProtocol(mitogen.parent.BootstrapProtocol):
# TODO: BSD su cannot handle stdin being a socketpair, but it does let the password_sent = False
# child inherit fds from the parent. So we can still pass a socketpair in
# for hybrid_tty_create_child(), there just needs to be either a shell def setup_patterns(self, conn):
# snippet or bootstrap support for fixing things up afterwards. """
create_child = staticmethod(mitogen.parent.tty_create_child) su options cause the regexes used to vary. This is a mess, requires
child_is_immediate_subprocess = False reworking.
"""
incorrect_pattern = re.compile(
mitogen.core.b('|').join(
re.escape(s.encode('utf-8'))
for s in conn.options.incorrect_prompts
),
re.I
)
prompt_pattern = re.compile(
re.escape(
conn.options.password_prompt.encode('utf-8')
),
re.I
)
self.PATTERNS = mitogen.parent.BootstrapProtocol.PATTERNS + [
(incorrect_pattern, type(self)._on_password_incorrect),
]
self.PARTIAL_PATTERNS = mitogen.parent.BootstrapProtocol.PARTIAL_PATTERNS + [
(prompt_pattern, type(self)._on_password_prompt),
]
def _on_password_prompt(self, line, match):
LOG.debug('%r: (password prompt): %r',
self.stream.name, line.decode('utf-8', 'replace'))
if self.stream.conn.options.password is None:
self.stream.conn._fail_connection(
PasswordError(password_required_msg)
)
return
if self.password_sent:
self.stream.conn._fail_connection(
PasswordError(password_incorrect_msg)
)
return
#: Once connected, points to the corresponding DiagLogStream, allowing it to self.stream.transmit_side.write(
#: be disconnected at the same time this stream is being torn down. (self.stream.conn.options.password + '\n').encode('utf-8')
)
self.password_sent = True
def _on_password_incorrect(self, line, match):
if self.password_sent:
self.stream.conn._fail_connection(
PasswordError(password_incorrect_msg)
)
username = 'root'
class Options(mitogen.parent.Options):
username = u'root'
password = None password = None
su_path = 'su' su_path = 'su'
password_prompt = b('password:') password_prompt = u'password:'
incorrect_prompts = ( incorrect_prompts = (
b('su: sorry'), # BSD u'su: sorry', # BSD
b('su: authentication failure'), # Linux u'su: authentication failure', # Linux
b('su: incorrect password'), # CentOS 6 u'su: incorrect password', # CentOS 6
b('authentication is denied'), # AIX u'authentication is denied', # AIX
) )
def construct(self, username=None, password=None, su_path=None, def __init__(self, username=None, password=None, su_path=None,
password_prompt=None, incorrect_prompts=None, **kwargs): password_prompt=None, incorrect_prompts=None, **kwargs):
super(Stream, self).construct(**kwargs) super(Options, self).__init__(**kwargs)
if username is not None: if username is not None:
self.username = username self.username = mitogen.core.to_text(username)
if password is not None: if password is not None:
self.password = password self.password = mitogen.core.to_text(password)
if su_path is not None: if su_path is not None:
self.su_path = su_path self.su_path = su_path
if password_prompt is not None: if password_prompt is not None:
self.password_prompt = password_prompt.lower() self.password_prompt = password_prompt
if incorrect_prompts is not None: if incorrect_prompts is not None:
self.incorrect_prompts = map(str.lower, incorrect_prompts) self.incorrect_prompts = [
mitogen.core.to_text(p)
def _get_name(self): for p in incorrect_prompts
return u'su.' + mitogen.core.to_text(self.username) ]
def get_boot_command(self):
argv = mitogen.parent.Argv(super(Stream, self).get_boot_command())
return [self.su_path, self.username, '-c', str(argv)]
password_incorrect_msg = 'su password is incorrect' class Connection(mitogen.parent.Connection):
password_required_msg = 'su password is required' options_class = Options
stream_protocol_class = SetupBootstrapProtocol
def _connect_input_loop(self, it): # TODO: BSD su cannot handle stdin being a socketpair, but it does let the
password_sent = False # child inherit fds from the parent. So we can still pass a socketpair in
# for hybrid_tty_create_child(), there just needs to be either a shell
# snippet or bootstrap support for fixing things up afterwards.
create_child = staticmethod(mitogen.parent.tty_create_child)
child_is_immediate_subprocess = False
for buf in it: def _get_name(self):
LOG.debug('%r: received %r', self, buf) return u'su.' + self.options.username
if buf.endswith(self.EC0_MARKER):
self._ec0_received()
return
if any(s in buf.lower() for s in self.incorrect_prompts):
if password_sent:
raise PasswordError(self.password_incorrect_msg)
elif self.password_prompt in buf.lower():
if self.password is None:
raise PasswordError(self.password_required_msg)
if password_sent:
raise PasswordError(self.password_incorrect_msg)
LOG.debug('sending password')
self.transmit_side.write(
mitogen.core.to_text(self.password + '\n').encode('utf-8')
)
password_sent = True
raise mitogen.core.StreamError('bootstrap failed') def stream_factory(self):
stream = super(Connection, self).stream_factory()
stream.protocol.setup_patterns(self)
return stream
def _connect_bootstrap(self): def get_boot_command(self):
it = mitogen.parent.iter_read( argv = mitogen.parent.Argv(super(Connection, self).get_boot_command())
fds=[self.receive_side.fd], return [self.options.su_path, self.options.username, '-c', str(argv)]
deadline=self.connect_deadline,
)
try:
self._connect_input_loop(it)
finally:
it.close()

@ -40,6 +40,9 @@ from mitogen.core import b
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
password_incorrect_msg = 'sudo password is incorrect'
password_required_msg = 'sudo password is required'
# These are base64-encoded UTF-8 as our existing minifier/module server # These are base64-encoded UTF-8 as our existing minifier/module server
# struggles with Unicode Python source in some (forgotten) circumstances. # struggles with Unicode Python source in some (forgotten) circumstances.
PASSWORD_PROMPTS = [ PASSWORD_PROMPTS = [
@ -99,14 +102,13 @@ PASSWORD_PROMPTS = [
PASSWORD_PROMPT_RE = re.compile( PASSWORD_PROMPT_RE = re.compile(
u'|'.join( mitogen.core.b('|').join(
base64.b64decode(s).decode('utf-8') base64.b64decode(s)
for s in PASSWORD_PROMPTS for s in PASSWORD_PROMPTS
) ),
re.I
) )
PASSWORD_PROMPT = b('password')
SUDO_OPTIONS = [ SUDO_OPTIONS = [
#(False, 'bool', '--askpass', '-A') #(False, 'bool', '--askpass', '-A')
#(False, 'str', '--auth-type', '-a') #(False, 'str', '--auth-type', '-a')
@ -181,10 +183,7 @@ def option(default, *args):
return default return default
class Stream(mitogen.parent.Stream): class Options(mitogen.parent.Options):
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False
sudo_path = 'sudo' sudo_path = 'sudo'
username = 'root' username = 'root'
password = None password = None
@ -195,15 +194,16 @@ class Stream(mitogen.parent.Stream):
selinux_role = None selinux_role = None
selinux_type = None selinux_type = None
def construct(self, username=None, sudo_path=None, password=None, def __init__(self, username=None, sudo_path=None, password=None,
preserve_env=None, set_home=None, sudo_args=None, preserve_env=None, set_home=None, sudo_args=None,
login=None, selinux_role=None, selinux_type=None, **kwargs): login=None, selinux_role=None, selinux_type=None, **kwargs):
super(Stream, self).construct(**kwargs) super(Options, self).__init__(**kwargs)
opts = parse_sudo_flags(sudo_args or []) opts = parse_sudo_flags(sudo_args or [])
self.username = option(self.username, username, opts.user) self.username = option(self.username, username, opts.user)
self.sudo_path = option(self.sudo_path, sudo_path) self.sudo_path = option(self.sudo_path, sudo_path)
self.password = password or None if password:
self.password = mitogen.core.to_text(password)
self.preserve_env = option(self.preserve_env, self.preserve_env = option(self.preserve_env,
preserve_env, opts.preserve_env) preserve_env, opts.preserve_env)
self.set_home = option(self.set_home, set_home, opts.set_home) self.set_home = option(self.set_home, set_home, opts.set_home)
@ -211,67 +211,61 @@ class Stream(mitogen.parent.Stream):
self.selinux_role = option(self.selinux_role, selinux_role, opts.role) self.selinux_role = option(self.selinux_role, selinux_role, opts.role)
self.selinux_type = option(self.selinux_type, selinux_type, opts.type) self.selinux_type = option(self.selinux_type, selinux_type, opts.type)
class SetupProtocol(mitogen.parent.RegexProtocol):
password_sent = False
def _on_password_prompt(self, line, match):
LOG.debug('%s: (password prompt): %s',
self.stream.name, line.decode('utf-8', 'replace'))
if self.stream.conn.options.password is None:
self.stream.conn._fail_connection(
PasswordError(password_required_msg)
)
return
if self.password_sent:
self.stream.conn._fail_connection(
PasswordError(password_incorrect_msg)
)
return
self.stream.transmit_side.write(
(self.stream.conn.options.password + '\n').encode('utf-8')
)
self.password_sent = True
PARTIAL_PATTERNS = [
(PASSWORD_PROMPT_RE, _on_password_prompt),
]
class Connection(mitogen.parent.Connection):
diag_protocol_class = SetupProtocol
options_class = Options
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False
def _get_name(self): def _get_name(self):
return u'sudo.' + mitogen.core.to_text(self.username) return u'sudo.' + mitogen.core.to_text(self.options.username)
def get_boot_command(self): def get_boot_command(self):
# Note: sudo did not introduce long-format option processing until July # Note: sudo did not introduce long-format option processing until July
# 2013, so even though we parse long-format options, supply short-form # 2013, so even though we parse long-format options, supply short-form
# to the sudo command. # to the sudo command.
bits = [self.sudo_path, '-u', self.username] bits = [self.options.sudo_path, '-u', self.options.username]
if self.preserve_env: if self.options.preserve_env:
bits += ['-E'] bits += ['-E']
if self.set_home: if self.options.set_home:
bits += ['-H'] bits += ['-H']
if self.login: if self.options.login:
bits += ['-i'] bits += ['-i']
if self.selinux_role: if self.options.selinux_role:
bits += ['-r', self.selinux_role] bits += ['-r', self.options.selinux_role]
if self.selinux_type: if self.options.selinux_type:
bits += ['-t', self.selinux_type] bits += ['-t', self.options.selinux_type]
bits = bits + ['--'] + super(Stream, self).get_boot_command() bits = bits + ['--'] + super(Connection, self).get_boot_command()
LOG.debug('sudo command line: %r', bits) LOG.debug('sudo command line: %r', bits)
return bits return bits
password_incorrect_msg = 'sudo password is incorrect'
password_required_msg = 'sudo password is required'
def _connect_input_loop(self, it):
password_sent = False
for buf in it:
LOG.debug('%s: received %r', self.name, buf)
if buf.endswith(self.EC0_MARKER):
self._ec0_received()
return
match = PASSWORD_PROMPT_RE.search(buf.decode('utf-8').lower())
if match is not None:
LOG.debug('%s: matched password prompt %r',
self.name, match.group(0))
if self.password is None:
raise PasswordError(self.password_required_msg)
if password_sent:
raise PasswordError(self.password_incorrect_msg)
self.diag_stream.transmit_side.write(
(mitogen.core.to_text(self.password) + '\n').encode('utf-8')
)
password_sent = True
raise mitogen.core.StreamError('bootstrap failed')
def _connect_bootstrap(self):
fds = [self.receive_side.fd]
if self.diag_stream is not None:
fds.append(self.diag_stream.receive_side.fd)
it = mitogen.parent.iter_read(
fds=fds,
deadline=self.connect_deadline,
)
try:
self._connect_input_loop(it)
finally:
it.close()

@ -65,9 +65,38 @@ def make_socket_path():
return tempfile.mktemp(prefix='mitogen_unix_', suffix='.sock') return tempfile.mktemp(prefix='mitogen_unix_', suffix='.sock')
class Listener(mitogen.core.BasicStream): class ListenerStream(mitogen.core.Stream):
def on_receive(self, broker):
sock, _ = self.receive_side.fp.accept()
try:
self.protocol.on_accept_client(sock)
except:
sock.close()
raise
class Listener(mitogen.core.Protocol):
stream_class = ListenerStream
keep_alive = True keep_alive = True
@classmethod
def build_stream(cls, router, path=None, backlog=100):
if not path:
path = make_socket_path()
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
if os.path.exists(path) and is_path_dead(path):
LOG.debug('%r: deleting stale %r', self, path)
os.unlink(path)
sock.bind(path)
os.chmod(path, int('0600', 8))
sock.listen(backlog)
stream = super(Listener, cls).build_stream(router, path)
stream.accept(sock, sock)
router.broker.start_receive(stream)
return stream
def __repr__(self): def __repr__(self):
return '%s.%s(%r)' % ( return '%s.%s(%r)' % (
__name__, __name__,
@ -75,20 +104,9 @@ class Listener(mitogen.core.BasicStream):
self.path, self.path,
) )
def __init__(self, router, path=None, backlog=100): def __init__(self, router, path):
self._router = router self._router = router
self.path = path or make_socket_path() self.path = path
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
if os.path.exists(self.path) and is_path_dead(self.path):
LOG.debug('%r: deleting stale %r', self, self.path)
os.unlink(self.path)
self._sock.bind(self.path)
os.chmod(self.path, int('0600', 8))
self._sock.listen(backlog)
self.receive_side = mitogen.core.Side(self, self._sock.fileno())
router.broker.start_receive(self)
def _unlink_socket(self): def _unlink_socket(self):
try: try:
@ -102,10 +120,9 @@ class Listener(mitogen.core.BasicStream):
def on_shutdown(self, broker): def on_shutdown(self, broker):
broker.stop_receive(self) broker.stop_receive(self)
self._unlink_socket() self._unlink_socket()
self._sock.close() self.receive_side.close()
self.receive_side.closed = True
def _accept_client(self, sock): def on_accept_client(self, sock):
sock.setblocking(True) sock.setblocking(True)
try: try:
pid, = struct.unpack('>L', sock.recv(4)) pid, = struct.unpack('>L', sock.recv(4))
@ -115,12 +132,6 @@ class Listener(mitogen.core.BasicStream):
return return
context_id = self._router.id_allocator.allocate() context_id = self._router.id_allocator.allocate()
context = mitogen.parent.Context(self._router, context_id)
stream = mitogen.core.Stream(self._router, context_id)
stream.name = u'unix_client.%d' % (pid,)
stream.auth_id = mitogen.context_id
stream.is_privileged = True
try: try:
sock.send(struct.pack('>LLL', context_id, mitogen.context_id, sock.send(struct.pack('>LLL', context_id, mitogen.context_id,
os.getpid())) os.getpid()))
@ -129,21 +140,22 @@ class Listener(mitogen.core.BasicStream):
self, pid, sys.exc_info()[1]) self, pid, sys.exc_info()[1])
return return
context = mitogen.parent.Context(self._router, context_id)
stream = mitogen.core.MitogenProtocol.build_stream(
router=self._router,
remote_id=context_id,
)
stream.name = u'unix_client.%d' % (pid,)
stream.protocol.auth_id = mitogen.context_id
stream.protocol.is_privileged = True
side = mitogen.core.Side(stream, sock)
stream.receive_side = side
stream.transmit_side = side
LOG.debug('%r: accepted %r', self, stream) LOG.debug('%r: accepted %r', self, stream)
stream.accept(sock.fileno(), sock.fileno())
self._router.register(context, stream) self._router.register(context, stream)
def on_receive(self, broker):
sock, _ = self._sock.accept()
try:
self._accept_client(sock)
finally:
sock.close()
def connect(path, broker=None): def _connect(path, broker, sock):
LOG.debug('unix.connect(path=%r)', path)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(path) sock.connect(path)
sock.send(struct.pack('>L', os.getpid())) sock.send(struct.pack('>L', os.getpid()))
mitogen.context_id, remote_id, pid = struct.unpack('>LLL', sock.recv(12)) mitogen.context_id, remote_id, pid = struct.unpack('>LLL', sock.recv(12))
@ -154,15 +166,24 @@ def connect(path, broker=None):
mitogen.context_id, remote_id) mitogen.context_id, remote_id)
router = mitogen.master.Router(broker=broker) router = mitogen.master.Router(broker=broker)
stream = mitogen.core.Stream(router, remote_id) stream = mitogen.core.MitogenProtocol.build_stream(router, remote_id)
stream.accept(sock.fileno(), sock.fileno()) side = mitogen.core.Side(stream, sock)
stream.transmit_side = side
stream.receive_side = side
stream.name = u'unix_listener.%d' % (pid,) stream.name = u'unix_listener.%d' % (pid,)
context = mitogen.parent.Context(router, remote_id)
router.register(context, stream)
mitogen.core.listen(router.broker, 'shutdown', mitogen.core.listen(router.broker, 'shutdown',
lambda: router.disconnect_stream(stream)) lambda: router.disconnect_stream(stream))
sock.close() context = mitogen.parent.Context(router, remote_id)
router.register(context, stream)
return router, context return router, context
def connect(path, broker=None):
LOG.debug('unix.connect(path=%r)', path)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
return _connect(path, broker, sock)
except:
sock.close()
raise

@ -19,15 +19,17 @@ import mitogen.sudo
router = mitogen.master.Router() router = mitogen.master.Router()
context = mitogen.parent.Context(router, 0) context = mitogen.parent.Context(router, 0)
stream = mitogen.ssh.Stream(router, 0, max_message_size=0, hostname='foo') options = mitogen.ssh.Options(max_message_size=0, hostname='foo')
conn = mitogen.ssh.Connection(options, router)
conn.context_id = 123
print('SSH command size: %s' % (len(' '.join(stream.get_boot_command())),)) print('SSH command size: %s' % (len(' '.join(conn.get_boot_command())),))
print('Preamble size: %s (%.2fKiB)' % ( print('Preamble size: %s (%.2fKiB)' % (
len(stream.get_preamble()), len(conn.get_preamble()),
len(stream.get_preamble()) / 1024.0, len(conn.get_preamble()) / 1024.0,
)) ))
if '--dump' in sys.argv: if '--dump' in sys.argv:
print(zlib.decompress(stream.get_preamble())) print(zlib.decompress(conn.get_preamble()))
exit() exit()

@ -1,13 +0,0 @@
#!/usr/bin/env python
# I produce text every 100ms, for testing mitogen.core.iter_read()
import sys
import time
i = 0
while True:
i += 1
sys.stdout.write(str(i))
sys.stdout.flush()
time.sleep(0.1)

@ -1,9 +0,0 @@
#!/usr/bin/env python
# I consume 65535 bytes every 10ms, for testing mitogen.core.write_all()
import os
import time
while True:
os.read(0, 65535)
time.sleep(0.01)

@ -21,7 +21,7 @@ class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(argv[1], 'exec') self.assertEquals(argv[1], 'exec')
self.assertEquals(argv[2], '--interactive') self.assertEquals(argv[2], '--interactive')
self.assertEquals(argv[3], 'container_name') self.assertEquals(argv[3], 'container_name')
self.assertEquals(argv[4], stream.python_path) self.assertEquals(argv[4], stream.conn.options.python_path)
if __name__ == '__main__': if __name__ == '__main__':

@ -19,8 +19,10 @@ class CommandLineTest(testlib.RouterMixin, testlib.TestCase):
# * 3.x starting 2.7 # * 3.x starting 2.7
def test_valid_syntax(self): def test_valid_syntax(self):
stream = mitogen.parent.Stream(self.router, 0, max_message_size=123) options = mitogen.parent.Options(max_message_size=123)
args = stream.get_boot_command() conn = mitogen.parent.Connection(options, self.router)
conn.context = mitogen.core.Context(None, 123)
args = conn.get_boot_command()
# Executing the boot command will print "EC0" and expect to read from # Executing the boot command will print "EC0" and expect to read from
# stdin, which will fail because it's pointing at /dev/null, causing # stdin, which will fail because it's pointing at /dev/null, causing
@ -38,7 +40,8 @@ class CommandLineTest(testlib.RouterMixin, testlib.TestCase):
) )
stdout, stderr = proc.communicate() stdout, stderr = proc.communicate()
self.assertEquals(0, proc.returncode) self.assertEquals(0, proc.returncode)
self.assertEquals(mitogen.parent.Stream.EC0_MARKER, stdout) self.assertEquals(stdout,
mitogen.parent.BootstrapProtocol.EC0_MARKER+'\n')
self.assertIn(b("Error -5 while decompressing data"), stderr) self.assertIn(b("Error -5 while decompressing data"), stderr)
finally: finally:
fp.close() fp.close()

@ -38,7 +38,7 @@ class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
lxc_attach_path='true', lxc_attach_path='true',
) )
) )
self.assertTrue(str(e).endswith(mitogen.lxc.Stream.eof_error_hint)) self.assertTrue(str(e).endswith(mitogen.lxc.Connection.eof_error_hint))
if __name__ == '__main__': if __name__ == '__main__':

@ -30,7 +30,7 @@ class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
lxc_path='true', lxc_path='true',
) )
) )
self.assertTrue(str(e).endswith(mitogen.lxd.Stream.eof_error_hint)) self.assertTrue(str(e).endswith(mitogen.lxd.Connection.eof_error_hint))
if __name__ == '__main__': if __name__ == '__main__':

@ -0,0 +1,34 @@
import unittest2
import mock
import mitogen.core
import testlib
class ReceiveOneTest(testlib.TestCase):
klass = mitogen.core.MitogenProtocol
def test_corruption(self):
broker = mock.Mock()
router = mock.Mock()
stream = mock.Mock()
protocol = self.klass(router, 1)
protocol.stream = stream
junk = mitogen.core.b('x') * mitogen.core.Message.HEADER_LEN
capture = testlib.LogCapturer()
capture.start()
protocol.on_receive(broker, junk)
capture.stop()
self.assertEquals(1, stream.on_disconnect.call_count)
expect = self.klass.corrupt_msg % (stream.name, junk)
self.assertTrue(expect in capture.raw())
if __name__ == '__main__':
unittest2.main()

@ -49,7 +49,7 @@ def wait_for_empty_output_queue(sync_recv, context):
while True: while True:
# Now wait for the RPC to exit the output queue. # Now wait for the RPC to exit the output queue.
stream = router.stream_by_id(context.context_id) stream = router.stream_by_id(context.context_id)
if broker.defer_sync(lambda: stream.pending_bytes()) == 0: if broker.defer_sync(lambda: stream.protocol.pending_bytes()) == 0:
return return
time.sleep(0.1) time.sleep(0.1)
@ -69,35 +69,17 @@ class GetDefaultRemoteNameTest(testlib.TestCase):
self.assertEquals("ECORP_Administrator@box:123", self.func()) self.assertEquals("ECORP_Administrator@box:123", self.func())
class WstatusToStrTest(testlib.TestCase): class ReturncodeToStrTest(testlib.TestCase):
func = staticmethod(mitogen.parent.wstatus_to_str) func = staticmethod(mitogen.parent.returncode_to_str)
def test_return_zero(self): def test_return_zero(self):
pid = os.fork() self.assertEquals(self.func(0), 'exited with return code 0')
if not pid:
os._exit(0)
(pid, status), _ = mitogen.core.io_op(os.waitpid, pid, 0)
self.assertEquals(self.func(status),
'exited with return code 0')
def test_return_one(self): def test_return_one(self):
pid = os.fork() self.assertEquals(self.func(1), 'exited with return code 1')
if not pid:
os._exit(1)
(pid, status), _ = mitogen.core.io_op(os.waitpid, pid, 0)
self.assertEquals(
self.func(status),
'exited with return code 1'
)
def test_sigkill(self): def test_sigkill(self):
pid = os.fork() self.assertEquals(self.func(-signal.SIGKILL),
if not pid:
time.sleep(600)
os.kill(pid, signal.SIGKILL)
(pid, status), _ = mitogen.core.io_op(os.waitpid, pid, 0)
self.assertEquals(
self.func(status),
'exited due to signal %s (SIGKILL)' % (int(signal.SIGKILL),) 'exited due to signal %s (SIGKILL)' % (int(signal.SIGKILL),)
) )
@ -107,20 +89,20 @@ class WstatusToStrTest(testlib.TestCase):
class ReapChildTest(testlib.RouterMixin, testlib.TestCase): class ReapChildTest(testlib.RouterMixin, testlib.TestCase):
def test_connect_timeout(self): def test_connect_timeout(self):
# Ensure the child process is reaped if the connection times out. # Ensure the child process is reaped if the connection times out.
stream = mitogen.parent.Stream( options = mitogen.parent.Options(
router=self.router,
remote_id=1234,
old_router=self.router, old_router=self.router,
max_message_size=self.router.max_message_size, max_message_size=self.router.max_message_size,
python_path=testlib.data_path('python_never_responds.py'), python_path=testlib.data_path('python_never_responds.py'),
connect_timeout=0.5, connect_timeout=0.5,
) )
conn = mitogen.parent.Connection(options, router=self.router)
self.assertRaises(mitogen.core.TimeoutError, self.assertRaises(mitogen.core.TimeoutError,
lambda: stream.connect() lambda: conn.connect(context=mitogen.core.Context(None, 1234))
) )
wait_for_child(stream.pid) wait_for_child(conn.proc.pid)
e = self.assertRaises(OSError, e = self.assertRaises(OSError,
lambda: os.kill(stream.pid, 0) lambda: os.kill(conn.proc.pid, 0)
) )
self.assertEquals(e.args[0], errno.ESRCH) self.assertEquals(e.args[0], errno.ESRCH)
@ -133,7 +115,7 @@ class StreamErrorTest(testlib.RouterMixin, testlib.TestCase):
connect_timeout=3, connect_timeout=3,
) )
) )
prefix = "EOF on stream; last 300 bytes received: " prefix = mitogen.parent.Connection.eof_error_msg
self.assertTrue(e.args[0].startswith(prefix)) self.assertTrue(e.args[0].startswith(prefix))
def test_via_eof(self): def test_via_eof(self):
@ -142,12 +124,12 @@ class StreamErrorTest(testlib.RouterMixin, testlib.TestCase):
e = self.assertRaises(mitogen.core.StreamError, e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.local( lambda: self.router.local(
via=local, via=local,
python_path='true', python_path='echo',
connect_timeout=3, connect_timeout=3,
) )
) )
s = "EOF on stream; last 300 bytes received: " expect = mitogen.parent.Connection.eof_error_msg
self.assertTrue(s in e.args[0]) self.assertTrue(expect in e.args[0])
def test_direct_enoent(self): def test_direct_enoent(self):
e = self.assertRaises(mitogen.core.StreamError, e = self.assertRaises(mitogen.core.StreamError,
@ -185,11 +167,15 @@ class OpenPtyTest(testlib.TestCase):
func = staticmethod(mitogen.parent.openpty) func = staticmethod(mitogen.parent.openpty)
def test_pty_returned(self): def test_pty_returned(self):
master_fd, slave_fd = self.func() master_fp, slave_fp = self.func()
self.assertTrue(isinstance(master_fd, int)) try:
self.assertTrue(isinstance(slave_fd, int)) self.assertTrue(master_fp.isatty())
os.close(master_fd) self.assertTrue(isinstance(master_fp, file))
os.close(slave_fd) self.assertTrue(slave_fp.isatty())
self.assertTrue(isinstance(slave_fp, file))
finally:
master_fp.close()
slave_fp.close()
@mock.patch('os.openpty') @mock.patch('os.openpty')
def test_max_reached(self, openpty): def test_max_reached(self, openpty):
@ -204,20 +190,20 @@ class OpenPtyTest(testlib.TestCase):
@mock.patch('os.openpty') @mock.patch('os.openpty')
def test_broken_linux_fallback(self, openpty): def test_broken_linux_fallback(self, openpty):
openpty.side_effect = OSError(errno.EPERM) openpty.side_effect = OSError(errno.EPERM)
master_fd, slave_fd = self.func() master_fp, slave_fp = self.func()
try: try:
st = os.fstat(master_fd) st = os.fstat(master_fp.fileno())
self.assertEquals(5, os.major(st.st_rdev)) self.assertEquals(5, os.major(st.st_rdev))
flags = fcntl.fcntl(master_fd, fcntl.F_GETFL) flags = fcntl.fcntl(master_fp.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR) self.assertTrue(flags & os.O_RDWR)
st = os.fstat(slave_fd) st = os.fstat(slave_fp.fileno())
self.assertEquals(136, os.major(st.st_rdev)) self.assertEquals(136, os.major(st.st_rdev))
flags = fcntl.fcntl(slave_fd, fcntl.F_GETFL) flags = fcntl.fcntl(slave_fp.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR) self.assertTrue(flags & os.O_RDWR)
finally: finally:
os.close(master_fd) master_fp.close()
os.close(slave_fd) slave_fp.close()
class TtyCreateChildTest(testlib.TestCase): class TtyCreateChildTest(testlib.TestCase):
@ -235,125 +221,21 @@ class TtyCreateChildTest(testlib.TestCase):
# read a password. # read a password.
tf = tempfile.NamedTemporaryFile() tf = tempfile.NamedTemporaryFile()
try: try:
pid, fd, _ = self.func([ proc = self.func([
'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,) 'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
]) ])
deadline = time.time() + 5.0 deadline = time.time() + 5.0
for line in mitogen.parent.iter_read([fd], deadline): mitogen.core.set_block(proc.stdio_fp.fileno())
self.assertEquals(mitogen.core.b('hi\n'), line) self.assertEquals(mitogen.core.b('hi\n'), proc.stdio_fp.read())
break waited_pid, status = os.waitpid(proc.pid, 0)
waited_pid, status = os.waitpid(pid, 0) self.assertEquals(proc.pid, waited_pid)
self.assertEquals(pid, waited_pid)
self.assertEquals(0, status) self.assertEquals(0, status)
self.assertEquals(mitogen.core.b(''), tf.read()) self.assertEquals(mitogen.core.b(''), tf.read())
os.close(fd) proc.stdio_fp.close()
finally: finally:
tf.close() tf.close()
class IterReadTest(testlib.TestCase):
func = staticmethod(mitogen.parent.iter_read)
def make_proc(self):
# I produce text every 100ms.
args = [testlib.data_path('iter_read_generator.py')]
proc = subprocess.Popen(args, stdout=subprocess.PIPE)
mitogen.core.set_nonblock(proc.stdout.fileno())
return proc
def test_no_deadline(self):
proc = self.make_proc()
try:
reader = self.func([proc.stdout.fileno()])
for i, chunk in enumerate(reader):
self.assertEqual(1+i, int(chunk))
if i > 2:
break
finally:
Popen__terminate(proc)
proc.stdout.close()
def test_deadline_exceeded_before_call(self):
proc = self.make_proc()
reader = self.func([proc.stdout.fileno()], 0)
try:
got = []
try:
for chunk in reader:
got.append(chunk)
assert 0, 'TimeoutError not raised'
except mitogen.core.TimeoutError:
self.assertEqual(len(got), 0)
finally:
Popen__terminate(proc)
proc.stdout.close()
def test_deadline_exceeded_during_call(self):
proc = self.make_proc()
deadline = time.time() + 0.4
reader = self.func([proc.stdout.fileno()], deadline)
try:
got = []
try:
for chunk in reader:
if time.time() > (deadline + 1.0):
assert 0, 'TimeoutError not raised'
got.append(chunk)
except mitogen.core.TimeoutError:
# Give a little wiggle room in case of imperfect scheduling.
# Ideal number should be 9.
self.assertLess(deadline, time.time())
self.assertLess(1, len(got))
self.assertLess(len(got), 20)
finally:
Popen__terminate(proc)
proc.stdout.close()
class WriteAllTest(testlib.TestCase):
func = staticmethod(mitogen.parent.write_all)
def make_proc(self):
args = [testlib.data_path('write_all_consumer.py')]
proc = subprocess.Popen(args, stdin=subprocess.PIPE)
mitogen.core.set_nonblock(proc.stdin.fileno())
return proc
ten_ms_chunk = (mitogen.core.b('x') * 65535)
def test_no_deadline(self):
proc = self.make_proc()
try:
self.func(proc.stdin.fileno(), self.ten_ms_chunk)
finally:
Popen__terminate(proc)
proc.stdin.close()
def test_deadline_exceeded_before_call(self):
proc = self.make_proc()
try:
self.assertRaises(mitogen.core.TimeoutError, (
lambda: self.func(proc.stdin.fileno(), self.ten_ms_chunk, 0)
))
finally:
Popen__terminate(proc)
proc.stdin.close()
def test_deadline_exceeded_during_call(self):
proc = self.make_proc()
try:
deadline = time.time() + 0.1 # 100ms deadline
self.assertRaises(mitogen.core.TimeoutError, (
lambda: self.func(proc.stdin.fileno(),
self.ten_ms_chunk * 100, # 1s of data
deadline)
))
finally:
Popen__terminate(proc)
proc.stdin.close()
class DisconnectTest(testlib.RouterMixin, testlib.TestCase): class DisconnectTest(testlib.RouterMixin, testlib.TestCase):
def test_child_disconnected(self): def test_child_disconnected(self):
# Easy mode: process notices its own directly connected child is # Easy mode: process notices its own directly connected child is
@ -394,7 +276,7 @@ class DisconnectTest(testlib.RouterMixin, testlib.TestCase):
c2 = self.router.local() c2 = self.router.local()
# Let c1 call functions in c2. # Let c1 call functions in c2.
self.router.stream_by_id(c1.context_id).auth_id = mitogen.context_id self.router.stream_by_id(c1.context_id).protocol.auth_id = mitogen.context_id
c1.call(mitogen.parent.upgrade_router) c1.call(mitogen.parent.upgrade_router)
sync_recv = mitogen.core.Receiver(self.router) sync_recv = mitogen.core.Receiver(self.router)
@ -412,14 +294,14 @@ class DisconnectTest(testlib.RouterMixin, testlib.TestCase):
def test_far_sibling_disconnected(self): def test_far_sibling_disconnected(self):
# God mode: child of child notices child of child of parent has # God mode: child of child notices child of child of parent has
# disconnected. # disconnected.
c1 = self.router.local() c1 = self.router.local(name='c1')
c11 = self.router.local(via=c1) c11 = self.router.local(name='c11', via=c1)
c2 = self.router.local() c2 = self.router.local(name='c2')
c22 = self.router.local(via=c2) c22 = self.router.local(name='c22', via=c2)
# Let c1 call functions in c2. # Let c1 call functions in c2.
self.router.stream_by_id(c1.context_id).auth_id = mitogen.context_id self.router.stream_by_id(c1.context_id).protocol.auth_id = mitogen.context_id
c11.call(mitogen.parent.upgrade_router) c11.call(mitogen.parent.upgrade_router)
sync_recv = mitogen.core.Receiver(self.router) sync_recv = mitogen.core.Receiver(self.router)

@ -42,8 +42,8 @@ class SockMixin(object):
self.l2_sock, self.r2_sock = socket.socketpair() self.l2_sock, self.r2_sock = socket.socketpair()
self.l2 = self.l2_sock.fileno() self.l2 = self.l2_sock.fileno()
self.r2 = self.r2_sock.fileno() self.r2 = self.r2_sock.fileno()
for fd in self.l1, self.r1, self.l2, self.r2: for fp in self.l1, self.r1, self.l2, self.r2:
mitogen.core.set_nonblock(fd) mitogen.core.set_nonblock(fp)
def fill(self, fd): def fill(self, fd):
"""Make `fd` unwriteable.""" """Make `fd` unwriteable."""
@ -354,17 +354,17 @@ class FileClosedMixin(PollerMixin, SockMixin):
class TtyHangupMixin(PollerMixin): class TtyHangupMixin(PollerMixin):
def test_tty_hangup_detected(self): def test_tty_hangup_detected(self):
# bug in initial select.poll() implementation failed to detect POLLHUP. # bug in initial select.poll() implementation failed to detect POLLHUP.
master_fd, slave_fd = mitogen.parent.openpty() master_fp, slave_fp = mitogen.parent.openpty()
try: try:
self.p.start_receive(master_fd) self.p.start_receive(master_fp.fileno())
self.assertEquals([], list(self.p.poll(0))) self.assertEquals([], list(self.p.poll(0)))
os.close(slave_fd) slave_fp.close()
slave_fd = None slave_fp = None
self.assertEquals([master_fd], list(self.p.poll(0))) self.assertEquals([master_fp.fileno()], list(self.p.poll(0)))
finally: finally:
if slave_fd is not None: if slave_fp is not None:
os.close(slave_fd) slave_fp.close()
os.close(master_fd) master_fp.close()
class DistinctDataMixin(PollerMixin, SockMixin): class DistinctDataMixin(PollerMixin, SockMixin):

@ -105,7 +105,7 @@ class BrokenModulesTest(testlib.TestCase):
# unavailable. Should never happen in the real world. # unavailable. Should never happen in the real world.
stream = mock.Mock() stream = mock.Mock()
stream.sent_modules = set() stream.protocol.sent_modules = set()
router = mock.Mock() router = mock.Mock()
router.stream_by_id = lambda n: stream router.stream_by_id = lambda n: stream
@ -143,7 +143,7 @@ class BrokenModulesTest(testlib.TestCase):
import six_brokenpkg import six_brokenpkg
stream = mock.Mock() stream = mock.Mock()
stream.sent_modules = set() stream.protocol.sent_modules = set()
router = mock.Mock() router = mock.Mock()
router.stream_by_id = lambda n: stream router.stream_by_id = lambda n: stream

@ -171,7 +171,7 @@ class CrashTest(testlib.BrokerMixin, testlib.TestCase):
self.assertTrue(sem.get().is_dead) self.assertTrue(sem.get().is_dead)
# Ensure it was logged. # Ensure it was logged.
expect = '_broker_main() crashed' expect = 'broker crashed'
self.assertTrue(expect in log.stop()) self.assertTrue(expect in log.stop())
self.broker.join() self.broker.join()
@ -364,8 +364,8 @@ class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase):
# treated like a parent. # treated like a parent.
l1 = self.router.local() l1 = self.router.local()
l1s = self.router.stream_by_id(l1.context_id) l1s = self.router.stream_by_id(l1.context_id)
l1s.auth_id = mitogen.context_id l1s.protocol.auth_id = mitogen.context_id
l1s.is_privileged = True l1s.protocol.is_privileged = True
l2 = self.router.local() l2 = self.router.local()
e = self.assertRaises(mitogen.core.CallError, e = self.assertRaises(mitogen.core.CallError,
@ -378,12 +378,21 @@ class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase):
class EgressIdsTest(testlib.RouterMixin, testlib.TestCase): class EgressIdsTest(testlib.RouterMixin, testlib.TestCase):
def test_egress_ids_populated(self): def test_egress_ids_populated(self):
# Ensure Stream.egress_ids is populated on message reception. # Ensure Stream.egress_ids is populated on message reception.
c1 = self.router.local() c1 = self.router.local(name='c1')
stream = self.router.stream_by_id(c1.context_id) c2 = self.router.local(name='c2')
self.assertEquals(set(), stream.egress_ids)
c1.call(time.sleep, 0) c1s = self.router.stream_by_id(c1.context_id)
self.assertEquals(set([mitogen.context_id]), stream.egress_ids) try:
c1.call(ping_context, c2)
except mitogen.core.CallError:
# Fails because siblings cant call funcs in each other, but this
# causes messages to be sent.
pass
self.assertEquals(c1s.protocol.egress_ids, set([
mitogen.context_id,
c2.context_id,
]))
if __name__ == '__main__': if __name__ == '__main__':

@ -44,8 +44,8 @@ class ActivationTest(testlib.RouterMixin, testlib.TestCase):
self.assertTrue(isinstance(id_, int)) self.assertTrue(isinstance(id_, int))
def test_sibling_cannot_activate_framework(self): def test_sibling_cannot_activate_framework(self):
l1 = self.router.local() l1 = self.router.local(name='l1')
l2 = self.router.local() l2 = self.router.local(name='l2')
exc = self.assertRaises(mitogen.core.CallError, exc = self.assertRaises(mitogen.core.CallError,
lambda: l2.call(call_service_in, l1, MyService2.name(), 'get_id')) lambda: l2.call(call_service_in, l1, MyService2.name(), 'get_id'))
self.assertTrue(mitogen.core.Router.refused_msg in exc.args[0]) self.assertTrue(mitogen.core.Router.refused_msg in exc.args[0])

@ -42,8 +42,6 @@ class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
class SshTest(testlib.DockerMixin, testlib.TestCase): class SshTest(testlib.DockerMixin, testlib.TestCase):
stream_class = mitogen.ssh.Stream
def test_debug_decoding(self): def test_debug_decoding(self):
# ensure filter_debug_logs() decodes the logged string. # ensure filter_debug_logs() decodes the logged string.
capture = testlib.LogCapturer() capture = testlib.LogCapturer()
@ -93,7 +91,7 @@ class SshTest(testlib.DockerMixin, testlib.TestCase):
except mitogen.ssh.PasswordError: except mitogen.ssh.PasswordError:
e = sys.exc_info()[1] e = sys.exc_info()[1]
self.assertEqual(e.args[0], self.stream_class.password_required_msg) self.assertEqual(e.args[0], mitogen.ssh.password_required_msg)
def test_password_incorrect(self): def test_password_incorrect(self):
try: try:
@ -105,7 +103,7 @@ class SshTest(testlib.DockerMixin, testlib.TestCase):
except mitogen.ssh.PasswordError: except mitogen.ssh.PasswordError:
e = sys.exc_info()[1] e = sys.exc_info()[1]
self.assertEqual(e.args[0], self.stream_class.password_incorrect_msg) self.assertEqual(e.args[0], mitogen.ssh.password_incorrect_msg)
def test_password_specified(self): def test_password_specified(self):
context = self.docker_ssh( context = self.docker_ssh(
@ -127,7 +125,7 @@ class SshTest(testlib.DockerMixin, testlib.TestCase):
except mitogen.ssh.PasswordError: except mitogen.ssh.PasswordError:
e = sys.exc_info()[1] e = sys.exc_info()[1]
self.assertEqual(e.args[0], self.stream_class.password_required_msg) self.assertEqual(e.args[0], mitogen.ssh.password_required_msg)
def test_pubkey_specified(self): def test_pubkey_specified(self):
context = self.docker_ssh( context = self.docker_ssh(
@ -150,7 +148,7 @@ class SshTest(testlib.DockerMixin, testlib.TestCase):
check_host_keys='enforce', check_host_keys='enforce',
) )
) )
self.assertEquals(e.args[0], mitogen.ssh.Stream.hostkey_failed_msg) self.assertEquals(e.args[0], mitogen.ssh.hostkey_failed_msg)
finally: finally:
fp.close() fp.close()
@ -184,8 +182,6 @@ class SshTest(testlib.DockerMixin, testlib.TestCase):
class BannerTest(testlib.DockerMixin, testlib.TestCase): class BannerTest(testlib.DockerMixin, testlib.TestCase):
# Verify the ability to disambiguate random spam appearing in the SSHd's # Verify the ability to disambiguate random spam appearing in the SSHd's
# login banner from a legitimate password prompt. # login banner from a legitimate password prompt.
stream_class = mitogen.ssh.Stream
def test_verbose_enabled(self): def test_verbose_enabled(self):
context = self.docker_ssh( context = self.docker_ssh(
username='mitogen__has_sudo', username='mitogen__has_sudo',
@ -210,8 +206,6 @@ class StubPermissionDeniedTest(StubSshMixin, testlib.TestCase):
class StubCheckHostKeysTest(StubSshMixin, testlib.TestCase): class StubCheckHostKeysTest(StubSshMixin, testlib.TestCase):
stream_class = mitogen.ssh.Stream
def test_check_host_keys_accept(self): def test_check_host_keys_accept(self):
# required=true, host_key_checking=accept # required=true, host_key_checking=accept
context = self.stub_ssh(STUBSSH_MODE='ask', check_host_keys='accept') context = self.stub_ssh(STUBSSH_MODE='ask', check_host_keys='accept')

@ -1,33 +0,0 @@
import unittest2
import mock
import mitogen.core
import testlib
class ReceiveOneTest(testlib.TestCase):
klass = mitogen.core.Stream
def test_corruption(self):
broker = mock.Mock()
router = mock.Mock()
stream = self.klass(router, 1)
junk = mitogen.core.b('x') * stream.HEADER_LEN
stream._input_buf = [junk]
stream._input_buf_len = len(junk)
capture = testlib.LogCapturer()
capture.start()
ret = stream._receive_one(broker)
#self.assertEquals(1, broker.stop_receive.mock_calls)
capture.stop()
self.assertFalse(ret)
self.assertTrue((self.klass.corrupt_msg % (junk,)) in capture.raw())
if __name__ == '__main__':
unittest2.main()

@ -1,9 +1,9 @@
import getpass
import os import os
import mitogen import mitogen
import mitogen.lxd import mitogen.su
import mitogen.parent
import unittest2 import unittest2
@ -21,12 +21,41 @@ class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
argv = eval(context.call(os.getenv, 'ORIGINAL_ARGV')) argv = eval(context.call(os.getenv, 'ORIGINAL_ARGV'))
return context, argv return context, argv
def test_basic(self): def test_basic(self):
context, argv = self.run_su() context, argv = self.run_su()
self.assertEquals(argv[1], 'root') self.assertEquals(argv[1], 'root')
self.assertEquals(argv[2], '-c') self.assertEquals(argv[2], '-c')
class SuTest(testlib.DockerMixin, testlib.TestCase):
def test_password_required(self):
ssh = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
)
e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.su(via=ssh)
)
self.assertTrue(mitogen.su.password_required_msg in str(e))
def test_password_incorrect(self):
ssh = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
)
e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.su(via=ssh, password='x')
)
self.assertTrue(mitogen.su.password_incorrect_msg in str(e))
def test_password_okay(self):
ssh = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
)
context = self.router.su(via=ssh, password='rootpassword')
self.assertEquals('root', context.call(getpass.getuser))
if __name__ == '__main__': if __name__ == '__main__':
unittest2.main() unittest2.main()

@ -2,8 +2,7 @@
import os import os
import mitogen import mitogen
import mitogen.lxd import mitogen.sudo
import mitogen.parent
import unittest2 import unittest2
@ -79,7 +78,7 @@ class NonEnglishPromptTest(testlib.DockerMixin, testlib.TestCase):
e = self.assertRaises(mitogen.core.StreamError, e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.sudo(via=ssh) lambda: self.router.sudo(via=ssh)
) )
self.assertTrue(mitogen.sudo.Stream.password_required_msg in str(e)) self.assertTrue(mitogen.sudo.password_required_msg in str(e))
def test_password_incorrect(self): def test_password_incorrect(self):
ssh = self.docker_ssh( ssh = self.docker_ssh(
@ -91,7 +90,7 @@ class NonEnglishPromptTest(testlib.DockerMixin, testlib.TestCase):
e = self.assertRaises(mitogen.core.StreamError, e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.sudo(via=ssh, password='x') lambda: self.router.sudo(via=ssh, password='x')
) )
self.assertTrue(mitogen.sudo.Stream.password_incorrect_msg in str(e)) self.assertTrue(mitogen.sudo.password_incorrect_msg in str(e))
def test_password_okay(self): def test_password_okay(self):
ssh = self.docker_ssh( ssh = self.docker_ssh(
@ -103,7 +102,7 @@ class NonEnglishPromptTest(testlib.DockerMixin, testlib.TestCase):
e = self.assertRaises(mitogen.core.StreamError, e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.sudo(via=ssh, password='rootpassword') lambda: self.router.sudo(via=ssh, password='rootpassword')
) )
self.assertTrue(mitogen.sudo.Stream.password_incorrect_msg in str(e)) self.assertTrue(mitogen.sudo.password_incorrect_msg in str(e))
if __name__ == '__main__': if __name__ == '__main__':

@ -67,12 +67,12 @@ class ListenerTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.unix.Listener klass = mitogen.unix.Listener
def test_constructor_basic(self): def test_constructor_basic(self):
listener = self.klass(router=self.router) listener = self.klass.build_stream(router=self.router)
capture = testlib.LogCapturer() capture = testlib.LogCapturer()
capture.start() capture.start()
try: try:
self.assertFalse(mitogen.unix.is_path_dead(listener.path)) self.assertFalse(mitogen.unix.is_path_dead(listener.protocol.path))
os.unlink(listener.path) os.unlink(listener.protocol.path)
# ensure we catch 0 byte read error log message # ensure we catch 0 byte read error log message
self.broker.shutdown() self.broker.shutdown()
self.broker.join() self.broker.join()
@ -96,12 +96,14 @@ class ClientTest(testlib.TestCase):
def _test_simple_client(self, path): def _test_simple_client(self, path):
router, context = self._try_connect(path) router, context = self._try_connect(path)
try:
self.assertEquals(0, context.context_id) self.assertEquals(0, context.context_id)
self.assertEquals(1, mitogen.context_id) self.assertEquals(1, mitogen.context_id)
self.assertEquals(0, mitogen.parent_id) self.assertEquals(0, mitogen.parent_id)
resp = context.call_service(service_name=MyService, method_name='ping') resp = context.call_service(service_name=MyService, method_name='ping')
self.assertEquals(mitogen.context_id, resp['src_id']) self.assertEquals(mitogen.context_id, resp['src_id'])
self.assertEquals(0, resp['auth_id']) self.assertEquals(0, resp['auth_id'])
finally:
router.broker.shutdown() router.broker.shutdown()
router.broker.join() router.broker.join()
os.unlink(path) os.unlink(path)
@ -112,7 +114,7 @@ class ClientTest(testlib.TestCase):
latch = mitogen.core.Latch() latch = mitogen.core.Latch()
try: try:
try: try:
listener = cls.klass(path=path, router=router) listener = cls.klass.build_stream(path=path, router=router)
pool = mitogen.service.Pool(router=router, services=[ pool = mitogen.service.Pool(router=router, services=[
MyService(latch=latch, router=router), MyService(latch=latch, router=router),
]) ])

Loading…
Cancel
Save