Starting work on getting integration tests working on v2

This is incomplete work, and requires some minor tweeks to the integration
tests which are not included in this commit.
pull/9989/head
James Cammarata 10 years ago
parent d7f67ea62b
commit 2aeb79f45f

@ -51,7 +51,7 @@ class ConnectionInformation:
self.sudo_user = '' self.sudo_user = ''
self.sudo_pass = '' self.sudo_pass = ''
self.verbosity = 0 self.verbosity = 0
self.only_tags = set(['all']) self.only_tags = set()
self.skip_tags = set() self.skip_tags = set()
if play: if play:
@ -101,6 +101,9 @@ class ConnectionInformation:
elif isinstance(options.tags, basestring): elif isinstance(options.tags, basestring):
self.only_tags.update(options.tags.split(',')) self.only_tags.update(options.tags.split(','))
if len(self.only_tags) == 0:
self.only_tags = set(['all'])
if hasattr(options, 'skip_tags'): if hasattr(options, 'skip_tags'):
if isinstance(options.skip_tags, list): if isinstance(options.skip_tags, list):
self.skip_tags.update(options.skip_tags) self.skip_tags.update(options.skip_tags)

@ -53,6 +53,7 @@ class PlaybookExecutor:
signal.signal(signal.SIGINT, self._cleanup) signal.signal(signal.SIGINT, self._cleanup)
result = 0
try: try:
for playbook_path in self._playbooks: for playbook_path in self._playbooks:
pb = Playbook.load(playbook_path, variable_manager=self._variable_manager, loader=self._loader) pb = Playbook.load(playbook_path, variable_manager=self._variable_manager, loader=self._loader)
@ -67,7 +68,6 @@ class PlaybookExecutor:
new_play = play.copy() new_play = play.copy()
new_play.post_validate(all_vars, fail_on_undefined=False) new_play.post_validate(all_vars, fail_on_undefined=False)
result = True
for batch in self._get_serialized_batches(new_play): for batch in self._get_serialized_batches(new_play):
if len(batch) == 0: if len(batch) == 0:
raise AnsibleError("No hosts matched the list specified in the play", obj=play._ds) raise AnsibleError("No hosts matched the list specified in the play", obj=play._ds)
@ -75,22 +75,22 @@ class PlaybookExecutor:
self._inventory.restrict_to_hosts(batch) self._inventory.restrict_to_hosts(batch)
# and run it... # and run it...
result = self._tqm.run(play=play) result = self._tqm.run(play=play)
if not result: if result != 0:
break break
if not result: if result != 0:
# FIXME: do something here, to signify the playbook execution failed # FIXME: do something here, to signify the playbook execution failed
self._cleanup() self._cleanup()
return 1 return result
except: except:
self._cleanup() self._cleanup()
raise raise
self._cleanup() self._cleanup()
return 0 return result
def _cleanup(self, signum=None, framenum=None): def _cleanup(self, signum=None, framenum=None):
self._tqm.cleanup() return self._tqm.cleanup()
def _get_serialized_batches(self, play): def _get_serialized_batches(self, play):
''' '''

@ -141,9 +141,9 @@ class ResultProcess(multiprocessing.Process):
else: else:
self._send_result(('set_host_facts', result._host, result._result['ansible_facts'])) self._send_result(('set_host_facts', result._host, result._result['ansible_facts']))
# if this task is registering a result, do it now # if this task is registering a result, do it now
if result._task.register: if result._task.register:
self._send_result(('set_host_var', result._host, result._task.register, result._result)) self._send_result(('set_host_var', result._host, result._task.register, result._result))
except Queue.Empty: except Queue.Empty:
pass pass

