Mostly implement hybrid TTY/socket mode for sudo and SSH.

Presently there is still no mechanism to add :attr:`tty_stream` to the
multiplexer after connection is successful, but for now it's not
expected that anything will be logged to it anyway.

Closes #148.
pull/194/head
David Wilson 7 years ago
parent fca22efe90
commit e43c6c531b

@ -402,27 +402,12 @@ Helper Functions
.. currentmodule:: mitogen.master .. currentmodule:: mitogen.master
.. function:: create_child (\*args) .. autofunction:: create_child
Create a child process whose stdin/stdout is connected to a socket,
returning `(pid, socket_obj)`.
.. currentmodule:: mitogen.master .. currentmodule:: mitogen.master
.. function:: tty_create_child (\*args) .. autofunction:: tty_create_child
Return a file descriptor connected to the master end of a pseudo-terminal,
whose slave end is connected to stdin/stdout/stderr of a new child process.
The child is created such that the pseudo-terminal becomes its controlling
TTY, ensuring access to /dev/tty returns a new file descriptor open on the
slave end.
:param list args:
:py:func:`os.execl` argument list.
:returns:
`(pid, fd)`
.. currentmodule:: mitogen.master .. currentmodule:: mitogen.master

@ -108,7 +108,7 @@ class Stream(mitogen.parent.Stream):
# Decouple the socket from the lifetime of the Python socket object. # Decouple the socket from the lifetime of the Python socket object.
fd = os.dup(parentfp.fileno()) fd = os.dup(parentfp.fileno())
parentfp.close() parentfp.close()
return self.pid, fd return self.pid, fd, None
else: else:
parentfp.close() parentfp.close()
self._wrap_child_main(childfp) self._wrap_child_main(childfp)
@ -155,6 +155,6 @@ class Stream(mitogen.parent.Stream):
# Don't trigger atexit handlers, they were copied from the parent. # Don't trigger atexit handlers, they were copied from the parent.
os._exit(0) os._exit(0)
def _connect_bootstrap(self): def _connect_bootstrap(self, extra_fd):
# None required. # None required.
pass pass

