Merge pull request #194 from dw/dmw

Implement hybrid TTY/socket mode
pull/205/head
dw 8 years ago committed by GitHub
commit 79fb65c5d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -51,7 +51,7 @@ echo \
# Build the binaries. # Build the binaries.
make -C ${TRAVIS_BUILD_DIR}/tests/ansible make -C ${TRAVIS_BUILD_DIR}/tests/ansible
sudo apt install -y sshpass [ ! "$(type -p sshpass)" ] && sudo apt install -y sshpass
echo travis_fold:end:job_setup echo travis_fold:end:job_setup

@ -402,27 +402,12 @@ Helper Functions
.. currentmodule:: mitogen.master .. currentmodule:: mitogen.master
.. function:: create_child (\*args) .. autofunction:: create_child
Create a child process whose stdin/stdout is connected to a socket,
returning `(pid, socket_obj)`.
.. currentmodule:: mitogen.master .. currentmodule:: mitogen.master
.. function:: tty_create_child (\*args) .. autofunction:: tty_create_child
Return a file descriptor connected to the master end of a pseudo-terminal,
whose slave end is connected to stdin/stdout/stderr of a new child process.
The child is created such that the pseudo-terminal becomes its controlling
TTY, ensuring access to /dev/tty returns a new file descriptor open on the
slave end.
:param list args:
:py:func:`os.execl` argument list.
:returns:
`(pid, fd)`
.. currentmodule:: mitogen.master .. currentmodule:: mitogen.master

@ -108,7 +108,7 @@ class Stream(mitogen.parent.Stream):
# Decouple the socket from the lifetime of the Python socket object. # Decouple the socket from the lifetime of the Python socket object.
fd = os.dup(parentfp.fileno()) fd = os.dup(parentfp.fileno())
parentfp.close() parentfp.close()
return self.pid, fd return self.pid, fd, None
else: else:
parentfp.close() parentfp.close()
self._wrap_child_main(childfp) self._wrap_child_main(childfp)
@ -155,6 +155,6 @@ class Stream(mitogen.parent.Stream):
# Don't trigger atexit handlers, they were copied from the parent. # Don't trigger atexit handlers, they were copied from the parent.
os._exit(0) os._exit(0)
def _connect_bootstrap(self): def _connect_bootstrap(self, extra_fd):
# None required. # None required.
pass pass