@ -109,15 +109,14 @@ class TaskExecutor:
try: try:
tmp_task = self._task.copy() tmp_task = self._task.copy()
tmp_task.post_validate(task_vars)
except AnsibleParserError, e: except AnsibleParserError, e:
results.append(dict(failed=True, msg=str(e))) results.append(dict(failed=True, msg=str(e)))
continue continue
# now we swap the internal task with the re-validate copy, execute, # now we swap the internal task with the copy, execute,
# and swap them back so we can do the next iteration cleanly # and swap them back so we can do the next iteration cleanly
(self._task, tmp_task) = (tmp_task, self._task) (self._task, tmp_task) = (tmp_task, self._task)
res = self._execute() res = self._execute(variables=task_vars)
(self._task, tmp_task) = (tmp_task, self._task) (self._task, tmp_task) = (tmp_task, self._task)
# FIXME: we should be sending back a callback result for each item in the loop here # FIXME: we should be sending back a callback result for each item in the loop here
@ -129,32 +128,24 @@ class TaskExecutor:
return results return results
def _execute(self): def _execute(self, variables=None):
''' '''
The primary workhorse of the executor system, this runs the task The primary workhorse of the executor system, this runs the task
on the specified host (which may be the delegated_to host) and handles on the specified host (which may be the delegated_to host) and handles
the retry/until and block rescue/always execution the retry/until and block rescue/always execution
''' '''
if variables is None:
variables = self._job_vars
self._connection = self._get_connection() self._connection = self._get_connection()
self._handler = self._get_action_handler(connection=self._connection) self._handler = self._get_action_handler(connection=self._connection)
# check to see if this task should be skipped, due to it being a member of a if not self._task.evaluate_conditional(variables):
# role which has already run (and whether that role allows duplicate execution)
if self._task._role and self._task._role.has_run():
# If there is no metadata, the default behavior is to not allow duplicates,
# if there is metadata, check to see if the allow_duplicates flag was set to true
if self._task._role._metadata is None or self._task._role._metadata and not self._task._role._metadata.allow_duplicates:
debug("task belongs to a role which has already run, but does not allow duplicate execution")
return dict(skipped=True, skip_reason='This role has already been run, but does not allow duplicates')
if not self._task.evaluate_conditional(self._job_vars):
debug("when evaulation failed, skipping this task") debug("when evaulation failed, skipping this task")
return dict(skipped=True, skip_reason='Conditional check failed') return dict(skipped=True, skip_reason='Conditional check failed')
if not self._task.evaluate_tags(self._connection_info.only_tags, self._connection_info.skip_tags): self._task.post_validate(variables)
debug("Tags don't match, skipping this task")
return dict(skipped=True, skip_reason='Skipped due to specified tags')
retries = self._task.retries retries = self._task.retries
if retries <= 0: if retries <= 0:
@ -173,7 +164,7 @@ class TaskExecutor:
result['attempts'] = attempt + 1 result['attempts'] = attempt + 1
debug("running the handler") debug("running the handler")
result = self._handler.run(task_vars=self._job_vars) result = self._handler.run(task_vars=variables)
debug("handler run complete") debug("handler run complete")
if self._task.async > 0: if self._task.async > 0:
@ -189,7 +180,7 @@ class TaskExecutor:
if self._task.until: if self._task.until:
# make a copy of the job vars here, in case we need to update them # make a copy of the job vars here, in case we need to update them
vars_copy = self._job_vars.copy() vars_copy = variables.copy()
# now update them with the registered value, if it is set # now update them with the registered value, if it is set
if self._task.register: if self._task.register:
vars_copy[self._task.register] = result vars_copy[self._task.register] = result

@ -179,7 +179,7 @@ class DataLoader():
basedir = os.path.dirname(role_path) basedir = os.path.dirname(role_path)
if os.path.islink(basedir): if os.path.islink(basedir):
# FIXME: # FIXME: implement unfrackpath
#basedir = unfrackpath(basedir) #basedir = unfrackpath(basedir)
template2 = os.path.join(basedir, dirname, source) template2 = os.path.join(basedir, dirname, source)
else: else:

@ -122,7 +122,10 @@ class Base:
return self return self
def get_ds(self): def get_ds(self):
return self._ds try:
return getattr(self, '_ds')
except AttributeError:
return None
def get_loader(self): def get_loader(self):
return self._loader return self._loader
@ -214,11 +217,10 @@ class Base:
setattr(self, name, value) setattr(self, name, value)
except (TypeError, ValueError), e: except (TypeError, ValueError), e:
#raise AnsibleParserError("the field '%s' has an invalid value, and could not be converted to an %s" % (name, attribute.isa), obj=self.get_ds()) raise AnsibleParserError("the field '%s' has an invalid value (%s), and could not be converted to an %s. Error was: %s" % (name, value, attribute.isa, e), obj=self.get_ds())
raise AnsibleParserError("the field '%s' has an invalid value (%s), and could not be converted to an %s. Error was: %s" % (name, value, attribute.isa, e)) except UndefinedError, e:
except UndefinedError:
if fail_on_undefined: if fail_on_undefined:
raise AnsibleParserError("the field '%s' has an invalid value, which appears to include a variable that is undefined" % (name,)) raise AnsibleParserError("the field '%s' has an invalid value, which appears to include a variable that is undefined. The error was: %s" % (name,e), obj=self.get_ds())
def serialize(self): def serialize(self):
''' '''

