You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ansible/test/units/plugins/action/test_synchronize.py

265 lines
8.7 KiB
Python

'''
(Epdb) pprint(DeepDiff(self.final_task_vars, out_task_vars), indent=2)
{ 'dic_item_added': set([u"root['ansible_python_interpreter']"]),
'dic_item_removed': set([ u"root['hostvars']['127.0.0.1']",
u"root['hostvars']['::1']",
u"root['hostvars']['localhost']"]),
'iterable_item_added': { u"root['hostvars']['el6host']['groups']['all'][1]": u'::1',
u"root['hostvars']['el6host']['groups']['ungrouped'][1]": u'::1',
u"root['vars']['hostvars']['el6host']['groups']['all'][1]": u'::1',
u"root['vars']['hostvars']['el6host']['groups']['ungrouped'][1]": u'::1'}}
'''
import json
import os
import unittest
import yaml
import ansible.plugins
from units.compat.mock import patch, MagicMock
from ansible.plugins.action.synchronize import ActionModule
# Getting the incoming and outgoing task vars from the plugin's run method
'''
import copy
safe_vars = {}
for k,v in task_vars.items():
if k not in ['vars', 'hostvars']:
safe_vars[k] = copy.deepcopy(v)
else:
sdata = str(v)
newv = eval(sdata)
safe_vars[k] = newv
import json
with open('task_vars.json', 'wb') as f:
f.write(json.dumps(safe_vars, indent=2))
'''
class BreakPoint(Exception):
    ''' Sentinel exception used as a mock side effect to stop ActionModule.run() early '''
class TaskMock(object):
    ''' Minimal stand-in for the task object handed to the action plugin '''

    # privilege escalation settings, all unset by default
    become = None
    become_user = None
    become_method = None

    async_val = None

    # default module arguments; fixtures may replace this dict wholesale
    args = {
        'src': u'/tmp/deleteme',
        'dest': '/tmp/deleteme',
        'rsync_path': 'rsync',
    }
class StdinMock(object):
    ''' Placeholder for the stdin object attached to a mocked connection '''

    shell = None
class ConnectionMock(object):
    ''' Stand-in for a connection plugin instance.

    Every attribute is defined at class level, so the MagicMock objects
    below are shared by all ConnectionMock instances.
    '''

    ismock = True
    _play_context = None
    # transport is left unset here; FakePluginLoader.get() and the fixture
    # 'connection' section assign it per test.
    transport = None
    _new_stdin = StdinMock()

    # any connection option lookup answers 'root'
    get_option = MagicMock(return_value='root')

    # my shell
    _shell = MagicMock()
    _shell.mkdtemp.return_value = 'mkdir command'
    _shell.join_path.side_effect = os.path.join
    _shell.get_option = MagicMock(return_value=['root', 'toor'])
