Add variable compression option

pull/13012/merge
James Cammarata 9 years ago
parent 63c47fb271
commit efbc6054a4

@ -184,6 +184,11 @@
# prevents logging of tasks, but only on the targets, data is still logged on the master/controller
#no_target_syslog = True
# controls the compression level of variables sent to
# worker processes. At the default of 0, no compression
# is used. This value must be an integer from 0 to 9.
#var_compression_level = 9
[privilege_escalation]
#become=True
#become_method=sudo

@ -155,6 +155,7 @@ DEFAULT_GATHERING = get_config(p, DEFAULTS, 'gathering', 'ANSIBLE_GATHER
DEFAULT_LOG_PATH = get_config(p, DEFAULTS, 'log_path', 'ANSIBLE_LOG_PATH', '', ispath=True)
DEFAULT_FORCE_HANDLERS = get_config(p, DEFAULTS, 'force_handlers', 'ANSIBLE_FORCE_HANDLERS', False, boolean=True)
DEFAULT_INVENTORY_IGNORE = get_config(p, DEFAULTS, 'inventory_ignore_extensions', 'ANSIBLE_INVENTORY_IGNORE', ["~", ".orig", ".bak", ".ini", ".cfg", ".retry", ".pyc", ".pyo"], islist=True)
DEFAULT_VAR_COMPRESSION_LEVEL = get_config(p, DEFAULTS, 'var_compression_level', 'ANSIBLE_VAR_COMPRESSION_LEVEL', 0, integer=True)
# disclosure
DEFAULT_NO_LOG = get_config(p, DEFAULTS, 'no_log', 'ANSIBLE_NO_LOG', False, boolean=True)

@ -28,6 +28,7 @@ import signal
import sys
import time
import traceback
import zlib
from jinja2.exceptions import TemplateNotFound
@ -100,7 +101,14 @@ class WorkerProcess(multiprocessing.Process):
task = None
try:
debug("waiting for a message...")
(host, task, basedir, job_vars, play_context, shared_loader_obj) = self._main_q.get()
(host, task, basedir, zip_vars, hostvars, compressed_vars, play_context, shared_loader_obj) = self._main_q.get()
if compressed_vars:
job_vars = json.loads(zlib.decompress(zip_vars))
else:
job_vars = zip_vars
job_vars['hostvars'] = hostvars
debug("there's work to be done! got a task/handler to work on: %s" % task)
# because the task queue manager starts workers (forks) before the

@ -26,6 +26,7 @@ import json
import pickle
import sys
import time
import zlib
from jinja2.exceptions import UndefinedError
@ -152,7 +153,28 @@ class StrategyBase:
# way to share them with the forked processes
shared_loader_obj = SharedPluginLoaderObj()
main_q.put((host, task, self._loader.get_basedir(), task_vars, play_context, shared_loader_obj), block=False)
# compress (and convert) the data if so configured, which can
# help a lot when the variable dictionary is huge. We pop the
# hostvars out of the task variables right now, due to the fact
# that they're not JSON serializable
compressed_vars = False
hostvars = task_vars.pop('hostvars', None)
if C.DEFAULT_VAR_COMPRESSION_LEVEL > 0:
zip_vars = zlib.compress(json.dumps(task_vars), C.DEFAULT_VAR_COMPRESSION_LEVEL)
compressed_vars = True
# we're done with the original dict now, so delete it to
# try and reclaim some memory space, which is helpful if the
# data contained in the dict is very large
del task_vars
else:
zip_vars = task_vars
# and queue the task
main_q.put((host, task, self._loader.get_basedir(), zip_vars, hostvars, compressed_vars, play_context, shared_loader_obj), block=False)
# nuke the hostvars object too, as its no longer needed
del hostvars
self._pending_results += 1
except (EOFError, IOError, AssertionError) as e:
# most likely an abort

@ -173,6 +173,9 @@ class StrategyModule(StrategyBase):
if not task:
continue
if self._tqm._terminated:
break
run_once = False
work_to_do = True

Loading…
Cancel
Save