@ -153,14 +153,13 @@ class Block(Base, Conditional, Taggable):
return False return False
return super(Block, self).evaluate_conditional(all_vars) return super(Block, self).evaluate_conditional(all_vars)
def evaluate_tags(self, only_tags, skip_tags): def evaluate_tags(self, only_tags, skip_tags, all_vars):
result = False
if self._parent_block is not None: if self._parent_block is not None:
if not self._parent_block.evaluate_tags(only_tags=only_tags, skip_tags=skip_tags): result |= self._parent_block.evaluate_tags(only_tags=only_tags, skip_tags=skip_tags, all_vars=all_vars)
return False
elif self._role is not None: elif self._role is not None:
if not self._role.evaluate_tags(only_tags=only_tags, skip_tags=skip_tags): result |= self._role.evaluate_tags(only_tags=only_tags, skip_tags=skip_tags, all_vars=all_vars)
return False return result | super(Block, self).evaluate_tags(only_tags=only_tags, skip_tags=skip_tags, all_vars=all_vars)
return super(Block, self).evaluate_tags(only_tags=only_tags, skip_tags=skip_tags)
def set_loader(self, loader): def set_loader(self, loader):
self._loader = loader self._loader = loader

@ -35,6 +35,7 @@ from ansible.playbook.helpers import load_list_of_blocks, compile_block_list
from ansible.playbook.role.include import RoleInclude from ansible.playbook.role.include import RoleInclude
from ansible.playbook.role.metadata import RoleMetadata from ansible.playbook.role.metadata import RoleMetadata
from ansible.playbook.taggable import Taggable from ansible.playbook.taggable import Taggable
from ansible.plugins import module_loader
from ansible.utils.vars import combine_vars from ansible.utils.vars import combine_vars
@ -127,6 +128,10 @@ class Role(Base, Conditional, Taggable):
#self._loader.set_basedir(self._role_path) #self._loader.set_basedir(self._role_path)
# load the role's files, if they exist # load the role's files, if they exist
library = os.path.join(self._role_path, 'library')
if os.path.isdir(library):
module_loader.add_directory(library)
metadata = self._load_role_yaml('meta') metadata = self._load_role_yaml('meta')
if metadata: if metadata:
self._metadata = RoleMetadata.load(metadata, owner=self, loader=self._loader) self._metadata = RoleMetadata.load(metadata, owner=self, loader=self._loader)

@ -19,7 +19,9 @@
from __future__ import (absolute_import, division, print_function) from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type
from ansible.errors import AnsibleError
from ansible.playbook.attribute import FieldAttribute from ansible.playbook.attribute import FieldAttribute
from ansible.template import Templar
class Taggable: class Taggable:
_tags = FieldAttribute(isa='list', default=[]) _tags = FieldAttribute(isa='list', default=[])
@ -27,20 +29,29 @@ class Taggable:
def __init__(self): def __init__(self):
super(Taggable, self).__init__() super(Taggable, self).__init__()
def get_tags(self): def _load_tags(self, attr, ds):
return self._tags[:] if isinstance(ds, list):
return ds
elif isinstance(ds, basestring):
return [ ds ]
else:
raise AnsibleError('tags must be specified as a list', obj=ds)
def evaluate_tags(self, only_tags, skip_tags): def evaluate_tags(self, only_tags, skip_tags, all_vars):
if self.tags: templar = Templar(loader=self._loader, variables=all_vars)
my_tags = set(self.tags) tags = templar.template(self.tags)
if not isinstance(tags, list):
tags = set([tags])
else: else:
my_tags = set() tags = set(tags)
#print("%s tags are: %s, only_tags=%s, skip_tags=%s" % (self, my_tags, only_tags, skip_tags))
if skip_tags: if skip_tags:
skipped_tags = my_tags.intersection(skip_tags) skipped_tags = tags.intersection(skip_tags)
if len(skipped_tags) > 0: if len(skipped_tags) > 0:
return False return False
matched_tags = my_tags.intersection(only_tags) matched_tags = tags.intersection(only_tags)
#print("matched tags are: %s" % matched_tags)
if len(matched_tags) > 0 or 'all' in only_tags: if len(matched_tags) > 0 or 'all' in only_tags:
return True return True
else: else:

@ -203,7 +203,12 @@ class Task(Base, Conditional, Taggable):
return listify_lookup_plugin_terms(value, all_vars, loader=self._loader) return listify_lookup_plugin_terms(value, all_vars, loader=self._loader)
def get_vars(self): def get_vars(self):
return self.serialize() all_vars = self.serialize()
if 'tags' in all_vars:
del all_vars['tags']
if 'when' in all_vars:
del all_vars['when']
return all_vars
def compile(self): def compile(self):
''' '''
@ -273,16 +278,14 @@ class Task(Base, Conditional, Taggable):
return False return False
return super(Task, self).evaluate_conditional(all_vars) return super(Task, self).evaluate_conditional(all_vars)
def evaluate_tags(self, only_tags, skip_tags): def evaluate_tags(self, only_tags, skip_tags, all_vars):
result = False
if len(self._dep_chain): if len(self._dep_chain):
for dep in self._dep_chain: for dep in self._dep_chain:
if not dep.evaluate_tags(only_tags=only_tags, skip_tags=skip_tags): result |= dep.evaluate_tags(only_tags=only_tags, skip_tags=skip_tags, all_vars=all_vars)
return False
if self._block is not None: if self._block is not None:
if not self._block.evaluate_tags(only_tags=only_tags, skip_tags=skip_tags): result |= self._block.evaluate_tags(only_tags=only_tags, skip_tags=skip_tags, all_vars=all_vars)
return False return result | super(Task, self).evaluate_tags(only_tags=only_tags, skip_tags=skip_tags, all_vars=all_vars)
return super(Task, self).evaluate_tags(only_tags=only_tags, skip_tags=skip_tags)
def set_loader(self, loader): def set_loader(self, loader):
''' '''