@ -243,6 +243,12 @@ def create_socketpair():
def create_child(*args): def create_child(*args):
"""
Create a child process whose stdin/stdout is connected to a socket.
:returns:
`(pid, socket_obj, :data:`None`)`
"""
parentfp, childfp = create_socketpair() parentfp, childfp = create_socketpair()
# When running under a monkey patches-enabled gevent, the socket module # When running under a monkey patches-enabled gevent, the socket module
# yields file descriptors who already have O_NONBLOCK, which is # yields file descriptors who already have O_NONBLOCK, which is
@ -263,7 +269,7 @@ def create_child(*args):
LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s', LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s',
proc.pid, fd, os.getpid(), Argv(args)) proc.pid, fd, os.getpid(), Argv(args))
return proc.pid, fd return proc.pid, fd, None
def _acquire_controlling_tty(): def _acquire_controlling_tty():
@ -271,13 +277,26 @@ def _acquire_controlling_tty():
if sys.platform == 'linux2': if sys.platform == 'linux2':
# On Linux, the controlling tty becomes the first tty opened by a # On Linux, the controlling tty becomes the first tty opened by a
# process lacking any prior tty. # process lacking any prior tty.
os.close(os.open(os.ttyname(0), os.O_RDWR)) os.close(os.open(os.ttyname(2), os.O_RDWR))
if sys.platform.startswith('freebsd') or sys.platform == 'darwin': if sys.platform.startswith('freebsd') or sys.platform == 'darwin':
# On BSD an explicit ioctl is required. # On BSD an explicit ioctl is required.
fcntl.ioctl(0, termios.TIOCSCTTY) fcntl.ioctl(2, termios.TIOCSCTTY)
def tty_create_child(*args): def tty_create_child(*args):
"""
Return a file descriptor connected to the master end of a pseudo-terminal,
whose slave end is connected to stdin/stdout/stderr of a new child process.
The child is created such that the pseudo-terminal becomes its controlling
TTY, ensuring access to /dev/tty returns a new file descriptor open on the
slave end.
:param list args:
:py:func:`os.execl` argument list.
:returns:
`(pid, tty_fd, None)`
"""
master_fd, slave_fd = os.openpty() master_fd, slave_fd = os.openpty()
mitogen.core.set_block(slave_fd) mitogen.core.set_block(slave_fd)
disable_echo(master_fd) disable_echo(master_fd)
@ -295,7 +314,47 @@ def tty_create_child(*args):
os.close(slave_fd) os.close(slave_fd)
LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s', LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s',
proc.pid, master_fd, os.getpid(), Argv(args)) proc.pid, master_fd, os.getpid(), Argv(args))
return proc.pid, master_fd return proc.pid, master_fd, None
def hybrid_tty_create_child(*args):
"""
Like :func:`tty_create_child`, except attach stdin/stdout to a socketpair
like :func:`create_child`, but leave stderr and the controlling TTY
attached to a TTY.
:param list args:
:py:func:`os.execl` argument list.
:returns:
`(pid, socketpair_fd, tty_fd)`
"""
master_fd, slave_fd = os.openpty()
parentfp, childfp = create_socketpair()
mitogen.core.set_block(slave_fd)
mitogen.core.set_block(childfp)
disable_echo(master_fd)
disable_echo(slave_fd)
proc = subprocess.Popen(
args=args,
stdin=childfp,
stdout=childfp,
stderr=slave_fd,
preexec_fn=_acquire_controlling_tty,
close_fds=True,
)
os.close(slave_fd)
childfp.close()
# Decouple the socket from the lifetime of the Python socket object.
stdio_fd = os.dup(parentfp.fileno())
parentfp.close()
LOG.debug('hybrid_tty_create_child() pid=%d stdio=%d, tty=%d, cmd: %s',
proc.pid, stdio_fd, master_fd, Argv(args))
return proc.pid, stdio_fd, master_fd
def write_all(fd, s, deadline=None): def write_all(fd, s, deadline=None):
@ -319,36 +378,42 @@ def write_all(fd, s, deadline=None):
written += n written += n
def iter_read(fd, deadline=None): def iter_read(fds, deadline=None):
fds = list(fds)
bits = [] bits = []
timeout = None timeout = None
while True: while fds:
if deadline is not None: if deadline is not None:
timeout = max(0, deadline - time.time()) timeout = max(0, deadline - time.time())
if timeout == 0: if timeout == 0:
break break
rfds, _, _ = select.select([fd], [], [], timeout) rfds, _, _ = select.select(fds, [], [], timeout)
if not rfds: if not rfds:
continue continue
s, disconnected = mitogen.core.io_op(os.read, fd, 4096) for fd in rfds:
IOLOG.debug('iter_read(%r) -> %r', fd, s) s, disconnected = mitogen.core.io_op(os.read, fd, 4096)
if disconnected or not s: if disconnected or not s:
raise mitogen.core.StreamError( IOLOG.debug('iter_read(%r) -> disconnected', fd)
'EOF on stream; last 300 bytes received: %r' % fds.remove(fd)
(''.join(bits)[-300:],) else:
) IOLOG.debug('iter_read(%r) -> %r', fd, s)
bits.append(s)
bits.append(s) yield s
yield s
if not fds:
raise mitogen.core.StreamError(
'EOF on stream; last 300 bytes received: %r' %
(''.join(bits)[-300:],)
)
raise mitogen.core.TimeoutError('read timed out') raise mitogen.core.TimeoutError('read timed out')
def discard_until(fd, s, deadline): def discard_until(fd, s, deadline):
for buf in iter_read(fd, deadline): for buf in iter_read([fd], deadline):
if IOLOG.level == logging.DEBUG: if IOLOG.level == logging.DEBUG:
for line in buf.splitlines(): for line in buf.splitlines():
IOLOG.debug('discard_until: discarding %r', line) IOLOG.debug('discard_until: discarding %r', line)
@ -414,6 +479,36 @@ def _proxy_connect(name, method_name, kwargs, econtext):
} }
class TtyLogStream(mitogen.core.BasicStream):
"""
For "hybrid TTY/socketpair" mode, after a connection has been setup, a
spare TTY file descriptor will exist that cannot be closed, and to which
SSH or sudo may continue writing log messages.
The descriptor cannot be closed since the UNIX TTY layer will send a
termination signal to any processes whose controlling TTY is the TTY that
has been closed.
TtyLogStream takes over this descriptor and creates corresponding log
messages for anything written to it.
"""
def __init__(self, tty_fd, stream):
self.receive_side = mitogen.core.Side(stream, tty_fd)
self.transmit_side = self.receive_side
self.stream = stream
def __repr__(self):
return 'mitogen.parent.TtyLogStream(%r)' % (self.stream,)
def on_receive(self, broker):
buf = self.receive_side.read()
if not buf:
return self.on_disconnect(broker)
LOG.debug('%r.on_receive(): %r', self, buf)
class Stream(mitogen.core.Stream): class Stream(mitogen.core.Stream):
""" """
Base for streams capable of starting new slaves. Base for streams capable of starting new slaves.
@ -597,21 +692,21 @@ class Stream(mitogen.core.Stream):
def connect(self): def connect(self):
LOG.debug('%r.connect()', self) LOG.debug('%r.connect()', self)
self.pid, fd = self.start_child() self.pid, fd, extra_fd = self.start_child()
self.name = '%s.%s' % (self.name_prefix, self.pid) self.name = '%s.%s' % (self.name_prefix, self.pid)
self.receive_side = mitogen.core.Side(self, fd) self.receive_side = mitogen.core.Side(self, fd)
self.transmit_side = mitogen.core.Side(self, os.dup(fd)) self.transmit_side = mitogen.core.Side(self, os.dup(fd))
LOG.debug('%r.connect(): child process stdin/stdout=%r', LOG.debug('%r.connect(): child process stdin/stdout=%r',
self, self.receive_side.fd) self, self.receive_side.fd)
self._connect_bootstrap() self._connect_bootstrap(extra_fd)
def _ec0_received(self): def _ec0_received(self):
LOG.debug('%r._ec0_received()', self) LOG.debug('%r._ec0_received()', self)
write_all(self.transmit_side.fd, self.get_preamble()) write_all(self.transmit_side.fd, self.get_preamble())
discard_until(self.receive_side.fd, 'EC1\n', time.time() + 10.0) discard_until(self.receive_side.fd, 'EC1\n', time.time() + 10.0)
def _connect_bootstrap(self): def _connect_bootstrap(self, extra_fd):
deadline = time.time() + self.connect_timeout deadline = time.time() + self.connect_timeout
discard_until(self.receive_side.fd, 'EC0\n', deadline) discard_until(self.receive_side.fd, 'EC0\n', deadline)
self._ec0_received() self._ec0_received()