@ -243,6 +243,12 @@ def create_socketpair():
def create_child(*args): def create_child(*args):
"""
Create a child process whose stdin/stdout is connected to a socket.
:returns:
`(pid, socket_obj, :data:`None`)`
"""
parentfp, childfp = create_socketpair() parentfp, childfp = create_socketpair()
# When running under a monkey patches-enabled gevent, the socket module # When running under a monkey patches-enabled gevent, the socket module
# yields file descriptors who already have O_NONBLOCK, which is # yields file descriptors who already have O_NONBLOCK, which is
@ -263,7 +269,7 @@ def create_child(*args):
LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s', LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s',
proc.pid, fd, os.getpid(), Argv(args)) proc.pid, fd, os.getpid(), Argv(args))
return proc.pid, fd return proc.pid, fd, None
def _acquire_controlling_tty(): def _acquire_controlling_tty():
@ -271,13 +277,26 @@ def _acquire_controlling_tty():
if sys.platform == 'linux2': if sys.platform == 'linux2':
# On Linux, the controlling tty becomes the first tty opened by a # On Linux, the controlling tty becomes the first tty opened by a
# process lacking any prior tty. # process lacking any prior tty.
os.close(os.open(os.ttyname(0), os.O_RDWR)) os.close(os.open(os.ttyname(2), os.O_RDWR))
if sys.platform.startswith('freebsd') or sys.platform == 'darwin': if sys.platform.startswith('freebsd') or sys.platform == 'darwin':
# On BSD an explicit ioctl is required. # On BSD an explicit ioctl is required.
fcntl.ioctl(0, termios.TIOCSCTTY) fcntl.ioctl(2, termios.TIOCSCTTY)
def tty_create_child(*args): def tty_create_child(*args):
"""
Return a file descriptor connected to the master end of a pseudo-terminal,
whose slave end is connected to stdin/stdout/stderr of a new child process.
The child is created such that the pseudo-terminal becomes its controlling
TTY, ensuring access to /dev/tty returns a new file descriptor open on the
slave end.
:param list args:
:py:func:`os.execl` argument list.
:returns:
`(pid, tty_fd, None)`
"""
master_fd, slave_fd = os.openpty() master_fd, slave_fd = os.openpty()
mitogen.core.set_block(slave_fd) mitogen.core.set_block(slave_fd)
disable_echo(master_fd) disable_echo(master_fd)
@ -295,7 +314,47 @@ def tty_create_child(*args):
os.close(slave_fd) os.close(slave_fd)
LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s', LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s',
proc.pid, master_fd, os.getpid(), Argv(args)) proc.pid, master_fd, os.getpid(), Argv(args))
return proc.pid, master_fd return proc.pid, master_fd, None
def hybrid_tty_create_child(*args):
"""
Like :func:`tty_create_child`, except attach stdin/stdout to a socketpair
like :func:`create_child`, but leave stderr and the controlling TTY
attached to a TTY.
:param list args:
:py:func:`os.execl` argument list.
:returns:
`(pid, socketpair_fd, tty_fd)`
"""
master_fd, slave_fd = os.openpty()
parentfp, childfp = create_socketpair()
mitogen.core.set_block(slave_fd)
mitogen.core.set_block(childfp)
disable_echo(master_fd)
disable_echo(slave_fd)
proc = subprocess.Popen(
args=args,
stdin=childfp,
stdout=childfp,
stderr=slave_fd,
preexec_fn=_acquire_controlling_tty,
close_fds=True,
)
os.close(slave_fd)
childfp.close()
# Decouple the socket from the lifetime of the Python socket object.
stdio_fd = os.dup(parentfp.fileno())
parentfp.close()
LOG.debug('hybrid_tty_create_child() pid=%d stdio=%d, tty=%d, cmd: %s',
proc.pid, stdio_fd, master_fd, Argv(args))
return proc.pid, stdio_fd, master_fd
def write_all(fd, s, deadline=None): def write_all(fd, s, deadline=None):
@ -319,7 +378,7 @@ def write_all(fd, s, deadline=None):
written += n written += n
def iter_read(fd, deadline=None): def iter_read(fds, deadline=None):
bits = [] bits = []
timeout = None timeout = None
@ -329,26 +388,27 @@ def iter_read(fd, deadline=None):
if timeout == 0: if timeout == 0:
break break
rfds, _, _ = select.select([fd], [], [], timeout) rfds, _, _ = select.select(fds, [], [], timeout)
if not rfds: if not rfds:
continue continue
s, disconnected = mitogen.core.io_op(os.read, fd, 4096) for fd in rfds:
IOLOG.debug('iter_read(%r) -> %r', fd, s) s, disconnected = mitogen.core.io_op(os.read, fd, 4096)
if disconnected or not s: IOLOG.debug('iter_read(%r) -> %r', fd, s)
raise mitogen.core.StreamError( if disconnected or not s:
'EOF on stream; last 300 bytes received: %r' % raise mitogen.core.StreamError(
(''.join(bits)[-300:],) 'EOF on stream; last 300 bytes received: %r' %
) (''.join(bits)[-300:],)
)
bits.append(s) bits.append(s)
yield s yield s
raise mitogen.core.TimeoutError('read timed out') raise mitogen.core.TimeoutError('read timed out')
def discard_until(fd, s, deadline): def discard_until(fd, s, deadline):
for buf in iter_read(fd, deadline): for buf in iter_read([fd], deadline):
if IOLOG.level == logging.DEBUG: if IOLOG.level == logging.DEBUG:
for line in buf.splitlines(): for line in buf.splitlines():
IOLOG.debug('discard_until: discarding %r', line) IOLOG.debug('discard_until: discarding %r', line)
@ -414,6 +474,36 @@ def _proxy_connect(name, method_name, kwargs, econtext):
} }
class TtyLogStream(mitogen.core.BasicStream):
"""
For "hybrid TTY/socketpair" mode, after a connection has been setup, a
spare TTY file descriptor will exist that cannot be closed, and to which
SSH or sudo may continue writing log messages.
The descriptor cannot be closed since the UNIX TTY layer will send a
termination signal to any processes whose controlling TTY is the TTY that
has been closed.
TtyLogStream takes over this descriptor and creates corresponding log
messages for anything written to it.
"""
def __init__(self, tty_fd, stream):
self.receive_side = mitogen.core.Side(stream, tty_fd)
self.transmit_side = self.receive_side
self.stream = stream
def __repr__(self):
return 'mitogen.parent.TtyLogStream(%r)' % (self.stream,)
def on_receive(self, broker):
buf = self.receive_side.read()
if not buf:
return self.on_disconnect(broker)
LOG.debug('%r.on_receive(): %r', self, buf)
class Stream(mitogen.core.Stream): class Stream(mitogen.core.Stream):
""" """
Base for streams capable of starting new slaves. Base for streams capable of starting new slaves.
@ -597,21 +687,21 @@ class Stream(mitogen.core.Stream):
def connect(self): def connect(self):
LOG.debug('%r.connect()', self) LOG.debug('%r.connect()', self)
self.pid, fd = self.start_child() self.pid, fd, extra_fd = self.start_child()
self.name = '%s.%s' % (self.name_prefix, self.pid) self.name = '%s.%s' % (self.name_prefix, self.pid)
self.receive_side = mitogen.core.Side(self, fd) self.receive_side = mitogen.core.Side(self, fd)
self.transmit_side = mitogen.core.Side(self, os.dup(fd)) self.transmit_side = mitogen.core.Side(self, os.dup(fd))
LOG.debug('%r.connect(): child process stdin/stdout=%r', LOG.debug('%r.connect(): child process stdin/stdout=%r',
self, self.receive_side.fd) self, self.receive_side.fd)
self._connect_bootstrap() self._connect_bootstrap(extra_fd)
def _ec0_received(self): def _ec0_received(self):
LOG.debug('%r._ec0_received()', self) LOG.debug('%r._ec0_received()', self)
write_all(self.transmit_side.fd, self.get_preamble()) write_all(self.transmit_side.fd, self.get_preamble())
discard_until(self.receive_side.fd, 'EC1\n', time.time() + 10.0) discard_until(self.receive_side.fd, 'EC1\n', time.time() + 10.0)
def _connect_bootstrap(self): def _connect_bootstrap(self, extra_fd):
deadline = time.time() + self.connect_timeout deadline = time.time() + self.connect_timeout
discard_until(self.receive_side.fd, 'EC0\n', deadline) discard_until(self.receive_side.fd, 'EC0\n', deadline)
self._ec0_received() self._ec0_received()