@ -397,7 +397,7 @@ class ActionBase:
debug("done with _execute_module (%s, %s)" % (module_name, module_args)) debug("done with _execute_module (%s, %s)" % (module_name, module_args))
return data return data
def _low_level_execute_command(self, cmd, tmp, executable=None, sudoable=False, in_data=None): def _low_level_execute_command(self, cmd, tmp, executable=None, sudoable=True, in_data=None):
''' '''
This is the function which executes the low level shell command, which This is the function which executes the low level shell command, which
may be commands to create/remove directories for temporary files, or to may be commands to create/remove directories for temporary files, or to
@ -413,8 +413,19 @@ class ActionBase:
if executable is None: if executable is None:
executable = C.DEFAULT_EXECUTABLE executable = C.DEFAULT_EXECUTABLE
prompt = None
success_key = None
if sudoable:
if self._connection_info.su and self._connection_info.su_user:
cmd, prompt, success_key = self._connection_info.make_su_cmd(executable, cmd)
elif self._connection_info.sudo and self._connection_info.sudo_user:
# FIXME: hard-coded sudo_exe here
cmd, prompt, success_key = self._connection_info.make_sudo_cmd('/usr/bin/sudo', executable, cmd)
debug("executing the command through the connection") debug("executing the command through the connection")
rc, stdin, stdout, stderr = self._connection.exec_command(cmd, tmp, executable=executable, in_data=in_data, sudoable=sudoable) #rc, stdin, stdout, stderr = self._connection.exec_command(cmd, tmp, executable=executable, in_data=in_data, sudoable=sudoable)
rc, stdin, stdout, stderr = self._connection.exec_command(cmd, tmp, executable=executable, in_data=in_data)
debug("command execution done") debug("command execution done")
if not isinstance(stdout, basestring): if not isinstance(stdout, basestring):

@ -24,8 +24,8 @@ import tempfile
import base64 import base64
import re import re
from ansible.plugins.action import ActionBase from ansible.plugins.action import ActionBase
from ansible.utils.boolean import boolean
from ansible.utils.hashing import checksum_s from ansible.utils.hashing import checksum_s
class ActionModule(ActionBase): class ActionModule(ActionBase):
@ -78,21 +78,16 @@ class ActionModule(ActionBase):
src = self._task.args.get('src', None) src = self._task.args.get('src', None)
dest = self._task.args.get('dest', None) dest = self._task.args.get('dest', None)
delimiter = self._task.args.get('delimiter', None) delimiter = self._task.args.get('delimiter', None)
# FIXME: boolean needs to be moved out of utils
#remote_src = utils.boolean(options.get('remote_src', 'yes'))
remote_src = self._task.args.get('remote_src', 'yes') remote_src = self._task.args.get('remote_src', 'yes')
regexp = self._task.args.get('regexp', None) regexp = self._task.args.get('regexp', None)
if src is None or dest is None: if src is None or dest is None:
return dict(failed=True, msg="src and dest are required") return dict(failed=True, msg="src and dest are required")
# FIXME: this should be boolean, hard-coded to yes for testing if boolean(remote_src):
if remote_src == 'yes':
return self._execute_module(tmp=tmp) return self._execute_module(tmp=tmp)
# FIXME: we don't do inject anymore, so not sure where the original elif self._task._role is not None:
# file stuff is going to end up at this time src = self._loader.path_dwim_relative(self._task._role._role_path, 'files', src)
#elif '_original_file' in inject:
# src = utils.path_dwim_relative(inject['_original_file'], 'files', src, self.runner.basedir)
else: else:
# the source is local, so expand it here # the source is local, so expand it here
src = os.path.expanduser(src) src = os.path.expanduser(src)

