From efbc6054a4893a77b8d7121d70257bf19f05ffe2 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Thu, 5 Nov 2015 16:21:34 -0500 Subject: [PATCH] Add variable compression option --- examples/ansible.cfg | 5 +++++ lib/ansible/constants.py | 1 + lib/ansible/executor/process/worker.py | 10 +++++++++- lib/ansible/plugins/strategy/__init__.py | 24 +++++++++++++++++++++++- lib/ansible/plugins/strategy/linear.py | 3 +++ 5 files changed, 41 insertions(+), 2 deletions(-) diff --git a/examples/ansible.cfg b/examples/ansible.cfg index 33021d3f5fd..74aef7a0246 100644 --- a/examples/ansible.cfg +++ b/examples/ansible.cfg @@ -184,6 +184,11 @@ # prevents logging of tasks, but only on the targets, data is still logged on the master/controller #no_target_syslog = True +# controls the compression level of variables sent to +# worker processes. At the default of 0, no compression +# is used. This value must be an integer from 0 to 9. +#var_compression_level = 9 + [privilege_escalation] #become=True #become_method=sudo diff --git a/lib/ansible/constants.py b/lib/ansible/constants.py index 278d567baaa..6ecaaac0b3b 100644 --- a/lib/ansible/constants.py +++ b/lib/ansible/constants.py @@ -155,6 +155,7 @@ DEFAULT_GATHERING = get_config(p, DEFAULTS, 'gathering', 'ANSIBLE_GATHER DEFAULT_LOG_PATH = get_config(p, DEFAULTS, 'log_path', 'ANSIBLE_LOG_PATH', '', ispath=True) DEFAULT_FORCE_HANDLERS = get_config(p, DEFAULTS, 'force_handlers', 'ANSIBLE_FORCE_HANDLERS', False, boolean=True) DEFAULT_INVENTORY_IGNORE = get_config(p, DEFAULTS, 'inventory_ignore_extensions', 'ANSIBLE_INVENTORY_IGNORE', ["~", ".orig", ".bak", ".ini", ".cfg", ".retry", ".pyc", ".pyo"], islist=True) +DEFAULT_VAR_COMPRESSION_LEVEL = get_config(p, DEFAULTS, 'var_compression_level', 'ANSIBLE_VAR_COMPRESSION_LEVEL', 0, integer=True) # disclosure DEFAULT_NO_LOG = get_config(p, DEFAULTS, 'no_log', 'ANSIBLE_NO_LOG', False, boolean=True) diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index 4754d767908..1cc1f7df438 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -28,6 +28,7 @@ import signal import sys import time import traceback +import zlib from jinja2.exceptions import TemplateNotFound @@ -100,7 +101,14 @@ class WorkerProcess(multiprocessing.Process): task = None try: debug("waiting for a message...") - (host, task, basedir, job_vars, play_context, shared_loader_obj) = self._main_q.get() + (host, task, basedir, zip_vars, hostvars, compressed_vars, play_context, shared_loader_obj) = self._main_q.get() + + if compressed_vars: + job_vars = json.loads(zlib.decompress(zip_vars)) + else: + job_vars = zip_vars + job_vars['hostvars'] = hostvars + debug("there's work to be done! got a task/handler to work on: %s" % task) # because the task queue manager starts workers (forks) before the diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index fdbfb707722..7aabf05f093 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -26,6 +26,7 @@ import json import pickle import sys import time +import zlib from jinja2.exceptions import UndefinedError @@ -152,7 +153,28 @@ class StrategyBase: # way to share them with the forked processes shared_loader_obj = SharedPluginLoaderObj() - main_q.put((host, task, self._loader.get_basedir(), task_vars, play_context, shared_loader_obj), block=False) + # compress (and convert) the data if so configured, which can + # help a lot when the variable dictionary is huge. We pop the + # hostvars out of the task variables right now, due to the fact + # that they're not JSON serializable + compressed_vars = False + hostvars = task_vars.pop('hostvars', None) + if C.DEFAULT_VAR_COMPRESSION_LEVEL > 0: + zip_vars = zlib.compress(json.dumps(task_vars), C.DEFAULT_VAR_COMPRESSION_LEVEL) + compressed_vars = True + # we're done with the original dict now, so delete it to + # try and reclaim some memory space, which is helpful if the + # data contained in the dict is very large + del task_vars + else: + zip_vars = task_vars + + # and queue the task + main_q.put((host, task, self._loader.get_basedir(), zip_vars, hostvars, compressed_vars, play_context, shared_loader_obj), block=False) + + # nuke the hostvars object too, as its no longer needed + del hostvars + self._pending_results += 1 except (EOFError, IOError, AssertionError) as e: # most likely an abort diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py index 321e5ced177..0b5e0627bd8 100644 --- a/lib/ansible/plugins/strategy/linear.py +++ b/lib/ansible/plugins/strategy/linear.py @@ -173,6 +173,9 @@ class StrategyModule(StrategyBase): if not task: continue + if self._tqm._terminated: + break + run_once = False work_to_do = True