Temporary (#31677) * allow shells to have per host options, remote_tmp added language to shell removed module lang setting from general as plugins have it now use get to avoid bad powershell plugin more resilient tmp discovery, fall back to `pwd` add shell to docs fixed options for when frags are only options added shell set ops in t_e and fixed option frags normalize tmp dir usag4e - pass tmpdir/tmp/temp options as env var to commands, making it default for tempfile - adjusted ansiballz tmpdir - default local tempfile usage to the configured local tmp - set env temp in action add options to powershell shift temporary to internal envvar/params ensure tempdir is set if we pass var ensure basic and url use expected tempdir ensure localhost uses local tmp give /var/tmp priority, less perms issues more consistent tempfile mgmt for ansiballz made async_dir configurable better action handling, allow for finally rm tmp fixed tmp issue and no more tempdir in ballz hostvarize world readable and admin users always set shell tempdir added comment to discourage use of exception/flow control * Mostly revert expand_user as it's not quite working. This was an additional feature anyhow. Kept the use of pwd as a fallback but moved it to a second ssh connection. This is not optimal but getting that to work in a single ssh connection was part of the problem holding this up. (cherry picked from commit 395b714120522f15e4c90a346f5e8e8d79213aca) * fixed script and other action plugins ensure tmpdir deletion allow for connections that don't support new options (legacy, 3rd party) fixed tests
7 years ago
class PlayContextMock(object):
    ''' Stand-in for the play context; fixtures override attributes via setattr '''

    # connection details
    remote_addr = None
    remote_user = None
    password = None
    private_key_file = None
    shell = None

    # privilege escalation
    become = False
    become_user = 'root'
    become_method = None

    # run options
    check_mode = False
    no_log = None
    diff = None
class ModuleLoaderMock(object):
    ''' Module loader stub whose plugin lookup never finds anything '''

    def find_plugin(self, module_name, mod_type):
        ''' Accept any lookup and report nothing found (returns None) '''
        return None
class SharedLoaderMock(object):
    ''' Stand-in for the shared plugin loader object; exposes only module_loader '''

    module_loader = ModuleLoaderMock()
class SynchronizeTester(object):
    ''' A wrapper for mocking out synchronize environments '''

    # NOTE: these are class attributes, so the task, connection and play
    # context mocks are shared by every SynchronizeTester instance. Values
    # that runtest() sets on them persist into later runtest() calls.
    task = TaskMock()
    connection = ConnectionMock()
    _play_context = PlayContextMock()
    loader = None
    templar = None
    shared_loader_obj = SharedLoaderMock()

    final_task_vars = None
    execute_called = False

    def _execute_module(self, module_name, module_args=None, task_vars=None):
        ''' Replacement for ActionModule._execute_module.

        Records that the module would have been executed, together with the
        module args and task vars it was given, and returns an empty result.
        '''
        self.execute_called = True
        self.final_module_args = module_args
        self.final_task_vars = task_vars
        return {}

    def runtest(self, fixturepath='fixtures/synchronize/basic'):
        ''' Run the synchronize action plugin against one fixture directory.

        The directory must contain a meta.yaml plus the task vars JSON files
        (taskvars_in.json / taskvars_out.json unless meta.yaml names others
        under 'fixtures'). meta.yaml may carry '_play_context', '_task' and
        'task_args' sections and must carry 'connection', 'hostvars' and
        'asserts'.

        :arg fixturepath: path of the fixture directory to load
        :raises AssertionError: when one of the 'asserts' expressions is falsy
        :raises KeyError: when 'connection', 'hostvars' or 'asserts' is missing
        '''
        metapath = os.path.join(fixturepath, 'meta.yaml')
        with open(metapath, 'rb') as f:
            fdata = f.read()
        # yaml.load() without a Loader raises TypeError on PyYAML >= 6 and is
        # unsafe on older releases; safe_load() needs no Loader argument.
        test_meta = yaml.safe_load(fdata)

        # load initial play context vars
        if test_meta.get('_play_context'):
            # NOTE(review): resetting task.args here is preserved from the
            # original code; it looks copied from the '_task' section below,
            # confirm before removing.
            self.task.args = {}
            for (k, v) in test_meta['_play_context'].items():
                # fixtures spell a null value as the string 'None'
                if v == 'None':
                    v = None
                setattr(self._play_context, k, v)

        # load initial task context vars
        if test_meta.get('_task'):
            self.task.args = {}
            for (k, v) in test_meta['_task'].items():
                if v == 'None':
                    v = None
                setattr(self.task, k, v)

        # load initial task args
        if test_meta.get('task_args'):
            self.task.args = dict(test_meta['task_args'])

        fixtures = test_meta.get('fixtures', {})

        # load initial task vars
        invarspath = os.path.join(fixturepath, fixtures.get('taskvars_in', 'taskvars_in.json'))
        with open(invarspath, 'rb') as f:
            in_task_vars = json.loads(f.read().decode("utf-8"))

        # load expected final task vars
        outvarspath = os.path.join(fixturepath, fixtures.get('taskvars_out', 'taskvars_out.json'))
        with open(outvarspath, 'rb') as f:
            out_task_vars = json.loads(f.read().decode("utf-8"))

        # fixup the connection
        for (k, v) in test_meta['connection'].items():
            setattr(self.connection, k, v)

        # fixup the hostvars
        if test_meta['hostvars']:
            for (k, v) in test_meta['hostvars'].items():
                in_task_vars['hostvars'][k] = v

        # initialize and run the module
        SAM = ActionModule(self.task, self.connection, self._play_context,
                           self.loader, self.templar, self.shared_loader_obj)
        SAM._execute_module = self._execute_module
        result = SAM.run(task_vars=in_task_vars)

        # run assertions
        # SECURITY: eval() runs the expression strings from meta.yaml, so only
        # trusted fixtures may be used. The expressions are evaluated in this
        # frame, so the local names above (self, SAM, result, in_task_vars,
        # out_task_vars, test_meta) must not be renamed or removed.
        for check in test_meta['asserts']:
            value = eval(check)
            assert value, check
class FakePluginLoader(object):
    ''' Replacement for connection_loader that hands out ConnectionMock objects '''

    mocked = True

    @staticmethod
    def get(transport, play_context, new_stdin):
        ''' Build a ConnectionMock carrying the requested transport, play context and stdin '''
        fake = ConnectionMock()
        overrides = (
            ('transport', transport),
            ('_play_context', play_context),
            ('_new_stdin', new_stdin),
        )
        for attr, val in overrides:
            setattr(fake, attr, val)
        return fake
class TestSynchronizeAction(unittest.TestCase):
    ''' Fixture-driven tests for the synchronize action plugin '''

    # directory holding one sub-directory of fixtures per test case
    fixturedir = os.path.join(os.path.dirname(__file__), 'fixtures', 'synchronize')

    def _run_fixture(self, name):
        ''' Run a new SynchronizeTester against the fixture directory called name '''
        tester = SynchronizeTester()
        tester.runtest(fixturepath=os.path.join(self.fixturedir, name))

    @patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
    def test_basic(self):
        self._run_fixture('basic')

    @patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
    def test_basic_become(self):
        self._run_fixture('basic_become')

    @patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
    def test_basic_become_cli(self):
        # --become on the cli sets _play_context.become
        self._run_fixture('basic_become_cli')

    @patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
    def test_basic_vagrant(self):
        # simple vagrant example
        self._run_fixture('basic_vagrant')

    @patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
    def test_basic_vagrant_sudo(self):
        # vagrant plus sudo
        self._run_fixture('basic_vagrant_sudo')

    @patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
    def test_basic_vagrant_become_cli(self):
        # vagrant fixture for the become-on-the-cli case
        self._run_fixture('basic_vagrant_become_cli')

    @patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
    def test_delegate_remote(self):
        # delegate to other remote host
        self._run_fixture('delegate_remote')

    @patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
    def test_delegate_remote_su(self):
        # delegate to other remote host with su enabled
        self._run_fixture('delegate_remote_su')

    @patch.object(ActionModule, '_low_level_execute_command', side_effect=BreakPoint)
    @patch.object(ActionModule, '_remote_expand_user', side_effect=ActionModule._remote_expand_user, autospec=True)
    def test_remote_user_not_in_local_tmpdir(self, spy_remote_expand_user, ll_ec):
        tester = SynchronizeTester()
        action = ActionModule(tester.task, tester.connection, tester._play_context,
                              tester.loader, tester.templar, tester.shared_loader_obj)
        task_vars = {
            'hostvars': {'foo': {}, 'localhost': {}},
            'inventory_hostname': 'foo',
        }
        try:
            action.run(task_vars=task_vars)
        except BreakPoint:
            # raised by the patched _low_level_execute_command; a run that
            # never reaches it is acceptable as well
            pass
        # the spy must not have been called at all during the run
        self.assertEqual(spy_remote_expand_user.call_count, 0)