@ -111,16 +111,10 @@ class ActionModule(ActionBase):
# return ReturnData(conn=conn, result=results) # return ReturnData(conn=conn, result=results)
############################################################################################### ###############################################################################################
else: else:
# FIXME: templating needs to be worked out still if self._task._role is not None:
#source = template.template(self.runner.basedir, source, inject) source = self._loader.path_dwim_relative(self._task._role._role_path, 'files', source)
# FIXME: original_file stuff needs to be reworked - most likely else:
# simply checking to see if the task has a role and using source = self._loader.path_dwim(source)
# using the role path as the dwim target and basedir would work
#if '_original_file' in inject:
# source = utils.path_dwim_relative(inject['_original_file'], 'files', source, self.runner.basedir)
#else:
# source = utils.path_dwim(self.runner.basedir, source)
source = self._loader.path_dwim(source)
# A list of source file tuples (full_path, relative_path) which will try to copy to the destination # A list of source file tuples (full_path, relative_path) which will try to copy to the destination
source_files = [] source_files = []
@ -129,7 +123,7 @@ class ActionModule(ActionBase):
if os.path.isdir(source): if os.path.isdir(source):
# Get the amount of spaces to remove to get the relative path. # Get the amount of spaces to remove to get the relative path.
if source_trailing_slash: if source_trailing_slash:
sz = len(source) + 1 sz = len(source)
else: else:
sz = len(source.rsplit('/', 1)[0]) + 1 sz = len(source.rsplit('/', 1)[0]) + 1

@ -32,7 +32,7 @@ class ActionModule(ActionBase):
source = self._task.args.get('_raw_params') source = self._task.args.get('_raw_params')
if self._task._role: if self._task._role:
source = self._loader.path_dwim_relative(self._task._role.get('_role_path',''), 'vars', source) source = self._loader.path_dwim_relative(self._task._role._role_path, 'vars', source)
else: else:
source = self._loader.path_dwim(source) source = self._loader.path_dwim(source)

@ -45,7 +45,7 @@ class ActionModule(ActionBase):
# look up the files and use the first one we find as src # look up the files and use the first one we find as src
#if 'first_available_file' in task_vars: #if 'first_available_file' in task_vars:
# found = False # found = False
# for fn in self.runner.module_vars.get('first_available_file'): # for fn in task_vars.get('first_available_file'):
# fn_orig = fn # fn_orig = fn
# fnt = template.template(self.runner.basedir, fn, task_vars) # fnt = template.template(self.runner.basedir, fn, task_vars)
# fnd = utils.path_dwim(self.runner.basedir, fnt) # fnd = utils.path_dwim(self.runner.basedir, fnt)
@ -59,14 +59,13 @@ class ActionModule(ActionBase):
# result = dict(failed=True, msg="could not find src in first_available_file list") # result = dict(failed=True, msg="could not find src in first_available_file list")
# return ReturnData(conn=conn, comm_ok=False, result=result) # return ReturnData(conn=conn, comm_ok=False, result=result)
#else: #else:
# source = template.template(self.runner.basedir, source, task_vars) if 1:
# if self._task._role is not None:
# if '_original_file' in task_vars: source = self._loader.path_dwim_relative(self._task._role._role_path, 'templates', source)
# source = utils.path_dwim_relative(task_vars['_original_file'], 'templates', source, self.runner.basedir) else:
# else: source = self._loader.path_dwim(source)
# source = utils.path_dwim(self.runner.basedir, source)
################################################################################################## ##################################################################################################
source = self._loader.path_dwim(source) # END FIXME
################################################################################################## ##################################################################################################
# Expand any user home dir specification # Expand any user home dir specification

@ -39,12 +39,12 @@ class Connection(ConnectionBase):
''' connect to the local host; nothing to do here ''' ''' connect to the local host; nothing to do here '''
return self return self
def exec_command(self, cmd, tmp_path, sudo_user=None, sudoable=False, executable='/bin/sh', in_data=None, su=None, su_user=None): def exec_command(self, cmd, tmp_path, executable='/bin/sh', in_data=None):
''' run a command on the local host ''' ''' run a command on the local host '''
debug("in local.exec_command()") debug("in local.exec_command()")
# su requires to be run from a terminal, and therefore isn't supported here (yet?) # su requires to be run from a terminal, and therefore isn't supported here (yet?)
if su or su_user: if self._connection_info.su:
raise AnsibleError("Internal Error: this module does not support running commands via su") raise AnsibleError("Internal Error: this module does not support running commands via su")
if in_data: if in_data:

@ -87,6 +87,7 @@ class Connection(ConnectionBase):
if self._connection_info.port is not None: if self._connection_info.port is not None:
self._common_args += ["-o", "Port=%d" % (self._connection_info.port)] self._common_args += ["-o", "Port=%d" % (self._connection_info.port)]
# FIXME: need to get this from connection info
#if self.private_key_file is not None: #if self.private_key_file is not None:
# self._common_args += ["-o", "IdentityFile=\"%s\"" % os.path.expanduser(self.private_key_file)] # self._common_args += ["-o", "IdentityFile=\"%s\"" % os.path.expanduser(self.private_key_file)]
#elif self.runner.private_key_file is not None: #elif self.runner.private_key_file is not None:
@ -256,7 +257,7 @@ class Connection(ConnectionBase):
self._display.vvv("EXEC previous known host file not found for %s" % host) self._display.vvv("EXEC previous known host file not found for %s" % host)
return True return True
def exec_command(self, cmd, tmp_path, executable='/bin/sh', in_data=None, sudoable=False): def exec_command(self, cmd, tmp_path, executable='/bin/sh', in_data=None):
''' run a command on the remote host ''' ''' run a command on the remote host '''
ssh_cmd = self._password_cmd() ssh_cmd = self._password_cmd()
@ -266,15 +267,14 @@ class Connection(ConnectionBase):
# inside a tty automatically invokes the python interactive-mode but the modules are not # inside a tty automatically invokes the python interactive-mode but the modules are not
# compatible with the interactive-mode ("unexpected indent" mainly because of empty lines) # compatible with the interactive-mode ("unexpected indent" mainly because of empty lines)
ssh_cmd += ["-tt"] ssh_cmd += ["-tt"]
# FIXME: verbosity needs to move, most likely into connection info or if self._connection_info.verbosity > 3:
# whatever other context we pass around instead of runner objects ssh_cmd += ["-vvv"]
#if utils.VERBOSITY > 3: else:
# ssh_cmd += ["-vvv"] ssh_cmd += ["-q"]
#else:
# ssh_cmd += ["-q"]
ssh_cmd += ["-q"]
ssh_cmd += self._common_args ssh_cmd += self._common_args
# FIXME: ipv6 stuff needs to be figured out. It's in the connection info, however
# not sure if it's all working yet so this remains commented out
#if self._ipv6: #if self._ipv6:
# ssh_cmd += ['-6'] # ssh_cmd += ['-6']
ssh_cmd += [self._host.ipv4_address] ssh_cmd += [self._host.ipv4_address]
@ -436,6 +436,9 @@ class Connection(ConnectionBase):
# FIXME: make a function, used in all 3 methods EXEC/PUT/FETCH # FIXME: make a function, used in all 3 methods EXEC/PUT/FETCH
host = self._host.ipv4_address host = self._host.ipv4_address
# FIXME: ipv6 stuff needs to be figured out. It's in the connection info, however
# not sure if it's all working yet so this remains commented out
#if self._ipv6: #if self._ipv6:
# host = '[%s]' % host # host = '[%s]' % host
@ -463,6 +466,9 @@ class Connection(ConnectionBase):
# FIXME: make a function, used in all 3 methods EXEC/PUT/FETCH # FIXME: make a function, used in all 3 methods EXEC/PUT/FETCH
host = self._host.ipv4_address host = self._host.ipv4_address
# FIXME: ipv6 stuff needs to be figured out. It's in the connection info, however
# not sure if it's all working yet so this remains commented out
#if self._ipv6: #if self._ipv6:
# host = '[%s]' % self._host # host = '[%s]' % self._host

