From 52a42bf6075da0bc3064f700cdd8bb14a2144416 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Mon, 30 Sep 2013 14:08:07 -0500 Subject: [PATCH 1/8] Add more verbose debugging options for accelerate --- .../runner/connection_plugins/accelerate.py | 13 +++- library/utilities/accelerate | 65 ++++++++++++++----- 2 files changed, 59 insertions(+), 19 deletions(-) diff --git a/lib/ansible/runner/connection_plugins/accelerate.py b/lib/ansible/runner/connection_plugins/accelerate.py index 61400d0ab3c..d6ce7243f1b 100644 --- a/lib/ansible/runner/connection_plugins/accelerate.py +++ b/lib/ansible/runner/connection_plugins/accelerate.py @@ -21,7 +21,7 @@ import base64 import socket import struct import time -from ansible.callbacks import vvv +from ansible.callbacks import vvv, vvvv from ansible.runner.connection_plugins.ssh import Connection as SSHConnection from ansible.runner.connection_plugins.paramiko_ssh import Connection as ParamikoConnection from ansible import utils @@ -84,12 +84,13 @@ class Connection(object): utils.AES_KEYS = self.runner.aes_keys def _execute_accelerate_module(self): - args = "password=%s port=%s" % (base64.b64encode(self.key.__str__()), str(self.accport)) + args = "password=%s port=%s debug=%d" % (base64.b64encode(self.key.__str__()), str(self.accport), int(utils.VERBOSITY)) inject = dict(password=self.key) if self.runner.accelerate_inventory_host: inject = utils.combine_vars(inject, self.runner.inventory.get_variables(self.runner.accelerate_inventory_host)) else: inject = utils.combine_vars(inject, self.runner.inventory.get_variables(self.host)) + vvvv("attempting to start up the accelerate daemon...") self.ssh.connect() tmp_path = self.runner._make_tmp_path(self.ssh) return self.runner._execute_module(self.ssh, tmp_path, 'accelerate', args, inject=inject) @@ -103,11 +104,13 @@ class Connection(object): tries = 3 self.conn = socket.socket() self.conn.settimeout(300.0) + vvvv("attempting connection to %s via the accelerated port %d" % (self.host,self.accport)) while tries > 0: try: self.conn.connect((self.host,self.accport)) break except: + vvvv("failed, retrying...") time.sleep(0.1) tries -= 1 if tries == 0: @@ -133,18 +136,24 @@ class Connection(object): header_len = 8 # size of a packed unsigned long long data = b"" try: + vvvv("%s: in recv_data(), waiting for the header" % self.host) while len(data) < header_len: d = self.conn.recv(1024) if not d: + vvvv("%s: received nothing, bailing out" % self.host) return None data += d + vvvv("%s: got the header, unpacking" % self.host) data_len = struct.unpack('Q',data[:header_len])[0] data = data[header_len:] + vvvv("%s: data received so far (expecting %d): %d" % (self.host,data_len,len(data))) while len(data) < data_len: d = self.conn.recv(1024) if not d: + vvvv("%s: received nothing, bailing out" % self.host) return None data += d + vvvv("%s: received all of the data, returning" % self.host) return data except socket.timeout: raise errors.AnsibleError("timed out while waiting to receive data") diff --git a/library/utilities/accelerate b/library/utilities/accelerate index 371e4adb92b..da16d8395e8 100644 --- a/library/utilities/accelerate +++ b/library/utilities/accelerate @@ -85,8 +85,22 @@ PIDFILE = os.path.expanduser("~/.accelerate.pid") # which leaves room for the TCP/IP header CHUNK_SIZE=10240 -def log(msg): - syslog.syslog(syslog.LOG_NOTICE|syslog.LOG_DAEMON, msg) +# FIXME: this all should be moved to module_common, as it's +# pretty much a copy from the callbacks/util code +DEBUG_LEVEL=0 +def log(msg, cap=0): + global DEBUG_LEVEL + if cap >= DEBUG_LEVEL: + syslog.syslog(syslog.LOG_NOTICE|syslog.LOG_DAEMON, msg) + +def vv(msg): + log(msg, cap=2) + +def vvv(msg): + log(msg, cap=3) + +def vvvv(msg): + log(msg, cap=4) if os.path.exists(PIDFILE): try: @@ -114,7 +128,7 @@ def daemonize_self(module, password, port, minutes): try: pid = os.fork() if pid > 0: - log("exiting pid %s" % pid) + vvv("exiting pid %s" % pid) # exit first parent module.exit_json(msg="daemonized accelerate on port %s for %s minutes" % (port, minutes)) except OSError, e: @@ -134,7 +148,7 @@ def daemonize_self(module, password, port, minutes): pid_file = open(PIDFILE, "w") pid_file.write("%s" % pid) pid_file.close() - log("pidfile written") + vvv("pidfile written") sys.exit(0) except OSError, e: log("fork #2 failed: %d (%s)" % (e.errno, e.strerror)) @@ -162,52 +176,64 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): def recv_data(self): header_len = 8 # size of a packed unsigned long long data = b"" + vvvv("in recv_data(), waiting for the header") while len(data) < header_len: d = self.request.recv(1024) if not d: + vvv("received nothing, bailing out") return None data += d + vvvv("in recv_data(), got the header, unpacking") data_len = struct.unpack('Q',data[:header_len])[0] data = data[header_len:] + vvvv("data received so far (expecting %d): %d" % (data_len,len(data))) while len(data) < data_len: d = self.request.recv(1024) if not d: + vvv("received nothing, bailing out") return None data += d + vvvv("received all of the data, returning") return data def handle(self): while True: - #log("waiting for data") + vvvv("waiting for data") data = self.recv_data() if not data: + vvvv("received nothing back from recv_data(), breaking out") break try: - #log("got data, decrypting") + vvvv("got data, decrypting") data = self.server.key.Decrypt(data) - #log("decryption done") + vvvv("decryption done") except: - log("bad decrypt, skipping...") + vv("bad decrypt, skipping...") data2 = json.dumps(dict(rc=1)) data2 = self.server.key.Encrypt(data2) send_data(client, data2) return - #log("loading json from the data") + vvvv("loading json from the data") data = json.loads(data) mode = data['mode'] response = {} if mode == 'command': + vvvv("received a command request, running it") response = self.command(data) elif mode == 'put': + vvvv("received a put request, putting it") response = self.put(data) elif mode == 'fetch': + vvvv("received a fetch request, getting it") response = self.fetch(data) data2 = json.dumps(response) data2 = self.server.key.Encrypt(data2) + vvvv("sending the response back to the controller") self.send_data(data2) + vvvv("done sending the response") def command(self, data): if 'cmd' not in data: @@ -217,14 +243,14 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): if 'executable' not in data: return dict(failed=True, msg='internal error: executable is required') - #log("executing: %s" % data['cmd']) + vvvv("executing: %s" % data['cmd']) rc, stdout, stderr = self.server.module.run_command(data['cmd'], executable=data['executable'], close_fds=True) if stdout is None: stdout = '' if stderr is None: stderr = '' - #log("got stdout: %s" % stdout) - #log("got stderr: %s" % stderr) + vvvv("got stdout: %s" % stdout) + vvvv("got stderr: %s" % stderr) return dict(rc=rc, stdout=stdout, stderr=stderr) @@ -235,7 +261,7 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): try: fd = file(data['in_path'], 'rb') fstat = os.stat(data['in_path']) - log("FETCH file is %d bytes" % fstat.st_size) + vvv("FETCH file is %d bytes" % fstat.st_size) while fd.tell() < fstat.st_size: data = fd.read(CHUNK_SIZE) last = False @@ -276,7 +302,7 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): final_path = None final_user = None if 'user' in data and data.get('user') != getpass.getuser(): - log("the target user doesn't match this user, we'll move the file into place via sudo") + vv("the target user doesn't match this user, we'll move the file into place via sudo") (fd,out_path) = tempfile.mkstemp(prefix='ansible.', dir=os.path.expanduser('~/.ansible/tmp/')) out_fd = os.fdopen(fd, 'w', 0) final_path = data['out_path'] @@ -306,11 +332,11 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): log("failed to put the file: %s" % tb) return dict(failed=True, stdout="Could not write the file") finally: - #log("wrote %d bytes" % bytes) + vvvv("wrote %d bytes" % bytes) out_fd.close() if final_path: - log("moving %s to %s" % (out_path, final_path)) + vvv("moving %s to %s" % (out_path, final_path)) args = ['sudo','cp',out_path,final_path] rc, stdout, stderr = self.server.module.run_command(args, close_fds=True) if rc != 0: @@ -334,7 +360,7 @@ def daemonize(module, password, port, minutes): server = ThreadedTCPServer(("0.0.0.0", port), ThreadedTCPRequestHandler, module, password) server.allow_reuse_address = True - log("serving!") + vv("serving!") server.serve_forever(poll_interval=1.0) except Exception, e: tb = traceback.format_exc() @@ -342,11 +368,13 @@ def daemonize(module, password, port, minutes): sys.exit(0) def main(): + global DEBUG_LEVEL module = AnsibleModule( argument_spec = dict( port=dict(required=False, default=5099), password=dict(required=True), minutes=dict(required=False, default=30), + debug=dict(required=False, default=0, type='int') ), supports_check_mode=True ) @@ -354,10 +382,13 @@ def main(): password = base64.b64decode(module.params['password']) port = int(module.params['port']) minutes = int(module.params['minutes']) + debug = int(module.params['debug']) if not HAS_KEYCZAR: module.fail_json(msg="keyczar is not installed") + DEBUG_LEVEL=debug + daemonize(module, password, port, minutes) From f9c87868ac4f90af4ba89e5060bee77f7d314343 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Tue, 1 Oct 2013 15:19:21 -0500 Subject: [PATCH 2/8] Added keepalive packets to accelerate mode Commands will now be started up in a separate task from the main handler thread, so that it can be monitored for completeness while sending a keepalive packet back to the controller to avoid a socket receive timeout. --- .../runner/connection_plugins/accelerate.py | 21 +++- library/utilities/accelerate | 119 ++++++++++++------ 2 files changed, 94 insertions(+), 46 deletions(-) diff --git a/lib/ansible/runner/connection_plugins/accelerate.py b/lib/ansible/runner/connection_plugins/accelerate.py index d6ce7243f1b..3eb08e7b80e 100644 --- a/lib/ansible/runner/connection_plugins/accelerate.py +++ b/lib/ansible/runner/connection_plugins/accelerate.py @@ -180,11 +180,22 @@ class Connection(object): if self.send_data(data): raise errors.AnsibleError("Failed to send command to %s" % self.host) - response = self.recv_data() - if not response: - raise errors.AnsibleError("Failed to get a response from %s" % self.host) - response = utils.decrypt(self.key, response) - response = utils.parse_json(response) + while True: + # we loop here while waiting for the response, because a + # long running command may cause us to receive keepalive packets + # ({"pong":"true"}) rather than the response we want. + response = self.recv_data() + if not response: + raise errors.AnsibleError("Failed to get a response from %s" % self.host) + response = utils.decrypt(self.key, response) + response = utils.parse_json(response) + if "pong" in response: + # it's a keepalive, go back to waiting + vvvv("received a keepalive packet") + continue + else: + vvvv("received the response") + break return (response.get('rc',None), '', response.get('stdout',''), response.get('stderr','')) diff --git a/library/utilities/accelerate b/library/utilities/accelerate index da16d8395e8..f7cdea2985c 100644 --- a/library/utilities/accelerate +++ b/library/utilities/accelerate @@ -58,24 +58,25 @@ EXAMPLES = ''' - command: /usr/bin/anything ''' +import base64 +import getpass import os import os.path -import tempfile -import sys import shutil +import signal import socket import struct -import time -import base64 -import getpass +import sys import syslog -import signal +import tempfile import time -import signal import traceback import SocketServer +from datetime import datetime +from threading import Thread + syslog.openlog('ansible-%s' % os.path.basename(__file__)) PIDFILE = os.path.expanduser("~/.accelerate.pid") @@ -160,6 +161,19 @@ def daemonize_self(module, password, port, minutes): os.dup2(dev_null.fileno(), sys.stderr.fileno()) log("daemonizing successful") +class ThreadWithReturnValue(Thread): + def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None): + Thread.__init__(self, group, target, name, args, kwargs, Verbose) + self._return = None + + def run(self): + if self._Thread__target is not None: + self._return = self._Thread__target(*self._Thread__args, + **self._Thread__kwargs) + def join(self,timeout=None): + Thread.join(self, timeout=timeout) + return self._return + class ThreadedTCPServer(SocketServer.ThreadingTCPServer): def __init__(self, server_address, RequestHandlerClass, module, password): self.module = module @@ -193,47 +207,70 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): vvv("received nothing, bailing out") return None data += d + vvvv("data received so far (expecting %d): %d" % (data_len,len(data))) vvvv("received all of the data, returning") return data def handle(self): - while True: - vvvv("waiting for data") - data = self.recv_data() - if not data: - vvvv("received nothing back from recv_data(), breaking out") - break - try: - vvvv("got data, decrypting") - data = self.server.key.Decrypt(data) - vvvv("decryption done") - except: - vv("bad decrypt, skipping...") - data2 = json.dumps(dict(rc=1)) + try: + while True: + vvvv("waiting for data") + data = self.recv_data() + if not data: + vvvv("received nothing back from recv_data(), breaking out") + break + try: + vvvv("got data, decrypting") + data = self.server.key.Decrypt(data) + vvvv("decryption done") + except: + vv("bad decrypt, skipping...") + data2 = json.dumps(dict(rc=1)) + data2 = self.server.key.Encrypt(data2) + send_data(client, data2) + return + + vvvv("loading json from the data") + data = json.loads(data) + + mode = data['mode'] + response = {} + last_pong = datetime.now() + if mode == 'command': + vvvv("received a command request, running it") + twrv = ThreadWithReturnValue(target=self.command, args=(data,)) + twrv.start() + response = None + while twrv.is_alive(): + if (datetime.now() - last_pong).seconds >= 15: + last_pong = datetime.now() + vvvv("command still running, sending keepalive packet") + data2 = json.dumps(dict(pong=True)) + data2 = self.server.key.Encrypt(data2) + self.send_data(data2) + time.sleep(0.1) + response = twrv._return + vvvv("thread is done, response from join was %s" % response) + elif mode == 'put': + vvvv("received a put request, putting it") + response = self.put(data) + elif mode == 'fetch': + vvvv("received a fetch request, getting it") + response = self.fetch(data) + + vvvv("response result is %s" % str(response)) + data2 = json.dumps(response) data2 = self.server.key.Encrypt(data2) - send_data(client, data2) - return - - vvvv("loading json from the data") - data = json.loads(data) - - mode = data['mode'] - response = {} - if mode == 'command': - vvvv("received a command request, running it") - response = self.command(data) - elif mode == 'put': - vvvv("received a put request, putting it") - response = self.put(data) - elif mode == 'fetch': - vvvv("received a fetch request, getting it") - response = self.fetch(data) - - data2 = json.dumps(response) + vvvv("sending the response back to the controller") + self.send_data(data2) + vvvv("done sending the response") + except: + tb = traceback.format_exc() + log("encountered an unhandled exception in the handle() function") + log("error was:\n%s" % tb) + data2 = json.dumps(dict(rc=1, failed=True, msg="unhandled error in the handle() function")) data2 = self.server.key.Encrypt(data2) - vvvv("sending the response back to the controller") self.send_data(data2) - vvvv("done sending the response") def command(self, data): if 'cmd' not in data: From 59a5ce23d9ab740f65f57c670cef80bf373c6088 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Thu, 19 Sep 2013 12:21:10 -0500 Subject: [PATCH 3/8] Adding an accelerate_timeout parameter for plays This setting makes the timeout for each play configurable, rather than hard-coding it at 300 seconds (now the default if left unspecified) Fixes #4162 --- lib/ansible/constants.py | 1 + lib/ansible/playbook/__init__.py | 4 +++- lib/ansible/playbook/play.py | 5 +++-- lib/ansible/runner/__init__.py | 8 ++++++++ .../runner/connection_plugins/accelerate.py | 2 +- library/utilities/accelerate | 19 +++++++++++++------ 6 files changed, 29 insertions(+), 10 deletions(-) diff --git a/lib/ansible/constants.py b/lib/ansible/constants.py index d37f0c58b53..7391424682e 100644 --- a/lib/ansible/constants.py +++ b/lib/ansible/constants.py @@ -135,6 +135,7 @@ ANSIBLE_SSH_CONTROL_PATH = get_config(p, 'ssh_connection', 'control_path', PARAMIKO_RECORD_HOST_KEYS = get_config(p, 'paramiko_connection', 'record_host_keys', 'ANSIBLE_PARAMIKO_RECORD_HOST_KEYS', True, boolean=True) ZEROMQ_PORT = get_config(p, 'fireball_connection', 'zeromq_port', 'ANSIBLE_ZEROMQ_PORT', 5099, integer=True) ACCELERATE_PORT = get_config(p, 'accelerate', 'accelerate_port', 'ACCELERATE_PORT', 5099, integer=True) +ACCELERATE_TIMEOUT = int(get_config(p, 'accelerate', 'accelerate_timeout', 'ACCELERATE_TIMEOUT', 300)) DEFAULT_UNDEFINED_VAR_BEHAVIOR = get_config(p, DEFAULTS, 'error_on_undefined_vars', 'ANSIBLE_ERROR_ON_UNDEFINED_VARS', True, boolean=True) HOST_KEY_CHECKING = get_config(p, DEFAULTS, 'host_key_checking', 'ANSIBLE_HOST_KEY_CHECKING', True, boolean=True) diff --git a/lib/ansible/playbook/__init__.py b/lib/ansible/playbook/__init__.py index 216ddc61466..f350ff9aa45 100644 --- a/lib/ansible/playbook/__init__.py +++ b/lib/ansible/playbook/__init__.py @@ -314,6 +314,7 @@ class PlayBook(object): transport=task.transport, sudo_pass=task.sudo_pass, is_playbook=True, check=self.check, diff=self.diff, environment=task.environment, complex_args=task.args, accelerate=task.play.accelerate, accelerate_port=task.play.accelerate_port, + accelerate_timeout=task.play.accelerate_timeout, error_on_undefined_vars=C.DEFAULT_UNDEFINED_VAR_BEHAVIOR ) @@ -454,7 +455,8 @@ class PlayBook(object): setup_cache=self.SETUP_CACHE, callbacks=self.runner_callbacks, sudo=play.sudo, sudo_user=play.sudo_user, transport=play.transport, sudo_pass=self.sudo_pass, is_playbook=True, module_vars=play.vars, default_vars=play.default_vars, check=self.check, diff=self.diff, - accelerate=play.accelerate, accelerate_port=play.accelerate_port + accelerate=play.accelerate, accelerate_port=play.accelerate_port, + accelerate_timeout=play.accelerate_timeout ).run() self.stats.compute(setup_results, setup=True) diff --git a/lib/ansible/playbook/play.py b/lib/ansible/playbook/play.py index 0273be841c8..1d617003eab 100644 --- a/lib/ansible/playbook/play.py +++ b/lib/ansible/playbook/play.py @@ -30,7 +30,7 @@ class Play(object): __slots__ = [ 'hosts', 'name', 'vars', 'default_vars', 'vars_prompt', 'vars_files', 'handlers', 'remote_user', 'remote_port', 'included_roles', 'accelerate', - 'accelerate_port', 'sudo', 'sudo_user', 'transport', 'playbook', + 'accelerate_port', 'accelerate_timeout', 'sudo', 'sudo_user', 'transport', 'playbook', 'tags', 'gather_facts', 'serial', '_ds', '_handlers', '_tasks', 'basedir', 'any_errors_fatal', 'roles', 'max_fail_pct' ] @@ -40,7 +40,7 @@ class Play(object): VALID_KEYS = [ 'hosts', 'name', 'vars', 'vars_prompt', 'vars_files', 'tasks', 'handlers', 'remote_user', 'user', 'port', 'include', 'accelerate', 'accelerate_port', - 'sudo', 'sudo_user', 'connection', 'tags', 'gather_facts', 'serial', + 'accelerate_timeout', 'sudo', 'sudo_user', 'connection', 'tags', 'gather_facts', 'serial', 'any_errors_fatal', 'roles', 'pre_tasks', 'post_tasks', 'max_fail_percentage' ] @@ -114,6 +114,7 @@ class Play(object): self.any_errors_fatal = utils.boolean(ds.get('any_errors_fatal', 'false')) self.accelerate = utils.boolean(ds.get('accelerate', 'false')) self.accelerate_port = ds.get('accelerate_port', None) + self.accelerate_timeout = int(ds.get('accelerate_timeout', 300)) self.max_fail_pct = int(ds.get('max_fail_percentage', 100)) load_vars = {} diff --git a/lib/ansible/runner/__init__.py b/lib/ansible/runner/__init__.py index ac22fb1399c..e336a13da4c 100644 --- a/lib/ansible/runner/__init__.py +++ b/lib/ansible/runner/__init__.py @@ -136,6 +136,7 @@ class Runner(object): error_on_undefined_vars=C.DEFAULT_UNDEFINED_VAR_BEHAVIOR, # ex. False accelerate=False, # use accelerated connection accelerate_port=None, # port to use with accelerated connection + accelerate_timeout=None, # number of seconds to wait for a response on the accelerated connection ): # used to lock multiprocess inputs and outputs at various levels @@ -179,6 +180,7 @@ class Runner(object): self.error_on_undefined_vars = error_on_undefined_vars self.accelerate = accelerate self.accelerate_port = accelerate_port + self.accelerate_timeout = accelerate_timeout self.callbacks.runner = self self.original_transport = self.transport @@ -581,6 +583,12 @@ class Runner(object): actual_transport = "accelerate" if not self.accelerate_port: self.accelerate_port = C.ACCELERATE_PORT + try: + if not self.accelerate_timeout: + self.accelerate_timeout = C.ACCELERATE_TIMEOUT + self.accelerate_timeout = int(self.accelerate_timeout) + except: + raise errors.AnsibleError("invalid value for the accelerate_timeout parameter") if actual_transport in [ 'paramiko', 'ssh', 'accelerate' ]: actual_port = inject.get('ansible_ssh_port', port) diff --git a/lib/ansible/runner/connection_plugins/accelerate.py b/lib/ansible/runner/connection_plugins/accelerate.py index 3eb08e7b80e..0571fb25c76 100644 --- a/lib/ansible/runner/connection_plugins/accelerate.py +++ b/lib/ansible/runner/connection_plugins/accelerate.py @@ -103,7 +103,7 @@ class Connection(object): # TODO: make the timeout and retries configurable? tries = 3 self.conn = socket.socket() - self.conn.settimeout(300.0) + self.conn.settimeout(self.runner.accelerate_timeout) vvvv("attempting connection to %s via the accelerated port %d" % (self.host,self.accport)) while tries > 0: try: diff --git a/library/utilities/accelerate b/library/utilities/accelerate index f7cdea2985c..420d78bdbef 100644 --- a/library/utilities/accelerate +++ b/library/utilities/accelerate @@ -35,6 +35,12 @@ options: required: false default: 5099 aliases: [] + timeout: + description: + - The number of seconds the socket will wait for data. If none is received when the timeout value is reached, the connection will be closed. + required: false + default: 300 + aliases: [] minutes: description: - The I(accelerate) listener daemon is started on nodes and will stay around for @@ -175,11 +181,11 @@ class ThreadWithReturnValue(Thread): return self._return class ThreadedTCPServer(SocketServer.ThreadingTCPServer): - def __init__(self, server_address, RequestHandlerClass, module, password): + def __init__(self, server_address, RequestHandlerClass, module, password, timeout): self.module = module self.key = AesKey.Read(password) self.allow_reuse_address = True - self.timeout = None + self.timeout = timeout SocketServer.ThreadingTCPServer.__init__(self, server_address, RequestHandlerClass) class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): @@ -384,7 +390,7 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): return dict(failed=True, stdout="failed to chown the file via sudo") return dict() -def daemonize(module, password, port, minutes): +def daemonize(module, password, port, timeout, minutes): try: daemonize_self(module, password, port, minutes) @@ -394,7 +400,7 @@ def daemonize(module, password, port, minutes): signal.signal(signal.SIGALRM, catcher) signal.setitimer(signal.ITIMER_REAL, 60 * minutes) - server = ThreadedTCPServer(("0.0.0.0", port), ThreadedTCPRequestHandler, module, password) + server = ThreadedTCPServer(("0.0.0.0", port), ThreadedTCPRequestHandler, module, password, timeout) server.allow_reuse_address = True vv("serving!") @@ -409,6 +415,7 @@ def main(): module = AnsibleModule( argument_spec = dict( port=dict(required=False, default=5099), + timeout=dict(required=False, default=300), password=dict(required=True), minutes=dict(required=False, default=30), debug=dict(required=False, default=0, type='int') @@ -418,6 +425,7 @@ def main(): password = base64.b64decode(module.params['password']) port = int(module.params['port']) + timeout = int(module.params['timeout']) minutes = int(module.params['minutes']) debug = int(module.params['debug']) @@ -426,8 +434,7 @@ def main(): DEBUG_LEVEL=debug - daemonize(module, password, port, minutes) - + daemonize(module, password, port, timeout, minutes) # this is magic, see lib/ansible/module_common.py #<> From d31710337101358cbb56c3a312b42a7ce63a7c51 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Thu, 19 Sep 2013 14:11:36 -0500 Subject: [PATCH 4/8] Added in an accelerate connection timeout setting --- lib/ansible/constants.py | 1 + lib/ansible/runner/connection_plugins/accelerate.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/ansible/constants.py b/lib/ansible/constants.py index 7391424682e..fdab0bf1684 100644 --- a/lib/ansible/constants.py +++ b/lib/ansible/constants.py @@ -136,6 +136,7 @@ PARAMIKO_RECORD_HOST_KEYS = get_config(p, 'paramiko_connection', 'record_ho ZEROMQ_PORT = get_config(p, 'fireball_connection', 'zeromq_port', 'ANSIBLE_ZEROMQ_PORT', 5099, integer=True) ACCELERATE_PORT = get_config(p, 'accelerate', 'accelerate_port', 'ACCELERATE_PORT', 5099, integer=True) ACCELERATE_TIMEOUT = int(get_config(p, 'accelerate', 'accelerate_timeout', 'ACCELERATE_TIMEOUT', 300)) +ACCELERATE_CONNECT_TIMEOUT = float(get_config(p, 'accelerate', 'accelerate_connect_timeout', 'ACCELERATE_CONNECT_TIMEOUT', 1.0)) DEFAULT_UNDEFINED_VAR_BEHAVIOR = get_config(p, DEFAULTS, 'error_on_undefined_vars', 'ANSIBLE_ERROR_ON_UNDEFINED_VARS', True, boolean=True) HOST_KEY_CHECKING = get_config(p, DEFAULTS, 'host_key_checking', 'ANSIBLE_HOST_KEY_CHECKING', True, boolean=True) diff --git a/lib/ansible/runner/connection_plugins/accelerate.py b/lib/ansible/runner/connection_plugins/accelerate.py index 0571fb25c76..414c070c7d2 100644 --- a/lib/ansible/runner/connection_plugins/accelerate.py +++ b/lib/ansible/runner/connection_plugins/accelerate.py @@ -103,7 +103,7 @@ class Connection(object): # TODO: make the timeout and retries configurable? tries = 3 self.conn = socket.socket() - self.conn.settimeout(self.runner.accelerate_timeout) + self.conn.settimeout(constants.ACCELERATE_CONNECT_TIMEOUT) vvvv("attempting connection to %s via the accelerated port %d" % (self.host,self.accport)) while tries > 0: try: @@ -116,6 +116,7 @@ class Connection(object): if tries == 0: vvv("Could not connect via the accelerated connection, exceeded # of tries") raise errors.AnsibleError("Failed to connect") + self.conn.settimeout(self.runner.accelerate_timeout) except: if allow_ssh: vvv("Falling back to ssh to startup accelerated mode") From 8c177112475ba1d0ebe3173210433ad688f67f0d Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Tue, 1 Oct 2013 15:33:18 -0500 Subject: [PATCH 5/8] Removing accelerate_timeout as a playbook option This will remain in ansible.cfg only. --- lib/ansible/playbook/__init__.py | 2 -- lib/ansible/playbook/play.py | 5 ++--- lib/ansible/runner/__init__.py | 8 -------- lib/ansible/runner/connection_plugins/accelerate.py | 3 +-- 4 files changed, 3 insertions(+), 15 deletions(-) diff --git a/lib/ansible/playbook/__init__.py b/lib/ansible/playbook/__init__.py index f350ff9aa45..c964a53fbd4 100644 --- a/lib/ansible/playbook/__init__.py +++ b/lib/ansible/playbook/__init__.py @@ -314,7 +314,6 @@ class PlayBook(object): transport=task.transport, sudo_pass=task.sudo_pass, is_playbook=True, check=self.check, diff=self.diff, environment=task.environment, complex_args=task.args, accelerate=task.play.accelerate, accelerate_port=task.play.accelerate_port, - accelerate_timeout=task.play.accelerate_timeout, error_on_undefined_vars=C.DEFAULT_UNDEFINED_VAR_BEHAVIOR ) @@ -456,7 +455,6 @@ class PlayBook(object): transport=play.transport, sudo_pass=self.sudo_pass, is_playbook=True, module_vars=play.vars, default_vars=play.default_vars, check=self.check, diff=self.diff, accelerate=play.accelerate, accelerate_port=play.accelerate_port, - accelerate_timeout=play.accelerate_timeout ).run() self.stats.compute(setup_results, setup=True) diff --git a/lib/ansible/playbook/play.py b/lib/ansible/playbook/play.py index 1d617003eab..0273be841c8 100644 --- a/lib/ansible/playbook/play.py +++ b/lib/ansible/playbook/play.py @@ -30,7 +30,7 @@ class Play(object): __slots__ = [ 'hosts', 'name', 'vars', 'default_vars', 'vars_prompt', 'vars_files', 'handlers', 'remote_user', 'remote_port', 'included_roles', 'accelerate', - 'accelerate_port', 'accelerate_timeout', 'sudo', 'sudo_user', 'transport', 'playbook', + 'accelerate_port', 'sudo', 'sudo_user', 'transport', 'playbook', 'tags', 'gather_facts', 'serial', '_ds', '_handlers', '_tasks', 'basedir', 'any_errors_fatal', 'roles', 'max_fail_pct' ] @@ -40,7 +40,7 @@ class Play(object): VALID_KEYS = [ 'hosts', 'name', 'vars', 'vars_prompt', 'vars_files', 'tasks', 'handlers', 'remote_user', 'user', 'port', 'include', 'accelerate', 'accelerate_port', - 'accelerate_timeout', 'sudo', 'sudo_user', 'connection', 'tags', 'gather_facts', 'serial', + 'sudo', 'sudo_user', 'connection', 'tags', 'gather_facts', 'serial', 'any_errors_fatal', 'roles', 'pre_tasks', 'post_tasks', 'max_fail_percentage' ] @@ -114,7 +114,6 @@ class Play(object): self.any_errors_fatal = utils.boolean(ds.get('any_errors_fatal', 'false')) self.accelerate = utils.boolean(ds.get('accelerate', 'false')) self.accelerate_port = ds.get('accelerate_port', None) - self.accelerate_timeout = int(ds.get('accelerate_timeout', 300)) self.max_fail_pct = int(ds.get('max_fail_percentage', 100)) load_vars = {} diff --git a/lib/ansible/runner/__init__.py b/lib/ansible/runner/__init__.py index e336a13da4c..ac22fb1399c 100644 --- a/lib/ansible/runner/__init__.py +++ b/lib/ansible/runner/__init__.py @@ -136,7 +136,6 @@ class Runner(object): error_on_undefined_vars=C.DEFAULT_UNDEFINED_VAR_BEHAVIOR, # ex. False accelerate=False, # use accelerated connection accelerate_port=None, # port to use with accelerated connection - accelerate_timeout=None, # number of seconds to wait for a response on the accelerated connection ): # used to lock multiprocess inputs and outputs at various levels @@ -180,7 +179,6 @@ class Runner(object): self.error_on_undefined_vars = error_on_undefined_vars self.accelerate = accelerate self.accelerate_port = accelerate_port - self.accelerate_timeout = accelerate_timeout self.callbacks.runner = self self.original_transport = self.transport @@ -583,12 +581,6 @@ class Runner(object): actual_transport = "accelerate" if not self.accelerate_port: self.accelerate_port = C.ACCELERATE_PORT - try: - if not self.accelerate_timeout: - self.accelerate_timeout = C.ACCELERATE_TIMEOUT - self.accelerate_timeout = int(self.accelerate_timeout) - except: - raise errors.AnsibleError("invalid value for the accelerate_timeout parameter") if actual_transport in [ 'paramiko', 'ssh', 'accelerate' ]: actual_port = inject.get('ansible_ssh_port', port) diff --git a/lib/ansible/runner/connection_plugins/accelerate.py b/lib/ansible/runner/connection_plugins/accelerate.py index 414c070c7d2..c4d7e4c9915 100644 --- a/lib/ansible/runner/connection_plugins/accelerate.py +++ b/lib/ansible/runner/connection_plugins/accelerate.py @@ -100,7 +100,6 @@ class Connection(object): try: if not self.is_connected: - # TODO: make the timeout and retries configurable? tries = 3 self.conn = socket.socket() self.conn.settimeout(constants.ACCELERATE_CONNECT_TIMEOUT) @@ -116,7 +115,7 @@ class Connection(object): if tries == 0: vvv("Could not connect via the accelerated connection, exceeded # of tries") raise errors.AnsibleError("Failed to connect") - self.conn.settimeout(self.runner.accelerate_timeout) + self.conn.settimeout(constants.ACCELERATE_TIMEOUT) except: if allow_ssh: vvv("Falling back to ssh to startup accelerated mode") From 12f69575962110ccc8342960ed514b61e5ccf172 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Tue, 1 Oct 2013 15:34:58 -0500 Subject: [PATCH 6/8] Cleaning up some vvvv log messages in accelerate --- lib/ansible/runner/connection_plugins/accelerate.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/ansible/runner/connection_plugins/accelerate.py b/lib/ansible/runner/connection_plugins/accelerate.py index c4d7e4c9915..9e37802ee79 100644 --- a/lib/ansible/runner/connection_plugins/accelerate.py +++ b/lib/ansible/runner/connection_plugins/accelerate.py @@ -191,10 +191,10 @@ class Connection(object): response = utils.parse_json(response) if "pong" in response: # it's a keepalive, go back to waiting - vvvv("received a keepalive packet") + vvvv("%s: received a keepalive packet" % self.host) continue else: - vvvv("received the response") + vvvv("%s: received the response" % self.host) break return (response.get('rc',None), '', response.get('stdout',''), response.get('stderr','')) From 8923a5b0d98160e10c43a21a66c68d3247693be4 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Tue, 1 Oct 2013 16:10:48 -0500 Subject: [PATCH 7/8] Drop default config value for accelerate timeout to 30 seconds --- lib/ansible/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ansible/constants.py b/lib/ansible/constants.py index fdab0bf1684..852ea506b6f 100644 --- a/lib/ansible/constants.py +++ b/lib/ansible/constants.py @@ -135,7 +135,7 @@ ANSIBLE_SSH_CONTROL_PATH = get_config(p, 'ssh_connection', 'control_path', PARAMIKO_RECORD_HOST_KEYS = get_config(p, 'paramiko_connection', 'record_host_keys', 'ANSIBLE_PARAMIKO_RECORD_HOST_KEYS', True, boolean=True) ZEROMQ_PORT = get_config(p, 'fireball_connection', 'zeromq_port', 'ANSIBLE_ZEROMQ_PORT', 5099, integer=True) ACCELERATE_PORT = get_config(p, 'accelerate', 'accelerate_port', 'ACCELERATE_PORT', 5099, integer=True) -ACCELERATE_TIMEOUT = int(get_config(p, 'accelerate', 'accelerate_timeout', 'ACCELERATE_TIMEOUT', 300)) +ACCELERATE_TIMEOUT = int(get_config(p, 'accelerate', 'accelerate_timeout', 'ACCELERATE_TIMEOUT', 30)) ACCELERATE_CONNECT_TIMEOUT = float(get_config(p, 'accelerate', 'accelerate_connect_timeout', 'ACCELERATE_CONNECT_TIMEOUT', 1.0)) DEFAULT_UNDEFINED_VAR_BEHAVIOR = get_config(p, DEFAULTS, 'error_on_undefined_vars', 'ANSIBLE_ERROR_ON_UNDEFINED_VARS', True, boolean=True) From fa80a17aa3e5e9ccfb7ee486236269797057d8b9 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Tue, 1 Oct 2013 16:50:32 -0500 Subject: [PATCH 8/8] Make recv_data less greedy so it doesn't eat other packets --- lib/ansible/runner/connection_plugins/accelerate.py | 4 ++-- library/utilities/accelerate | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/ansible/runner/connection_plugins/accelerate.py b/lib/ansible/runner/connection_plugins/accelerate.py index 9e37802ee79..413f85e6069 100644 --- a/lib/ansible/runner/connection_plugins/accelerate.py +++ b/lib/ansible/runner/connection_plugins/accelerate.py @@ -138,7 +138,7 @@ class Connection(object): try: vvvv("%s: in recv_data(), waiting for the header" % self.host) while len(data) < header_len: - d = self.conn.recv(1024) + d = self.conn.recv(header_len - len(data)) if not d: vvvv("%s: received nothing, bailing out" % self.host) return None @@ -148,7 +148,7 @@ class Connection(object): data = data[header_len:] vvvv("%s: data received so far (expecting %d): %d" % (self.host,data_len,len(data))) while len(data) < data_len: - d = self.conn.recv(1024) + d = self.conn.recv(data_len - len(data)) if not d: vvvv("%s: received nothing, bailing out" % self.host) return None diff --git a/library/utilities/accelerate b/library/utilities/accelerate index 420d78bdbef..0cc29dc5956 100644 --- a/library/utilities/accelerate +++ b/library/utilities/accelerate @@ -198,7 +198,7 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): data = b"" vvvv("in recv_data(), waiting for the header") while len(data) < header_len: - d = self.request.recv(1024) + d = self.request.recv(header_len - len(data)) if not d: vvv("received nothing, bailing out") return None @@ -208,7 +208,7 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): data = data[header_len:] vvvv("data received so far (expecting %d): %d" % (data_len,len(data))) while len(data) < data_len: - d = self.request.recv(1024) + d = self.request.recv(data_len - len(data)) if not d: vvv("received nothing, bailing out") return None