decelerate! (#30160)

removed accelerate code
removed keyczar dep for accelerate
pull/30465/head
Brian Coca 7 years ago committed by GitHub
parent 1921eaf096
commit ae29245e05

@ -1,101 +1,6 @@
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
---
ACCELERATE_CONNECT_TIMEOUT:
default: 1.0
description:
- "This setting controls the timeout for the socket connect call, and should be kept relatively low.
The connection to the accelerate_port will be attempted 3 times before Ansible will fall back to ssh or paramiko
(depending on your default connection setting) to try and start the accelerate daemon remotely."
- "Note, this value can be set to less than one second, however it is probably not a good idea to do so
unless you are on a very fast and reliable LAN. If you are connecting to systems over the internet, it may be necessary to increase this timeout."
env: [{name: ACCELERATE_CONNECT_TIMEOUT }]
ini:
- {key: accelerate_connect_timeout, section: accelerate}
type: float
deprecated:
why: Removing accelerate as a connection method, settings not needed either.
version: "2.5"
alternatives: ssh and paramiko
version_added: "1.4"
ACCELERATE_DAEMON_TIMEOUT:
default: 30
description:
- This setting controls the timeout for the accelerated daemon, as measured in minutes. The default daemon timeout is 30 minutes.
- "Prior to 1.6, the timeout was hard-coded from the time of the daemon's launch."
- For version 1.6+, the timeout is now based on the last activity to the daemon and is configurable via this option.
env: [{name: ACCELERATE_DAEMON_TIMEOUT}]
ini:
- {key: accelerate_daemon_timeout, section: accelerate}
type: integer
deprecated:
why: Removing accelerate as a connection method, settings not needed either.
version: "2.5"
alternatives: ssh and paramiko
version_added: "1.6"
ACCELERATE_KEYS_DIR:
default: ~/.fireball.keys
description: ''
deprecated:
why: Removing accelerate as a connection method, settings not needed either.
version: "2.5"
alternatives: ssh and paramiko
env: [{name: ACCELERATE_KEYS_DIR}]
ini:
- {key: accelerate_keys_dir, section: accelerate}
ACCELERATE_KEYS_DIR_PERMS:
default: '700'
description: 'TODO: write it'
env: [{name: ACCELERATE_KEYS_DIR_PERMS}]
ini:
- {key: accelerate_keys_dir_perms, section: accelerate}
deprecated:
why: Removing accelerate as a connection method, settings not needed either.
version: "2.5"
alternatives: ssh and paramiko
ACCELERATE_KEYS_FILE_PERMS:
default: '600'
description: 'TODO: write it'
env: [{name: ACCELERATE_KEYS_FILE_PERMS}]
ini:
- {key: accelerate_keys_file_perms, section: accelerate}
deprecated:
why: Removing accelerate as a connection method, settings not needed either.
version: "2.5"
alternatives: ssh and paramiko
ACCELERATE_MULTI_KEY:
default: False
description: 'TODO: write it'
env: [{name: ACCELERATE_MULTI_KEY}]
ini:
- {key: accelerate_multi_key, section: accelerate}
type: boolean
deprecated:
why: Removing accelerate as a connection method, settings not needed either.
version: "2.5"
alternatives: ssh and paramiko
ACCELERATE_PORT:
default: 5099
description: 'TODO: write it'
env: [{name: ACCELERATE_PORT}]
ini:
- {key: accelerate_port, section: accelerate}
type: integer
deprecated:
why: Removing accelerate as a connection method, settings not needed either.
version: "2.5"
alternatives: ssh and paramiko
ACCELERATE_TIMEOUT:
default: 30
description: 'TODO: write it'
env: [{name: ACCELERATE_TIMEOUT}]
ini:
- {key: accelerate_timeout, section: accelerate}
type: integer
deprecated:
why: Removing accelerate as a connection method, settings not needed either.
version: "2.5"
alternatives: ssh and paramiko
ALLOW_WORLD_READABLE_TMPFILES:
name: Allow world readable temporary files
default: False

@ -19,7 +19,6 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import base64
import time
import traceback
@ -32,7 +31,6 @@ from ansible.playbook.conditional import Conditional
from ansible.playbook.task import Task
from ansible.plugins.connection import ConnectionBase
from ansible.template import Templar
from ansible.utils.encrypt import key_for_hostname
from ansible.utils.listify import listify_lookup_plugin_terms
from ansible.utils.unsafe_proxy import UnsafeProxy, wrap_var
@ -738,42 +736,6 @@ class TaskExecutor:
self._play_context.set_options_from_plugin(connection)
if self._play_context.accelerate:
# accelerate is deprecated as of 2.1...
display.deprecated('Accelerated mode is deprecated. Consider using SSH with ControlPersist and pipelining enabled instead', version='2.6')
# launch the accelerated daemon here
ssh_connection = connection
handler = self._shared_loader_obj.action_loader.get(
'normal',
task=self._task,
connection=ssh_connection,
play_context=self._play_context,
loader=self._loader,
templar=templar,
shared_loader_obj=self._shared_loader_obj,
)
key = key_for_hostname(self._play_context.remote_addr)
accelerate_args = dict(
password=base64.b64encode(key.__str__()),
port=self._play_context.accelerate_port,
minutes=C.ACCELERATE_DAEMON_TIMEOUT,
ipv6=self._play_context.accelerate_ipv6,
debug=self._play_context.verbosity,
)
connection = self._shared_loader_obj.connection_loader.get('accelerate', self._play_context, self._new_stdin)
if not connection:
raise AnsibleError("the connection plugin '%s' was not found" % conn_type)
try:
connection._connect()
except AnsibleConnectionFailure:
display.debug('connection failed, fallback to accelerate')
res = handler._execute_module(module_name='accelerate', module_args=accelerate_args, task_vars=variables, delete_remote_tmp=False)
display.debug(res)
connection._connect()
return connection
def _get_action_handler(self, connection, templar):

@ -16,9 +16,11 @@ ANSIBLE_METADATA = {'metadata_version': '1.1',
DOCUMENTATION = '''
---
module: accelerate
removed: True
short_description: Enable accelerated mode on remote node
deprecated: "Use SSH with ControlPersist instead."
description:
- This module has been removed, this file is kept for historicaly documentation purposes
- This modules launches an ephemeral I(accelerate) daemon on the remote node which
Ansible can use to communicate with nodes at high speed.
- The daemon listens on a configurable port for a configurable amount of time.
@ -75,673 +77,3 @@ EXAMPLES = '''
tasks:
- command: /usr/bin/anything
'''
import base64
import errno
import getpass
import json
import os
import os.path
import pwd
import signal
import socket
import struct
import sys
import syslog
import tempfile
import time
import traceback
import datetime
from threading import Thread, Lock
# import module snippets
# we must import this here at the top so we can use get_module_path()
from ansible.module_utils.basic import AnsibleModule, get_module_path
from ansible.module_utils.six.moves import socketserver
# the chunk size to read and send, assuming mtu 1500 and
# leaving room for base64 (+33%) encoding and header (100 bytes)
# 4 * (975/3) + 100 = 1400
# which leaves room for the TCP/IP header
CHUNK_SIZE = 10240
# FIXME: this all should be moved to module_common, as it's
# pretty much a copy from the callbacks/util code
DEBUG_LEVEL = 0
def log(msg, cap=0):
global DEBUG_LEVEL
if DEBUG_LEVEL >= cap:
syslog.syslog(syslog.LOG_NOTICE | syslog.LOG_DAEMON, msg)
def v(msg):
log(msg, cap=1)
def vv(msg):
log(msg, cap=2)
def vvv(msg):
log(msg, cap=3)
def vvvv(msg):
log(msg, cap=4)
HAS_KEYCZAR = False
try:
from keyczar.keys import AesKey
HAS_KEYCZAR = True
except ImportError:
pass
SOCKET_FILE = os.path.join(get_module_path(), '.ansible-accelerate', ".local.socket")
def get_pid_location(module):
"""
Try to find a pid directory in the common locations, falling
back to the user's home directory if no others exist
"""
for dir in ['/var/run', '/var/lib/run', '/run', os.path.expanduser("~/")]:
try:
if os.path.isdir(dir) and os.access(dir, os.R_OK | os.W_OK):
return os.path.join(dir, '.accelerate.pid')
except:
pass
module.fail_json(msg="couldn't find any valid directory to use for the accelerate pid file")
# NOTE: this shares a fair amount of code in common with async_wrapper, if async_wrapper were a new module we could move
# this into utils.module_common and probably should anyway
def daemonize_self(module, password, port, minutes, pid_file):
# daemonizing code: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
try:
pid = os.fork()
if pid > 0:
vvv("exiting pid %s" % pid)
# exit first parent
module.exit_json(msg="daemonized accelerate on port %s for %s minutes with pid %s" % (port, minutes, str(pid)))
except OSError as e:
message = "fork #1 failed: %d (%s)" % (e.errno, e.strerror)
module.fail_json(msg=message)
# decouple from parent environment
os.chdir("/")
os.setsid()
os.umask(int('O22', 8))
# do second fork
try:
pid = os.fork()
if pid > 0:
log("daemon pid %s, writing %s" % (pid, pid_file))
pid_file = open(pid_file, "w")
pid_file.write("%s" % pid)
pid_file.close()
vvv("pid file written")
sys.exit(0)
except OSError as e:
log('fork #2 failed: %d (%s)' % (e.errno, e.strerror))
sys.exit(1)
dev_null = open('/dev/null', 'rw')
os.dup2(dev_null.fileno(), sys.stdin.fileno())
os.dup2(dev_null.fileno(), sys.stdout.fileno())
os.dup2(dev_null.fileno(), sys.stderr.fileno())
log("daemonizing successful")
class LocalSocketThread(Thread):
server = None
terminated = False
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, Verbose=None):
kwargs = {} if kwargs is None else kwargs
self.server = kwargs.get('server')
Thread.__init__(self, group, target, name, args, kwargs, Verbose)
def run(self):
try:
if os.path.exists(SOCKET_FILE):
os.remove(SOCKET_FILE)
else:
dir = os.path.dirname(SOCKET_FILE)
if os.path.exists(dir):
if not os.path.isdir(dir):
log("The socket file path (%s) exists, but is not a directory. No local connections will be available" % dir)
return
else:
# make sure the directory is accessible only to this
# user, as socket files derive their permissions from
# the directory that contains them
os.chmod(dir, int('0700', 8))
elif not os.path.exists(dir):
os.makedirs(dir, int('O700', 8))
except OSError:
pass
self.s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.s.bind(SOCKET_FILE)
self.s.listen(5)
while not self.terminated:
try:
conn, addr = self.s.accept()
vv("received local connection")
data = ""
while "\n" not in data:
data += conn.recv(2048)
try:
try:
new_key = AesKey.Read(data.strip())
found = False
for key in self.server.key_list:
try:
new_key.Decrypt(key.Encrypt("foo"))
found = True
break
except:
pass
if not found:
vv("adding new key to the key list")
self.server.key_list.append(new_key)
conn.sendall("OK\n")
else:
vv("key already exists in the key list, ignoring")
conn.sendall("EXISTS\n")
# update the last event time so the server doesn't
# shutdown sooner than expected for new clients
try:
self.server.last_event_lock.acquire()
self.server.last_event = datetime.datetime.now()
finally:
self.server.last_event_lock.release()
except Exception as e:
vv("key loaded locally was invalid, ignoring (%s)" % e)
conn.sendall("BADKEY\n")
finally:
try:
conn.close()
except:
pass
except:
pass
def terminate(self):
super(LocalSocketThread, self).terminate()
self.terminated = True
self.s.shutdown(socket.SHUT_RDWR)
self.s.close()
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, Verbose=None):
kwargs = {} if kwargs is None else kwargs
Thread.__init__(self, group, target, name, args, kwargs, Verbose)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args,
**self._Thread__kwargs)
def join(self, timeout=None):
Thread.join(self, timeout=timeout)
return self._return
class ThreadedTCPServer(socketserver.ThreadingTCPServer):
key_list = []
last_event = datetime.datetime.now()
last_event_lock = Lock()
def __init__(self, server_address, RequestHandlerClass, module, password, timeout, use_ipv6=False):
self.module = module
self.key_list.append(AesKey.Read(password))
self.allow_reuse_address = True
self.timeout = timeout
if use_ipv6:
self.address_family = socket.AF_INET6
if self.module.params.get('multi_key', False):
vv("starting thread to handle local connections for multiple keys")
self.local_thread = LocalSocketThread(kwargs=dict(server=self))
self.local_thread.start()
socketserver.ThreadingTCPServer.__init__(self, server_address, RequestHandlerClass)
def shutdown(self):
self.running = False
socketserver.ThreadingTCPServer.shutdown(self)
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
# the key to use for this connection
active_key = None
def send_data(self, data):
try:
self.server.last_event_lock.acquire()
self.server.last_event = datetime.datetime.now()
finally:
self.server.last_event_lock.release()
packed_len = struct.pack('!Q', len(data))
return self.request.sendall(packed_len + data)
def recv_data(self):
header_len = 8 # size of a packed unsigned long long
data = ""
vvvv("in recv_data(), waiting for the header")
while len(data) < header_len:
try:
d = self.request.recv(header_len - len(data))
if not d:
vvv("received nothing, bailing out")
return None
data += d
except:
# probably got a connection reset
vvvv("exception received while waiting for recv(), returning None")
return None
vvvv("in recv_data(), got the header, unpacking")
data_len = struct.unpack('!Q', data[:header_len])[0]
data = data[header_len:]
vvvv("data received so far (expecting %d): %d" % (data_len, len(data)))
while len(data) < data_len:
try:
d = self.request.recv(data_len - len(data))
if not d:
vvv("received nothing, bailing out")
return None
data += d
vvvv("data received so far (expecting %d): %d" % (data_len, len(data)))
except:
# probably got a connection reset
vvvv("exception received while waiting for recv(), returning None")
return None
vvvv("received all of the data, returning")
try:
self.server.last_event_lock.acquire()
self.server.last_event = datetime.datetime.now()
finally:
self.server.last_event_lock.release()
return data
def handle(self):
try:
while True:
vvvv("waiting for data")
data = self.recv_data()
if not data:
vvvv("received nothing back from recv_data(), breaking out")
break
vvvv("got data, decrypting")
if not self.active_key:
for key in self.server.key_list:
try:
data = key.Decrypt(data)
self.active_key = key
break
except:
pass
else:
vv("bad decrypt, exiting the connection handler")
return
else:
try:
data = self.active_key.Decrypt(data)
except:
vv("bad decrypt, exiting the connection handler")
return
vvvv("decryption done, loading json from the data")
data = json.loads(data)
mode = data['mode']
response = {}
last_pong = datetime.datetime.now()
if mode == 'command':
vvvv("received a command request, running it")
twrv = ThreadWithReturnValue(target=self.command, args=(data,))
twrv.start()
response = None
while twrv.is_alive():
if (datetime.datetime.now() - last_pong).seconds >= 15:
last_pong = datetime.datetime.now()
vvvv("command still running, sending keepalive packet")
data2 = json.dumps(dict(pong=True))
data2 = self.active_key.Encrypt(data2)
self.send_data(data2)
time.sleep(0.1)
response = twrv._return
vvvv("thread is done, response from join was %s" % response)
elif mode == 'put':
vvvv("received a put request, putting it")
response = self.put(data)
elif mode == 'fetch':
vvvv("received a fetch request, getting it")
response = self.fetch(data)
elif mode == 'validate_user':
vvvv("received a request to validate the user id")
response = self.validate_user(data)
vvvv("response result is %s" % str(response))
json_response = json.dumps(response)
vvvv("dumped json is %s" % json_response)
data2 = self.active_key.Encrypt(json_response)
vvvv("sending the response back to the controller")
self.send_data(data2)
vvvv("done sending the response")
if mode == 'validate_user' and response.get('rc') == 1:
vvvv("detected a uid mismatch, shutting down")
self.server.shutdown()
except:
tb = traceback.format_exc()
log("encountered an unhandled exception in the handle() function")
log("error was:\n%s" % tb)
if self.active_key:
data2 = json.dumps(dict(rc=1, failed=True, msg="unhandled error in the handle() function"))
data2 = self.active_key.Encrypt(data2)
self.send_data(data2)
def validate_user(self, data):
if 'username' not in data:
return dict(failed=True, msg='No username specified')
vvvv("validating we're running as %s" % data['username'])
# get the current uid
c_uid = os.getuid()
try:
# the target uid
t_uid = pwd.getpwnam(data['username']).pw_uid
except:
vvvv("could not find user %s" % data['username'])
return dict(failed=True, msg='could not find user %s' % data['username'])
# and return rc=0 for success, rc=1 for failure
if c_uid == t_uid:
return dict(rc=0)
else:
return dict(rc=1)
def command(self, data):
if 'cmd' not in data:
return dict(failed=True, msg='internal error: cmd is required')
vvvv("executing: %s" % data['cmd'])
use_unsafe_shell = False
executable = data.get('executable')
if executable:
use_unsafe_shell = True
rc, stdout, stderr = self.server.module.run_command(data['cmd'], executable=executable, use_unsafe_shell=use_unsafe_shell, close_fds=True)
if stdout is None:
stdout = ''
if stderr is None:
stderr = ''
vvvv("got stdout: %s" % stdout)
vvvv("got stderr: %s" % stderr)
return dict(rc=rc, stdout=stdout, stderr=stderr)
def fetch(self, data):
if 'in_path' not in data:
return dict(failed=True, msg='internal error: in_path is required')
try:
fd = open(data['in_path'], 'rb')
fstat = os.stat(data['in_path'])
vvv("FETCH file is %d bytes" % fstat.st_size)
while fd.tell() < fstat.st_size:
data = fd.read(CHUNK_SIZE)
last = False
if fd.tell() >= fstat.st_size:
last = True
data = dict(data=base64.b64encode(data), last=last)
data = json.dumps(data)
data = self.active_key.Encrypt(data)
if self.send_data(data):
return dict(failed=True, stderr="failed to send data")
response = self.recv_data()
if not response:
log("failed to get a response, aborting")
return dict(failed=True, stderr="Failed to get a response from %s" % self.host)
response = self.active_key.Decrypt(response)
response = json.loads(response)
if response.get('failed', False):
log("got a failed response from the master")
return dict(failed=True, stderr="Master reported failure, aborting transfer")
except Exception as e:
fd.close()
tb = traceback.format_exc()
log("failed to fetch the file: %s" % tb)
return dict(failed=True, stderr="Could not fetch the file: %s" % e)
fd.close()
return dict()
def put(self, data):
if 'data' not in data:
return dict(failed=True, msg='internal error: data is required')
if 'out_path' not in data:
return dict(failed=True, msg='internal error: out_path is required')
final_path = None
if 'user' in data and data.get('user') != getpass.getuser():
vvv("the target user doesn't match this user, we'll move the file into place via sudo")
tmp_path = os.path.expanduser('~/.ansible/tmp/')
if not os.path.exists(tmp_path):
try:
os.makedirs(tmp_path, int('O700', 8))
except:
return dict(failed=True, msg='could not create a temporary directory at %s' % tmp_path)
(fd, out_path) = tempfile.mkstemp(prefix='ansible.', dir=tmp_path)
out_fd = os.fdopen(fd, 'w', 0)
final_path = data['out_path']
else:
out_path = data['out_path']
out_fd = open(out_path, 'w')
try:
bytes = 0
while True:
out = base64.b64decode(data['data'])
bytes += len(out)
out_fd.write(out)
response = json.dumps(dict())
response = self.active_key.Encrypt(response)
self.send_data(response)
if data['last']:
break
data = self.recv_data()
if not data:
raise ""
data = self.active_key.Decrypt(data)
data = json.loads(data)
except:
out_fd.close()
tb = traceback.format_exc()
log("failed to put the file: %s" % tb)
return dict(failed=True, stdout="Could not write the file")
vvvv("wrote %d bytes" % bytes)
out_fd.close()
if final_path:
vvv("moving %s to %s" % (out_path, final_path))
self.server.module.atomic_move(out_path, final_path)
return dict()
def daemonize(module, password, port, timeout, minutes, use_ipv6, pid_file):
try:
daemonize_self(module, password, port, minutes, pid_file)
def timer_handler(signum, _):
try:
try:
server.last_event_lock.acquire()
td = datetime.datetime.now() - server.last_event
# older python timedelta objects don't have total_seconds(),
# so we use the formula from the docs to calculate it
total_seconds = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6
if total_seconds >= minutes * 60:
log("server has been idle longer than the timeout, shutting down")
server.running = False
server.shutdown()
else:
# reschedule the check
signal.alarm(1)
except:
pass
finally:
server.last_event_lock.release()
signal.signal(signal.SIGALRM, timer_handler)
signal.alarm(1)
tries = 5
while tries > 0:
try:
if use_ipv6:
address = ("::", port)
else:
address = ("0.0.0.0", port)
server = ThreadedTCPServer(address, ThreadedTCPRequestHandler, module, password, timeout, use_ipv6=use_ipv6)
server.allow_reuse_address = True
break
except Exception as e:
vv("Failed to create the TCP server (tries left = %d) (error: %s) " % (tries, e))
tries -= 1
time.sleep(0.2)
if tries == 0:
vv("Maximum number of attempts to create the TCP server reached, bailing out")
raise Exception("max # of attempts to serve reached")
# run the server in a separate thread to make signal handling work
server_thread = Thread(target=server.serve_forever, kwargs=dict(poll_interval=0.1))
server_thread.start()
server.running = True
v("serving!")
while server.running:
time.sleep(1)
# wait for the thread to exit fully
server_thread.join()
v("server thread terminated, exiting!")
sys.exit(0)
except Exception as e:
tb = traceback.format_exc()
log("exception caught, exiting accelerated mode: %s\n%s" % (e, tb))
sys.exit(0)
def main():
global DEBUG_LEVEL
module = AnsibleModule(
argument_spec=dict(
port=dict(type='int', default=5099),
ipv6=dict(type='bool', default=False),
multi_key=dict(type='bool', default=False),
timeout=dict(type='int', default=300),
password=dict(type='str', required=True, no_log=True),
minutes=dict(type='int', default=30),
debug=dict(type='int', default=0)
),
supports_check_mode=True
)
syslog.openlog('ansible-%s' % module._name)
password = base64.b64decode(module.params['password'])
port = int(module.params['port'])
timeout = int(module.params['timeout'])
minutes = int(module.params['minutes'])
debug = int(module.params['debug'])
ipv6 = module.params['ipv6']
multi_key = module.params['multi_key']
if not HAS_KEYCZAR:
module.fail_json(msg="keyczar is not installed (on the remote side)")
DEBUG_LEVEL = debug
pid_file = get_pid_location(module)
daemon_pid = None
daemon_running = False
if os.path.exists(pid_file):
try:
daemon_pid = int(open(pid_file).read())
try:
# sending signal 0 doesn't do anything to the
# process, other than tell the calling program
# whether other signals can be sent
os.kill(daemon_pid, 0)
except OSError as e:
message = 'the accelerate daemon appears to be running as a different user that this user cannot access pid=%s' % daemon_pid
if e.errno == errno.EPERM:
# no permissions means the pid is probably
# running, but as a different user, so fail
module.fail_json(msg=message)
else:
daemon_running = True
except ValueError:
# invalid pid file, unlink it - otherwise we don't care
try:
os.unlink(pid_file)
except:
pass
if daemon_running and multi_key:
# try to connect to the file socket for the daemon if it exists
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
try:
s.connect(SOCKET_FILE)
s.sendall(password + '\n')
data = ""
while '\n' not in data:
data += s.recv(2048)
res = data.strip()
except:
module.fail_json(msg="failed to connect to the local socket file")
finally:
try:
s.close()
except:
pass
if res in ("OK", "EXISTS"):
module.exit_json(msg="transferred new key to the existing daemon")
else:
module.fail_json(msg="could not transfer new key: %s" % data.strip())
else:
# try to start up the daemon
daemonize(module, password, port, timeout, minutes, ipv6, pid_file)
if __name__ == '__main__':
main()

@ -55,18 +55,13 @@ class Play(Base, Taggable, Become):
# =================================================================================
_name = FieldAttribute(isa='string', default='', always_post_validate=True)
_hosts = FieldAttribute(isa='list', required=True, listof=string_types, always_post_validate=True)
# TODO: generalize connection
_accelerate = FieldAttribute(isa='bool', default=False, always_post_validate=True)
_accelerate_ipv6 = FieldAttribute(isa='bool', default=False, always_post_validate=True)
_accelerate_port = FieldAttribute(isa='int', default=5099, always_post_validate=True)
# Connection
# Facts
_fact_path = FieldAttribute(isa='string', default=None)
_gather_facts = FieldAttribute(isa='bool', default=None, always_post_validate=True)
_gather_subset = FieldAttribute(isa='barelist', default=None, always_post_validate=True)
_gather_timeout = FieldAttribute(isa='int', default=None, always_post_validate=True)
_hosts = FieldAttribute(isa='list', required=True, listof=string_types, always_post_validate=True)
# Variable Attributes
_vars_files = FieldAttribute(isa='list', default=[], priority=99)

@ -55,7 +55,6 @@ __all__ = ['PlayContext']
# in variable names.
MAGIC_VARIABLE_MAPPING = dict(
accelerate_port=('ansible_accelerate_port', ),
# base
connection=('ansible_connection', ),
@ -217,11 +216,6 @@ class PlayContext(Base):
# ???
_connection_lockfd = FieldAttribute(isa='int')
# accelerate FIXME: remove as soon as deprecation period expires
_accelerate = FieldAttribute(isa='bool', default=False)
_accelerate_ipv6 = FieldAttribute(isa='bool', default=False, always_post_validate=True)
_accelerate_port = FieldAttribute(isa='int', default=C.ACCELERATE_PORT, always_post_validate=True)
# privilege escalation fields
_become = FieldAttribute(isa='bool')
_become_method = FieldAttribute(isa='string')
@ -281,12 +275,6 @@ class PlayContext(Base):
the play class.
'''
# special handling for accelerated mode, as it is set in a separate
# play option from the connection parameter
self.accelerate = play.accelerate
self.accelerate_ipv6 = play.accelerate_ipv6
self.accelerate_port = play.accelerate_port
if play.connection:
self.connection = play.connection

@ -722,11 +722,6 @@ class ActionBase(with_metaclass(ABCMeta, object)):
cmd = self._connection._shell.build_module_command(environment_string, shebang, cmd, arg_path=args_file_path, rm_tmp=rm_tmp).strip()
if module_name == "accelerate":
# always run the accelerate module as the user
# specified in the play, not the sudo_user
sudoable = False
# Fix permissions of the tmp path and tmp files. This should be called after all files have been transferred.
if remote_files:
# remove none/empty

@ -1,327 +0,0 @@
# (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
DOCUMENTATION = """
author: Ansible Core Team
connection: accelerate
short_description: Temporary 0mq agent
description:
- This plugin uses one of the other ssh plugins to setup a temporary 0mq daemon on the target to execute subsequent tasks
deprecated:
why: paramiko and ssh + controlpersist perform the same or better without the problems of having an agent.
version: 2.5
alternative: paramiko and ssh with conrol persistence.
"""
import base64
import json
import os
import socket
import struct
import time
from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleFileNotFound, AnsibleConnectionFailure
from ansible.module_utils._text import to_bytes
from ansible.parsing.utils.jsonify import jsonify
from ansible.plugins.connection import ConnectionBase
from ansible.utils.encrypt import key_for_hostname, keyczar_encrypt, keyczar_decrypt
try:
from __main__ import display
except ImportError:
from ansible.utils.display import Display
display = Display()
# the chunk size to read and send, assuming mtu 1500 and
# leaving room for base64 (+33%) encoding and header (8 bytes)
# ((1400-8)/4)*3) = 1044
# which leaves room for the TCP/IP header. We set this to a
# multiple of the value to speed up file reads.
CHUNK_SIZE = 1044 * 20
class Connection(ConnectionBase):
''' raw socket accelerated connection '''
transport = 'accelerate'
has_pipelining = False
become_methods = frozenset(C.BECOME_METHODS).difference(['runas'])
def __init__(self, *args, **kwargs):
super(Connection, self).__init__(*args, **kwargs)
self.conn = None
self.key = key_for_hostname(self._play_context.remote_addr)
def _connect(self):
''' activates the connection object '''
if not self._connected:
wrong_user = False
tries = 3
self.conn = socket.socket()
self.conn.settimeout(C.ACCELERATE_CONNECT_TIMEOUT)
display.vvvv("attempting connection to %s via the accelerated port %d" % (self._play_context.remote_addr, self._play_context.accelerate_port),
host=self._play_context.remote_addr)
while tries > 0:
try:
self.conn.connect((self._play_context.remote_addr, self._play_context.accelerate_port))
break
except socket.error:
display.vvvv("connection to %s failed, retrying..." % self._play_context.remote_addr, host=self._play_context.remote_addr)
time.sleep(0.1)
tries -= 1
if tries == 0:
display.vvv("Could not connect via the accelerated connection, exceeded # of tries", host=self._play_context.remote_addr)
raise AnsibleConnectionFailure("Failed to connect to %s on the accelerated port %s" % (self._play_context.remote_addr,
self._play_context.accelerate_port))
elif wrong_user:
display.vvv("Restarting daemon with a different remote_user", host=self._play_context.remote_addr)
raise AnsibleError("The accelerated daemon was started on the remote with a different user")
self.conn.settimeout(C.ACCELERATE_TIMEOUT)
if not self.validate_user():
# the accelerated daemon was started with a
# different remote_user. The above command
# should have caused the accelerate daemon to
# shutdown, so we'll reconnect.
wrong_user = True
self._connected = True
return self
def transport_test(self, connect_timeout):
''' Test the transport mechanism, if available '''
host = self._play_context.remote_addr
port = int(self._play_context.accelerate_port or 5099)
display.vvv("attempting transport test to %s:%s" % (host, port))
sock = socket.create_connection((host, port), connect_timeout)
sock.close()
def send_data(self, data):
packed_len = struct.pack('!Q', len(data))
return self.conn.sendall(packed_len + data)
def recv_data(self):
header_len = 8 # size of a packed unsigned long long
data = b""
try:
display.vvvv("in recv_data(), waiting for the header", host=self._play_context.remote_addr)
while len(data) < header_len:
d = self.conn.recv(header_len - len(data))
if not d:
display.vvvv("received nothing, bailing out", host=self._play_context.remote_addr)
return None
data += d
display.vvvv("got the header, unpacking", host=self._play_context.remote_addr)
data_len = struct.unpack('!Q', data[:header_len])[0]
data = data[header_len:]
display.vvvv("data received so far (expecting %d): %d" % (data_len, len(data)), host=self._play_context.remote_addr)
while len(data) < data_len:
d = self.conn.recv(data_len - len(data))
if not d:
display.vvvv("received nothing, bailing out", host=self._play_context.remote_addr)
return None
display.vvvv("received %d bytes" % (len(d)), host=self._play_context.remote_addr)
data += d
display.vvvv("received all of the data, returning", host=self._play_context.remote_addr)
return data
except socket.timeout:
raise AnsibleError("timed out while waiting to receive data")
def validate_user(self):
'''
Checks the remote uid of the accelerated daemon vs. the
one specified for this play and will cause the accel
daemon to exit if they don't match
'''
display.vvvv("sending request for validate_user", host=self._play_context.remote_addr)
data = dict(
mode='validate_user',
username=self._play_context.remote_user,
)
data = jsonify(data)
data = keyczar_encrypt(self.key, data)
if self.send_data(data):
raise AnsibleError("Failed to send command to %s" % self._play_context.remote_addr)
display.vvvv("waiting for validate_user response", host=self._play_context.remote_addr)
while True:
# we loop here while waiting for the response, because a
# long running command may cause us to receive keepalive packets
# ({"pong":"true"}) rather than the response we want.
response = self.recv_data()
if not response:
raise AnsibleError("Failed to get a response from %s" % self._play_context.remote_addr)
response = keyczar_decrypt(self.key, response)
response = json.loads(response)
if "pong" in response:
# it's a keepalive, go back to waiting
display.vvvv("received a keepalive packet", host=self._play_context.remote_addr)
continue
else:
display.vvvv("received the validate_user response: %s" % (response), host=self._play_context.remote_addr)
break
if response.get('failed'):
return False
else:
return response.get('rc') == 0
def exec_command(self, cmd, in_data=None, sudoable=True):
''' run a command on the remote host '''
super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable)
if in_data:
raise AnsibleError("Internal Error: this module does not support optimized module pipelining")
display.vvv("EXEC COMMAND %s" % cmd, host=self._play_context.remote_addr)
data = dict(
mode='command',
cmd=cmd,
executable=C.DEFAULT_EXECUTABLE,
)
data = jsonify(data)
data = keyczar_encrypt(self.key, data)
if self.send_data(data):
raise AnsibleError("Failed to send command to %s" % self._play_context.remote_addr)
while True:
# we loop here while waiting for the response, because a
# long running command may cause us to receive keepalive packets
# ({"pong":"true"}) rather than the response we want.
response = self.recv_data()
if not response:
raise AnsibleError("Failed to get a response from %s" % self._play_context.remote_addr)
response = keyczar_decrypt(self.key, response)
response = json.loads(response)
if "pong" in response:
# it's a keepalive, go back to waiting
display.vvvv("received a keepalive packet", host=self._play_context.remote_addr)
continue
else:
display.vvvv("received the response", host=self._play_context.remote_addr)
break
return (response.get('rc', None), response.get('stdout', ''), response.get('stderr', ''))
def put_file(self, in_path, out_path):
''' transfer a file from local to remote '''
display.vvv("PUT %s TO %s" % (in_path, out_path), host=self._play_context.remote_addr)
in_path = to_bytes(in_path, errors='surrogate_or_strict')
if not os.path.exists(in_path):
raise AnsibleFileNotFound("file or module does not exist: %s" % in_path)
fd = open(in_path, 'rb')
fstat = os.stat(in_path)
try:
display.vvv("PUT file is %d bytes" % fstat.st_size, host=self._play_context.remote_addr)
last = False
while fd.tell() <= fstat.st_size and not last:
display.vvvv("file position currently %ld, file size is %ld" % (fd.tell(), fstat.st_size), host=self._play_context.remote_addr)
data = fd.read(CHUNK_SIZE)
if fd.tell() >= fstat.st_size:
last = True
data = dict(mode='put', data=base64.b64encode(data), out_path=out_path, last=last)
if self._play_context.become:
data['user'] = self._play_context.become_user
data = jsonify(data)
data = keyczar_encrypt(self.key, data)
if self.send_data(data):
raise AnsibleError("failed to send the file to %s" % self._play_context.remote_addr)
response = self.recv_data()
if not response:
raise AnsibleError("Failed to get a response from %s" % self._play_context.remote_addr)
response = keyczar_decrypt(self.key, response)
response = json.loads(response)
if response.get('failed', False):
raise AnsibleError("failed to put the file in the requested location")
finally:
fd.close()
display.vvvv("waiting for final response after PUT", host=self._play_context.remote_addr)
response = self.recv_data()
if not response:
raise AnsibleError("Failed to get a response from %s" % self._play_context.remote_addr)
response = keyczar_decrypt(self.key, response)
response = json.loads(response)
if response.get('failed', False):
raise AnsibleError("failed to put the file in the requested location")
def fetch_file(self, in_path, out_path):
''' save a remote file to the specified path '''
display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self._play_context.remote_addr)
data = dict(mode='fetch', in_path=in_path)
data = jsonify(data)
data = keyczar_encrypt(self.key, data)
if self.send_data(data):
raise AnsibleError("failed to initiate the file fetch with %s" % self._play_context.remote_addr)
fh = open(to_bytes(out_path, errors='surrogate_or_strict'), "w")
try:
bytes = 0
while True:
response = self.recv_data()
if not response:
raise AnsibleError("Failed to get a response from %s" % self._play_context.remote_addr)
response = keyczar_decrypt(self.key, response)
response = json.loads(response)
if response.get('failed', False):
raise AnsibleError("Error during file fetch, aborting")
out = base64.b64decode(response['data'])
fh.write(out)
bytes += len(out)
# send an empty response back to signify we
# received the last chunk without errors
data = jsonify(dict())
data = keyczar_encrypt(self.key, data)
if self.send_data(data):
raise AnsibleError("failed to send ack during file fetch")
if response.get('last', False):
break
finally:
# we don't currently care about this final response,
# we just receive it and drop it. It may be used at some
# point in the future or we may just have the put/fetch
# operations not send back a final response at all
response = self.recv_data()
display.vvv("FETCH wrote %d bytes to %s" % (bytes, out_path), host=self._play_context.remote_addr)
fh.close()
def close(self):
''' terminate the connection '''
# Be a good citizen
try:
self.conn.close()
except:
pass