@ -60,12 +60,26 @@ class StrategyBase:
# outstanding tasks still in queue # outstanding tasks still in queue
self._blocked_hosts = dict() self._blocked_hosts = dict()
def run(self, iterator, connection_info): def run(self, iterator, connection_info, result=True):
# save the counts on failed/unreachable hosts, as the cleanup/handler
# methods will clear that information during their runs
num_failed = len(self._tqm._failed_hosts)
num_unreachable = len(self._tqm._unreachable_hosts)
debug("running the cleanup portion of the play") debug("running the cleanup portion of the play")
result = self.cleanup(iterator, connection_info) result &= self.cleanup(iterator, connection_info)
debug("running handlers") debug("running handlers")
result &= self.run_handlers(iterator, connection_info) result &= self.run_handlers(iterator, connection_info)
return result
if not result:
if num_unreachable > 0:
return 3
elif num_failed > 0:
return 2
else:
return 1
else:
return 0
def get_hosts_remaining(self, play): def get_hosts_remaining(self, play):
return [host for host in self._inventory.get_hosts(play.hosts) if host.name not in self._tqm._failed_hosts and host.get_name() not in self._tqm._unreachable_hosts] return [host for host in self._inventory.get_hosts(play.hosts) if host.name not in self._tqm._failed_hosts and host.get_name() not in self._tqm._unreachable_hosts]
@ -73,37 +87,10 @@ class StrategyBase:
def get_failed_hosts(self): def get_failed_hosts(self):
return [host for host in self._inventory.get_hosts() if host.name in self._tqm._failed_hosts] return [host for host in self._inventory.get_hosts() if host.name in self._tqm._failed_hosts]
def _queue_task(self, play, host, task, connection_info): def _queue_task(self, host, task, task_vars, connection_info):
''' handles queueing the task up to be sent to a worker ''' ''' handles queueing the task up to be sent to a worker '''
debug("entering _queue_task() for %s/%s/%s" % (play, host, task)) debug("entering _queue_task() for %s/%s" % (host, task))
# copy the task, to make sure we have a clean version, since the
# post-validation step will alter attribute values but this Task object
# is shared across all hosts in the play
debug("copying task")
new_task = task.copy()
debug("done copying task")
# squash variables down to a single dictionary using the variable manager and
# call post_validate() on the task, which will finalize the attribute values
debug("getting variables")
try:
task_vars = self._variable_manager.get_vars(loader=self._loader, play=play, host=host, task=new_task)
except EOFError:
# usually happens if the program is aborted, and the proxied object
# queue is cut off from the call, so we just ignore this and exit
return
debug("done getting variables")
debug("running post_validate() on the task")
if new_task.loop:
# if the task has a lookup loop specified, we do not error out
# on undefined variables yet, as fields may use {{item}} or some
# variant, which won't be defined until execution time
new_task.post_validate(task_vars, fail_on_undefined=False)
else:
new_task.post_validate(task_vars)
debug("done running post_validate() on the task")
# and then queue the new task # and then queue the new task
debug("%s - putting task (%s) in queue" % (host, task)) debug("%s - putting task (%s) in queue" % (host, task))
@ -116,12 +103,12 @@ class StrategyBase:
self._cur_worker = 0 self._cur_worker = 0
self._pending_results += 1 self._pending_results += 1
main_q.put((host, new_task, self._loader.get_basedir(), task_vars, connection_info), block=False) main_q.put((host, task, self._loader.get_basedir(), task_vars, connection_info), block=False)
except (EOFError, IOError, AssertionError), e: except (EOFError, IOError, AssertionError), e:
# most likely an abort # most likely an abort
debug("got an error while queuing: %s" % e) debug("got an error while queuing: %s" % e)
return return
debug("exiting _queue_task() for %s/%s/%s" % (play, host, task)) debug("exiting _queue_task() for %s/%s" % (host, task))
def _process_pending_results(self): def _process_pending_results(self):
''' '''
@ -140,7 +127,8 @@ class StrategyBase:
host = task_result._host host = task_result._host
task = task_result._task task = task_result._task
if result[0] == 'host_task_failed': if result[0] == 'host_task_failed':
self._tqm._failed_hosts[host.get_name()] = True if not task.ignore_errors:
self._tqm._failed_hosts[host.get_name()] = True
self._callback.runner_on_failed(task, task_result) self._callback.runner_on_failed(task, task_result)
elif result[0] == 'host_unreachable': elif result[0] == 'host_unreachable':
self._tqm._unreachable_hosts[host.get_name()] = True self._tqm._unreachable_hosts[host.get_name()] = True

@ -43,7 +43,7 @@ class StrategyModule(StrategyBase):
last_host = 0 last_host = 0
work_to_do = True work_to_do = True
while work_to_do: while work_to_do and not self._tqm._terminated:
hosts_left = self.get_hosts_remaining() hosts_left = self.get_hosts_remaining()
if len(hosts_left) == 0: if len(hosts_left) == 0:

@ -36,7 +36,7 @@ class StrategyModule(StrategyBase):
# iteratate over each task, while there is one left to run # iteratate over each task, while there is one left to run
work_to_do = True work_to_do = True
while work_to_do: while work_to_do and not self._tqm._terminated:
try: try:
debug("getting the remaining hosts for this loop") debug("getting the remaining hosts for this loop")
@ -52,7 +52,30 @@ class StrategyModule(StrategyBase):
callback_sent = False callback_sent = False
work_to_do = False work_to_do = False
for host in hosts_left: for host in hosts_left:
task = iterator.get_next_task_for_host(host) while True:
task = iterator.get_next_task_for_host(host)
if not task:
break
debug("getting variables")
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task)
debug("done getting variables")
# check to see if this task should be skipped, due to it being a member of a
# role which has already run (and whether that role allows duplicate execution)
if task._role and task._role.has_run():
# If there is no metadata, the default behavior is to not allow duplicates,
# if there is metadata, check to see if the allow_duplicates flag was set to true
if task._role._metadata is None or task._role._metadata and not task._role._metadata.allow_duplicates:
debug("'%s' skipped because role has already run" % task)
continue
if not task.evaluate_tags(connection_info.only_tags, connection_info.skip_tags, task_vars):
debug("'%s' failed tag evaluation" % task)
continue
break
if not task: if not task:
continue continue
@ -61,24 +84,21 @@ class StrategyModule(StrategyBase):
self._callback.playbook_on_task_start(task.get_name(), False) self._callback.playbook_on_task_start(task.get_name(), False)
callback_sent = True callback_sent = True
host_name = host.get_name() self._blocked_hosts[host.get_name()] = True
if 1: #host_name not in self._tqm._failed_hosts and host_name not in self._tqm._unreachable_hosts: self._queue_task(host, task, task_vars, connection_info)
self._blocked_hosts[host_name] = True
self._queue_task(iterator._play, host, task, connection_info)
self._process_pending_results() self._process_pending_results()
debug("done queuing things up, now waiting for results queue to drain") debug("done queuing things up, now waiting for results queue to drain")
self._wait_on_pending_results() self._wait_on_pending_results()
debug("results queue empty") debug("results queue empty")
except IOError, e: except (IOError, EOFError), e:
debug("got IOError: %s" % e) debug("got IOError/EOFError in task loop: %s" % e)
# most likely an abort, return failed # most likely an abort, return failed
return 1 return 1
# run the base class run() method, which executes the cleanup function # run the base class run() method, which executes the cleanup function
# and runs any outstanding handlers which have been triggered # and runs any outstanding handlers which have been triggered
result &= super(StrategyModule, self).run(iterator, connection_info) return super(StrategyModule, self).run(iterator, connection_info, result)
return result

@ -25,7 +25,7 @@ from jinja2.utils import concat as j2_concat
from jinja2.runtime import StrictUndefined from jinja2.runtime import StrictUndefined
from ansible import constants as C from ansible import constants as C
from ansible.errors import * from ansible.errors import AnsibleError, AnsibleFilterError, AnsibleUndefinedVariable
from ansible.plugins import filter_loader, lookup_loader from ansible.plugins import filter_loader, lookup_loader
from ansible.template.safe_eval import safe_eval from ansible.template.safe_eval import safe_eval
from ansible.template.template import AnsibleJ2Template from ansible.template.template import AnsibleJ2Template
@ -266,7 +266,7 @@ class Templar:
res += '\n' * (data_newlines - res_newlines) res += '\n' * (data_newlines - res_newlines)
return res return res
except UndefinedError, AnsibleUndefinedVariable: except (UndefinedError, AnsibleUndefinedVariable), e:
if self._fail_on_undefined_errors: if self._fail_on_undefined_errors:
raise raise
else: else:

@ -144,6 +144,7 @@ class Cli(object):
dict(action=dict(module=options.module_name, args=parse_kv(options.module_args))), dict(action=dict(module=options.module_name, args=parse_kv(options.module_args))),
] ]
) )
play = Play().load(play_ds, variable_manager=variable_manager, loader=loader) play = Play().load(play_ds, variable_manager=variable_manager, loader=loader)
# now create a task queue manager to execute the play # now create a task queue manager to execute the play
@ -155,7 +156,7 @@ class Cli(object):
tqm.cleanup() tqm.cleanup()
raise raise
return (result, len(tqm._failed_hosts), len(tqm._unreachable_hosts)) return result
# ---------------------------------------------- # ----------------------------------------------
@ -179,12 +180,7 @@ if __name__ == '__main__':
try: try:
cli = Cli() cli = Cli()
(options, args) = cli.parse() (options, args) = cli.parse()
(result, num_failed, num_unreachable) = cli.run(options, args) result = cli.run(options, args)
if not result:
if num_failed > 0:
sys.exit(2)
elif num_unreachable > 0:
sys.exit(3)
except AnsibleError, e: except AnsibleError, e:
print(e) print(e)
@ -195,3 +191,4 @@ if __name__ == '__main__':
print("ERROR: %s" % str(e)) print("ERROR: %s" % str(e))
sys.exit(1) sys.exit(1)
sys.exit(result)

@ -143,7 +143,7 @@ def main(args):
# create the playbook executor, which manages running the plays # create the playbook executor, which manages running the plays
# via a task queue manager # via a task queue manager
pbex = PlaybookExecutor(playbooks=args, inventory=inventory, variable_manager=variable_manager, loader=loader, options=options) pbex = PlaybookExecutor(playbooks=args, inventory=inventory, variable_manager=variable_manager, loader=loader, options=options)
pbex.run() return pbex.run()
if __name__ == "__main__": if __name__ == "__main__":
#display(" ", log_only=True) #display(" ", log_only=True)

@ -0,0 +1,10 @@
- hosts: localhost
connection: local
gather_facts: no
tasks:
- fail:
ignore_errors: yes
- debug: msg="you should still see this"
- fail:
- debug: msg="you should NOT see this"
Loading…
Cancel
Save