@ -52,9 +52,13 @@ class PasswordError(mitogen.core.StreamError):
class Stream(mitogen.parent.Stream): class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.tty_create_child) create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
python_path = 'python2.7' python_path = 'python2.7'
#: Once connected, points to the corresponding TtyLogStream, allowing it to
#: be disconnected at the same time this stream is being torn down.
tty_stream = None
#: The path to the SSH binary. #: The path to the SSH binary.
ssh_path = 'ssh' ssh_path = 'ssh'
@ -83,6 +87,10 @@ class Stream(mitogen.parent.Stream):
if ssh_args: if ssh_args:
self.ssh_args = ssh_args self.ssh_args = ssh_args
def on_disconnect(self, broker):
self.tty_stream.on_disconnect(broker)
super(Stream, self).on_disconnect(broker)
def get_boot_command(self): def get_boot_command(self):
bits = [self.ssh_path] bits = [self.ssh_path]
# bits += ['-o', 'BatchMode yes'] # bits += ['-o', 'BatchMode yes']
@ -124,10 +132,12 @@ class Stream(mitogen.parent.Stream):
password_incorrect_msg = 'SSH password is incorrect' password_incorrect_msg = 'SSH password is incorrect'
password_required_msg = 'SSH password was requested, but none specified' password_required_msg = 'SSH password was requested, but none specified'
def _connect_bootstrap(self): def _connect_bootstrap(self, extra_fd):
self.tty_stream = mitogen.parent.TtyLogStream(extra_fd, self)
password_sent = False password_sent = False
it = mitogen.parent.iter_read( it = mitogen.parent.iter_read(
fd=self.receive_side.fd, fds=[self.receive_side.fd, extra_fd],
deadline=self.connect_deadline deadline=self.connect_deadline
) )
@ -145,6 +155,7 @@ class Stream(mitogen.parent.Stream):
if self.password is None: if self.password is None:
raise PasswordError(self.password_required_msg) raise PasswordError(self.password_required_msg)
LOG.debug('sending password') LOG.debug('sending password')
self.transmit_side.write(self.password + '\n') self.tty_stream.transmit_side.write(self.password + '\n')
password_sent = True password_sent = True
raise mitogen.core.StreamError('bootstrap failed') raise mitogen.core.StreamError('bootstrap failed')