@ -1,28 +1,10 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import multiprocessing
import os
import stat
import tempfile
import time
import warnings
import random
from ansible import constants as C
@ -44,37 +26,12 @@ except ImportError:
from ansible.utils.display import Display
display = Display()
KEYCZAR_AVAILABLE = False
try:
try:
# some versions of pycrypto may not have this?
from Crypto.pct_warnings import PowmInsecureWarning
except ImportError:
PowmInsecureWarning = RuntimeWarning
with warnings.catch_warnings(record=True) as warning_handler:
warnings.simplefilter("error", PowmInsecureWarning)
try:
import keyczar.errors as key_errors
from keyczar.keys import AesKey
except PowmInsecureWarning:
display.system_warning(
"The version of gmp you have installed has a known issue regarding "
"timing vulnerabilities when used with pycrypto. "
"If possible, you should update it (i.e. yum update gmp)."
)
warnings.resetwarnings()
warnings.simplefilter("ignore")
import keyczar.errors as key_errors
from keyczar.keys import AesKey
KEYCZAR_AVAILABLE = True
except ImportError:
pass
__all__ = ['do_encrypt']
_LOCK = multiprocessing.Lock()
DEFAULT_PASSWORD_LENGTH = 20
def do_encrypt(result, encrypt, salt_size=None, salt=None):
if PASSLIB_AVAILABLE:
@ -103,73 +60,6 @@ def do_encrypt(result, encrypt, salt_size=None, salt=None):
return to_text(result, errors='strict')
def key_for_hostname(hostname):
# fireball mode is an implementation of ansible firing up zeromq via SSH
# to use no persistent daemons or key management
if not KEYCZAR_AVAILABLE:
raise AnsibleError("python-keyczar must be installed on the control machine to use accelerated modes")
key_path = os.path.expanduser(C.ACCELERATE_KEYS_DIR)
if not os.path.exists(key_path):
# avoid race with multiple forks trying to create paths on host
# but limit when locking is needed to creation only
with(_LOCK):
if not os.path.exists(key_path):
# use a temp directory and rename to ensure the directory
# searched for only appears after permissions applied.
tmp_dir = tempfile.mkdtemp(dir=os.path.dirname(key_path))
os.chmod(tmp_dir, int(C.ACCELERATE_KEYS_DIR_PERMS, 8))
os.rename(tmp_dir, key_path)
elif not os.path.isdir(key_path):
raise AnsibleError('ACCELERATE_KEYS_DIR is not a directory.')
if stat.S_IMODE(os.stat(key_path).st_mode) != int(C.ACCELERATE_KEYS_DIR_PERMS, 8):
raise AnsibleError('Incorrect permissions on the private key directory. Use `chmod 0%o %s` to correct this issue, and make sure any of the keys files '
'contained within that directory are set to 0%o' % (int(C.ACCELERATE_KEYS_DIR_PERMS, 8), C.ACCELERATE_KEYS_DIR,
int(C.ACCELERATE_KEYS_FILE_PERMS, 8)))
key_path = os.path.join(key_path, hostname)
# use new AES keys every 2 hours, which means fireball must not allow running for longer either
if not os.path.exists(key_path) or (time.time() - os.path.getmtime(key_path) > 60 * 60 * 2):
# avoid race with multiple forks trying to create key
# but limit when locking is needed to creation only
with(_LOCK):
if not os.path.exists(key_path) or (time.time() - os.path.getmtime(key_path) > 60 * 60 * 2):
key = AesKey.Generate()
# use temp file to ensure file only appears once it has
# desired contents and permissions
with tempfile.NamedTemporaryFile(mode='w', dir=os.path.dirname(key_path), delete=False) as fh:
tmp_key_path = fh.name
fh.write(str(key))
os.chmod(tmp_key_path, int(C.ACCELERATE_KEYS_FILE_PERMS, 8))
os.rename(tmp_key_path, key_path)
return key
if stat.S_IMODE(os.stat(key_path).st_mode) != int(C.ACCELERATE_KEYS_FILE_PERMS, 8):
raise AnsibleError('Incorrect permissions on the key file for this host. Use `chmod 0%o %s` to '
'correct this issue.' % (int(C.ACCELERATE_KEYS_FILE_PERMS, 8), key_path))
fh = open(key_path)
key = AesKey.Read(fh.read())
fh.close()
return key
def keyczar_encrypt(key, msg):
return key.Encrypt(msg.encode('utf-8'))
def keyczar_decrypt(key, msg):
try:
return key.Decrypt(msg)
except key_errors.InvalidSignatureError:
raise AnsibleError("decryption failed")
DEFAULT_PASSWORD_LENGTH = 20
def random_password(length=DEFAULT_PASSWORD_LENGTH, chars=C.DEFAULT_PASSWORD_CHARS):
'''Return a random password string of length containing only chars

Loading…
Cancel
Save