@ -52,9 +52,13 @@ class PasswordError(mitogen.core.StreamError):
class Stream(mitogen.parent.Stream): class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.tty_create_child) create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
python_path = 'python2.7' python_path = 'python2.7'
#: Once connected, points to the corresponding TtyLogStream, allowing it to
#: be disconnected at the same time this stream is being torn down.
tty_stream = None
#: The path to the SSH binary. #: The path to the SSH binary.
ssh_path = 'ssh' ssh_path = 'ssh'
@ -83,6 +87,10 @@ class Stream(mitogen.parent.Stream):
if ssh_args: if ssh_args:
self.ssh_args = ssh_args self.ssh_args = ssh_args
def on_disconnect(self, broker):
self.tty_stream.on_disconnect(broker)
super(Stream, self).on_disconnect(broker)
def get_boot_command(self): def get_boot_command(self):
bits = [self.ssh_path] bits = [self.ssh_path]
# bits += ['-o', 'BatchMode yes'] # bits += ['-o', 'BatchMode yes']
@ -124,10 +132,12 @@ class Stream(mitogen.parent.Stream):
password_incorrect_msg = 'SSH password is incorrect' password_incorrect_msg = 'SSH password is incorrect'
password_required_msg = 'SSH password was requested, but none specified' password_required_msg = 'SSH password was requested, but none specified'
def _connect_bootstrap(self): def _connect_bootstrap(self, extra_fd):
self.tty_stream = mitogen.parent.TtyLogStream(extra_fd, self)
password_sent = False password_sent = False
it = mitogen.parent.iter_read( it = mitogen.parent.iter_read(
fd=self.receive_side.fd, fds=[self.receive_side.fd, extra_fd],
deadline=self.connect_deadline deadline=self.connect_deadline
) )
@ -145,6 +155,7 @@ class Stream(mitogen.parent.Stream):
if self.password is None: if self.password is None:
raise PasswordError(self.password_required_msg) raise PasswordError(self.password_required_msg)
LOG.debug('sending password') LOG.debug('sending password')
self.transmit_side.write(self.password + '\n') self.tty_stream.transmit_side.write(self.password + '\n')
password_sent = True password_sent = True
raise mitogen.core.StreamError('bootstrap failed') raise mitogen.core.StreamError('bootstrap failed')