@ -103,7 +103,12 @@ class PasswordError(mitogen.core.StreamError):
class Stream(mitogen.parent.Stream): class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.tty_create_child) create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
#: Once connected, points to the corresponding TtyLogStream, allowing it to
#: be disconnected at the same time this stream is being torn down.
tty_stream = None
sudo_path = 'sudo' sudo_path = 'sudo'
username = 'root' username = 'root'
password = None password = None
@ -131,6 +136,10 @@ class Stream(mitogen.parent.Stream):
super(Stream, self).connect() super(Stream, self).connect()
self.name = 'sudo.' + self.username self.name = 'sudo.' + self.username
def on_disconnect(self, broker):
self.tty_stream.on_disconnect(broker)
super(Stream, self).on_disconnect(broker)
def get_boot_command(self): def get_boot_command(self):
# Note: sudo did not introduce long-format option processing until July # Note: sudo did not introduce long-format option processing until July
# 2013, so even though we parse long-format options, we always supply # 2013, so even though we parse long-format options, we always supply
@ -147,10 +156,12 @@ class Stream(mitogen.parent.Stream):
password_incorrect_msg = 'sudo password is incorrect' password_incorrect_msg = 'sudo password is incorrect'
password_required_msg = 'sudo password is required' password_required_msg = 'sudo password is required'
def _connect_bootstrap(self): def _connect_bootstrap(self, extra_fd):
self.tty_stream = mitogen.parent.TtyLogStream(extra_fd, self)
password_sent = False password_sent = False
it = mitogen.parent.iter_read( it = mitogen.parent.iter_read(
fd=self.receive_side.fd, fds=[self.receive_side.fd, extra_fd],
deadline=self.connect_deadline, deadline=self.connect_deadline,
) )
@ -165,6 +176,6 @@ class Stream(mitogen.parent.Stream):
if password_sent: if password_sent:
raise PasswordError(self.password_incorrect_msg) raise PasswordError(self.password_incorrect_msg)
LOG.debug('sending password') LOG.debug('sending password')
os.write(self.transmit_side.fd, self.password + '\n') self.tty_stream.transmit_side.write(self.password + '\n')
password_sent = True password_sent = True
raise mitogen.core.StreamError('bootstrap failed') raise mitogen.core.StreamError('bootstrap failed')

@ -3,6 +3,7 @@ inventory = hosts
gathering = explicit gathering = explicit
strategy_plugins = ../../ansible_mitogen/plugins/strategy strategy_plugins = ../../ansible_mitogen/plugins/strategy
action_plugins = lib/action action_plugins = lib/action
callback_plugins = lib/callback
library = lib/modules library = lib/modules
retry_files_enabled = False retry_files_enabled = False
forks = 50 forks = 50

@ -0,0 +1,81 @@
# profile_tasks.py: an Ansible plugin for timing tasks
# Copyright (C) 2014 Jharrod LaFon <jharrod.lafon@gmail.com>
# https://github.com/jlafon/ansible-profile/
# Included with permission
# The MIT License (MIT)
#
# Copyright (c) 2014 Jharrod LaFon
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from ansible.plugins.callback import CallbackBase
import time
class CallbackModule(CallbackBase):
"""
A plugin for timing tasks
"""
def __init__(self):
self.stats = {}
self.current = None
def playbook_on_task_start(self, name, is_conditional):
"""
Logs the start of each task
"""
if self.current is not None:
# Record the running time of the last executed task
self.stats[self.current] = time.time() - self.stats[self.current]
# Record the start time of the current task
self.current = name
self.stats[self.current] = time.time()
called = False
def playbook_on_stats(self, stats):
"""
Prints the timings
"""
if CallbackModule.called:
return
CallbackModule.called = True
# Record the timing of the very last task
if self.current is not None:
self.stats[self.current] = time.time() - self.stats[self.current]
# Sort the tasks by their running time
results = sorted(self.stats.items(),
key=lambda value: value[1], reverse=True)
# Just keep the top 10
results = results[:10]
# Print the timings
for name, elapsed in results:
print("{0:-<70}{1:->9}".format(
'{0} '.format(name),
' {0:.02f}s'.format(elapsed)))

@ -13,7 +13,7 @@ class StreamErrorTest(testlib.RouterMixin, testlib.TestCase):
def test_direct_eof(self): def test_direct_eof(self):
e = self.assertRaises(mitogen.core.StreamError, e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.local( lambda: self.router.local(
python_path='/bin/true', python_path='true',
connect_timeout=3, connect_timeout=3,
) )
) )
@ -25,7 +25,7 @@ class StreamErrorTest(testlib.RouterMixin, testlib.TestCase):
e = self.assertRaises(mitogen.core.StreamError, e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.local( lambda: self.router.local(
via=local, via=local,
python_path='/bin/true', python_path='true',
connect_timeout=3, connect_timeout=3,
) )
) )
@ -77,11 +77,11 @@ class TtyCreateChildTest(unittest2.TestCase):
# read a password. # read a password.
tf = tempfile.NamedTemporaryFile() tf = tempfile.NamedTemporaryFile()
try: try:
pid, fd = self.func( pid, fd, _ = self.func(
'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,) 'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
) )
deadline = time.time() + 5.0 deadline = time.time() + 5.0
for line in mitogen.parent.iter_read(fd, deadline): for line in mitogen.parent.iter_read([fd], deadline):
self.assertEquals('hi\n', line) self.assertEquals('hi\n', line)
break break
waited_pid, status = os.waitpid(pid, 0) waited_pid, status = os.waitpid(pid, 0)
@ -104,7 +104,7 @@ class IterReadTest(unittest2.TestCase):
def test_no_deadline(self): def test_no_deadline(self):
proc = self.make_proc() proc = self.make_proc()
try: try:
reader = self.func(proc.stdout.fileno()) reader = self.func([proc.stdout.fileno()])
for i, chunk in enumerate(reader, 1): for i, chunk in enumerate(reader, 1):
self.assertEqual(i, int(chunk)) self.assertEqual(i, int(chunk))
if i > 3: if i > 3:
@ -114,7 +114,7 @@ class IterReadTest(unittest2.TestCase):
def test_deadline_exceeded_before_call(self): def test_deadline_exceeded_before_call(self):
proc = self.make_proc() proc = self.make_proc()
reader = self.func(proc.stdout.fileno(), 0) reader = self.func([proc.stdout.fileno()], 0)
try: try:
got = [] got = []
try: try:
@ -128,7 +128,7 @@ class IterReadTest(unittest2.TestCase):
def test_deadline_exceeded_during_call(self): def test_deadline_exceeded_during_call(self):
proc = self.make_proc() proc = self.make_proc()
reader = self.func(proc.stdout.fileno(), time.time() + 0.4) reader = self.func([proc.stdout.fileno()], time.time() + 0.4)
try: try:
got = [] got = []
try: try:

Loading…
Cancel
Save