@ -103,7 +103,12 @@ class PasswordError(mitogen.core.StreamError):
class Stream(mitogen.parent.Stream): class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.tty_create_child) create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
#: Once connected, points to the corresponding TtyLogStream, allowing it to
#: be disconnected at the same time this stream is being torn down.
tty_stream = None
sudo_path = 'sudo' sudo_path = 'sudo'
username = 'root' username = 'root'
password = None password = None
@ -131,6 +136,10 @@ class Stream(mitogen.parent.Stream):
super(Stream, self).connect() super(Stream, self).connect()
self.name = 'sudo.' + self.username self.name = 'sudo.' + self.username
def on_disconnect(self, broker):
self.tty_stream.on_disconnect(broker)
super(Stream, self).on_disconnect(broker)
def get_boot_command(self): def get_boot_command(self):
# Note: sudo did not introduce long-format option processing until July # Note: sudo did not introduce long-format option processing until July
# 2013, so even though we parse long-format options, we always supply # 2013, so even though we parse long-format options, we always supply
@ -147,10 +156,12 @@ class Stream(mitogen.parent.Stream):
password_incorrect_msg = 'sudo password is incorrect' password_incorrect_msg = 'sudo password is incorrect'
password_required_msg = 'sudo password is required' password_required_msg = 'sudo password is required'
def _connect_bootstrap(self): def _connect_bootstrap(self, extra_fd):
self.tty_stream = mitogen.parent.TtyLogStream(extra_fd, self)
password_sent = False password_sent = False
it = mitogen.parent.iter_read( it = mitogen.parent.iter_read(
fd=self.receive_side.fd, fds=[self.receive_side.fd, extra_fd],
deadline=self.connect_deadline, deadline=self.connect_deadline,
) )
@ -165,6 +176,6 @@ class Stream(mitogen.parent.Stream):
if password_sent: if password_sent:
raise PasswordError(self.password_incorrect_msg) raise PasswordError(self.password_incorrect_msg)
LOG.debug('sending password') LOG.debug('sending password')
os.write(self.transmit_side.fd, self.password + '\n') self.tty_stream.transmit_side.write(self.password + '\n')
password_sent = True password_sent = True
raise mitogen.core.StreamError('bootstrap failed') raise mitogen.core.StreamError('bootstrap failed')

@ -13,7 +13,7 @@ class StreamErrorTest(testlib.RouterMixin, testlib.TestCase):
def test_direct_eof(self): def test_direct_eof(self):
e = self.assertRaises(mitogen.core.StreamError, e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.local( lambda: self.router.local(
python_path='/bin/true', python_path='true',
connect_timeout=3, connect_timeout=3,
) )
) )
@ -25,7 +25,7 @@ class StreamErrorTest(testlib.RouterMixin, testlib.TestCase):
e = self.assertRaises(mitogen.core.StreamError, e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.local( lambda: self.router.local(
via=local, via=local,
python_path='/bin/true', python_path='true',
connect_timeout=3, connect_timeout=3,
) )
) )
@ -77,11 +77,11 @@ class TtyCreateChildTest(unittest2.TestCase):
# read a password. # read a password.
tf = tempfile.NamedTemporaryFile() tf = tempfile.NamedTemporaryFile()
try: try:
pid, fd = self.func( pid, fd, _ = self.func(
'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,) 'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
) )
deadline = time.time() + 5.0 deadline = time.time() + 5.0
for line in mitogen.parent.iter_read(fd, deadline): for line in mitogen.parent.iter_read([fd], deadline):
self.assertEquals('hi\n', line) self.assertEquals('hi\n', line)
break break
waited_pid, status = os.waitpid(pid, 0) waited_pid, status = os.waitpid(pid, 0)
@ -104,7 +104,7 @@ class IterReadTest(unittest2.TestCase):
def test_no_deadline(self): def test_no_deadline(self):
proc = self.make_proc() proc = self.make_proc()
try: try:
reader = self.func(proc.stdout.fileno()) reader = self.func([proc.stdout.fileno()])
for i, chunk in enumerate(reader, 1): for i, chunk in enumerate(reader, 1):
self.assertEqual(i, int(chunk)) self.assertEqual(i, int(chunk))
if i > 3: if i > 3:
@ -114,7 +114,7 @@ class IterReadTest(unittest2.TestCase):
def test_deadline_exceeded_before_call(self): def test_deadline_exceeded_before_call(self):
proc = self.make_proc() proc = self.make_proc()
reader = self.func(proc.stdout.fileno(), 0) reader = self.func([proc.stdout.fileno()], 0)
try: try:
got = [] got = []
try: try:
@ -128,7 +128,7 @@ class IterReadTest(unittest2.TestCase):
def test_deadline_exceeded_during_call(self): def test_deadline_exceeded_during_call(self):
proc = self.make_proc() proc = self.make_proc()
reader = self.func(proc.stdout.fileno(), time.time() + 0.4) reader = self.func([proc.stdout.fileno()], time.time() + 0.4)
try: try:
got = [] got = []
try: try:

Loading…
Cancel
Save