|
|
@ -10,10 +10,10 @@ import os
|
|
|
|
import shutil
|
|
|
|
import shutil
|
|
|
|
import time
|
|
|
|
import time
|
|
|
|
import tempfile
|
|
|
|
import tempfile
|
|
|
|
import urllib2
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from nose.plugins.skip import SkipTest
|
|
|
|
from nose.plugins.skip import SkipTest
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_binary(name):
|
|
|
|
def get_binary(name):
|
|
|
|
for directory in os.environ["PATH"].split(os.pathsep):
|
|
|
|
for directory in os.environ["PATH"].split(os.pathsep):
|
|
|
|
path = os.path.join(directory, name)
|
|
|
|
path = os.path.join(directory, name)
|
|
|
@ -21,6 +21,7 @@ def get_binary(name):
|
|
|
|
return path
|
|
|
|
return path
|
|
|
|
return None
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestRunner(unittest.TestCase):
|
|
|
|
class TestRunner(unittest.TestCase):
|
|
|
|
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
def setUp(self):
|
|
|
@ -110,70 +111,70 @@ class TestRunner(unittest.TestCase):
|
|
|
|
data_out = file(output).read()
|
|
|
|
data_out = file(output).read()
|
|
|
|
assert data_in == data_out
|
|
|
|
assert data_in == data_out
|
|
|
|
assert 'failed' not in result
|
|
|
|
assert 'failed' not in result
|
|
|
|
assert result['changed'] == True
|
|
|
|
assert result['changed'] is True
|
|
|
|
assert 'md5sum' in result
|
|
|
|
assert 'md5sum' in result
|
|
|
|
result = self._run('copy', [
|
|
|
|
result = self._run('copy', [
|
|
|
|
"src=%s" % input_,
|
|
|
|
"src=%s" % input_,
|
|
|
|
"dest=%s" % output,
|
|
|
|
"dest=%s" % output,
|
|
|
|
])
|
|
|
|
])
|
|
|
|
assert result['changed'] == False
|
|
|
|
assert result['changed'] is False
|
|
|
|
|
|
|
|
|
|
|
|
def test_command(self):
|
|
|
|
def test_command(self):
|
|
|
|
# test command module, change trigger, etc
|
|
|
|
# test command module, change trigger, etc
|
|
|
|
result = self._run('command', [ "/bin/echo", "hi" ])
|
|
|
|
result = self._run('command', ["/bin/echo", "hi"])
|
|
|
|
assert "failed" not in result
|
|
|
|
assert "failed" not in result
|
|
|
|
assert "msg" not in result
|
|
|
|
assert "msg" not in result
|
|
|
|
assert result['rc'] == 0
|
|
|
|
assert result['rc'] == 0
|
|
|
|
assert result['stdout'] == 'hi'
|
|
|
|
assert result['stdout'] == 'hi'
|
|
|
|
assert result['stderr'] == ''
|
|
|
|
assert result['stderr'] == ''
|
|
|
|
|
|
|
|
|
|
|
|
result = self._run('command', [ "false" ])
|
|
|
|
result = self._run('command', ["false"])
|
|
|
|
assert result['rc'] == 1
|
|
|
|
assert result['rc'] == 1
|
|
|
|
assert 'failed' not in result
|
|
|
|
assert 'failed' not in result
|
|
|
|
|
|
|
|
|
|
|
|
result = self._run('command', [ "/usr/bin/this_does_not_exist", "splat" ])
|
|
|
|
result = self._run('command', ["/usr/bin/this_does_not_exist", "splat"])
|
|
|
|
assert 'msg' in result
|
|
|
|
assert 'msg' in result
|
|
|
|
assert 'failed' in result
|
|
|
|
assert 'failed' in result
|
|
|
|
|
|
|
|
|
|
|
|
result = self._run('shell', [ "/bin/echo", "$HOME" ])
|
|
|
|
result = self._run('shell', ["/bin/echo", "$HOME"])
|
|
|
|
assert 'failed' not in result
|
|
|
|
assert 'failed' not in result
|
|
|
|
assert result['rc'] == 0
|
|
|
|
assert result['rc'] == 0
|
|
|
|
|
|
|
|
|
|
|
|
result = self._run('command', [ "creates='/tmp/ansible command test'", "chdir=/tmp", "touch", "'ansible command test'" ])
|
|
|
|
result = self._run('command', ["creates='/tmp/ansible command test'", "chdir=/tmp", "touch", "'ansible command test'"])
|
|
|
|
assert 'changed' in result
|
|
|
|
assert 'changed' in result
|
|
|
|
assert result['rc'] == 0
|
|
|
|
assert result['rc'] == 0
|
|
|
|
|
|
|
|
|
|
|
|
result = self._run('command', [ "creates='/tmp/ansible command test'", "false" ])
|
|
|
|
result = self._run('command', ["creates='/tmp/ansible command test'", "false"])
|
|
|
|
assert 'skipped' in result
|
|
|
|
assert 'skipped' in result
|
|
|
|
|
|
|
|
|
|
|
|
result = self._run('shell', [ "removes=/tmp/ansible\\ command\\ test", "chdir=/tmp", "rm -f 'ansible command test'; echo $?" ])
|
|
|
|
result = self._run('shell', ["removes=/tmp/ansible\\ command\\ test", "chdir=/tmp", "rm -f 'ansible command test'; echo $?"])
|
|
|
|
assert 'changed' in result
|
|
|
|
assert 'changed' in result
|
|
|
|
assert result['rc'] == 0
|
|
|
|
assert result['rc'] == 0
|
|
|
|
assert result['stdout'] == '0'
|
|
|
|
assert result['stdout'] == '0'
|
|
|
|
|
|
|
|
|
|
|
|
result = self._run('shell', [ "removes=/tmp/ansible\\ command\\ test", "false" ])
|
|
|
|
result = self._run('shell', ["removes=/tmp/ansible\\ command\\ test", "false"])
|
|
|
|
assert 'skipped' in result
|
|
|
|
assert 'skipped' in result
|
|
|
|
|
|
|
|
|
|
|
|
def test_git(self):
|
|
|
|
def test_git(self):
|
|
|
|
self._run('file',['path=/tmp/gitdemo','state=absent'])
|
|
|
|
self._run('file', ['path=/tmp/gitdemo', 'state=absent'])
|
|
|
|
self._run('file',['path=/tmp/gd','state=absent'])
|
|
|
|
self._run('file', ['path=/tmp/gd', 'state=absent'])
|
|
|
|
self._run('command',['git init gitdemo', 'chdir=/tmp'])
|
|
|
|
self._run('command', ['git init gitdemo', 'chdir=/tmp'])
|
|
|
|
self._run('command',['touch a', 'chdir=/tmp/gitdemo'])
|
|
|
|
self._run('command', ['touch a', 'chdir=/tmp/gitdemo'])
|
|
|
|
self._run('command',['git add *', 'chdir=/tmp/gitdemo'])
|
|
|
|
self._run('command', ['git add *', 'chdir=/tmp/gitdemo'])
|
|
|
|
self._run('command',['git commit -m "test commit 2"', 'chdir=/tmp/gitdemo'])
|
|
|
|
self._run('command', ['git commit -m "test commit 2"', 'chdir=/tmp/gitdemo'])
|
|
|
|
self._run('command',['touch b', 'chdir=/tmp/gitdemo'])
|
|
|
|
self._run('command', ['touch b', 'chdir=/tmp/gitdemo'])
|
|
|
|
self._run('command',['git add *', 'chdir=/tmp/gitdemo'])
|
|
|
|
self._run('command', ['git add *', 'chdir=/tmp/gitdemo'])
|
|
|
|
self._run('command',['git commit -m "test commit 2"', 'chdir=/tmp/gitdemo'])
|
|
|
|
self._run('command', ['git commit -m "test commit 2"', 'chdir=/tmp/gitdemo'])
|
|
|
|
result = self._run('git', [ "repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd" ])
|
|
|
|
result = self._run('git', ["repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd"])
|
|
|
|
assert result['changed']
|
|
|
|
assert result['changed']
|
|
|
|
# test the force option not set
|
|
|
|
# test the force option not set
|
|
|
|
self._run('file',['path=/tmp/gd/a','state=absent'])
|
|
|
|
self._run('file', ['path=/tmp/gd/a', 'state=absent'])
|
|
|
|
result = self._run('git', [ "repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd", "force=no" ])
|
|
|
|
result = self._run('git', ["repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd", "force=no"])
|
|
|
|
assert result['failed']
|
|
|
|
assert result['failed']
|
|
|
|
# test the force option when set
|
|
|
|
# test the force option when set
|
|
|
|
result = self._run('git', [ "repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd", "force=yes" ])
|
|
|
|
result = self._run('git', ["repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd", "force=yes"])
|
|
|
|
assert result['changed']
|
|
|
|
assert result['changed']
|
|
|
|
|
|
|
|
|
|
|
|
def test_file(self):
|
|
|
|
def test_file(self):
|
|
|
|
filedemo = tempfile.mkstemp()[1]
|
|
|
|
filedemo = tempfile.mkstemp()[1]
|
|
|
|
assert self._run('file', ['dest=' + filedemo, 'state=directory'])['failed']
|
|
|
|
assert self._run('file', ['dest=' + filedemo, 'state=directory'])['failed']
|
|
|
@ -191,7 +192,6 @@ class TestRunner(unittest.TestCase):
|
|
|
|
assert not os.path.exists(filedemo)
|
|
|
|
assert not os.path.exists(filedemo)
|
|
|
|
assert not self._run('file', ['dest=' + filedemo, 'state=absent'])['changed']
|
|
|
|
assert not self._run('file', ['dest=' + filedemo, 'state=absent'])['changed']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
filedemo = tempfile.mkdtemp()
|
|
|
|
filedemo = tempfile.mkdtemp()
|
|
|
|
assert self._run('file', ['dest=' + filedemo, 'state=file'])['failed']
|
|
|
|
assert self._run('file', ['dest=' + filedemo, 'state=file'])['failed']
|
|
|
|
assert os.path.isdir(filedemo)
|
|
|
|
assert os.path.isdir(filedemo)
|
|
|
@ -209,7 +209,6 @@ class TestRunner(unittest.TestCase):
|
|
|
|
assert not os.path.exists(filedemo)
|
|
|
|
assert not os.path.exists(filedemo)
|
|
|
|
assert not self._run('file', ['dest=' + filedemo, 'state=absent'])['changed']
|
|
|
|
assert not self._run('file', ['dest=' + filedemo, 'state=absent'])['changed']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tmp_dir = tempfile.mkdtemp()
|
|
|
|
tmp_dir = tempfile.mkdtemp()
|
|
|
|
filedemo = os.path.join(tmp_dir, 'link')
|
|
|
|
filedemo = os.path.join(tmp_dir, 'link')
|
|
|
|
os.symlink('/dev/zero', filedemo)
|
|
|
|
os.symlink('/dev/zero', filedemo)
|
|
|
@ -234,7 +233,7 @@ class TestRunner(unittest.TestCase):
|
|
|
|
if not os.path.exists(large_path):
|
|
|
|
if not os.path.exists(large_path):
|
|
|
|
raise SkipTest
|
|
|
|
raise SkipTest
|
|
|
|
# Ensure reading a large amount of output from a command doesn't hang.
|
|
|
|
# Ensure reading a large amount of output from a command doesn't hang.
|
|
|
|
result = self._run('command', [ "/bin/cat", large_path ])
|
|
|
|
result = self._run('command', ["/bin/cat", large_path])
|
|
|
|
assert "failed" not in result
|
|
|
|
assert "failed" not in result
|
|
|
|
assert "msg" not in result
|
|
|
|
assert "msg" not in result
|
|
|
|
assert result['rc'] == 0
|
|
|
|
assert result['rc'] == 0
|
|
|
@ -244,14 +243,14 @@ class TestRunner(unittest.TestCase):
|
|
|
|
def test_async(self):
|
|
|
|
def test_async(self):
|
|
|
|
# test async launch and job status
|
|
|
|
# test async launch and job status
|
|
|
|
# of any particular module
|
|
|
|
# of any particular module
|
|
|
|
result = self._run('command', [ get_binary("sleep"), "3" ], background=20)
|
|
|
|
result = self._run('command', [get_binary("sleep"), "3"], background=20)
|
|
|
|
assert 'ansible_job_id' in result
|
|
|
|
assert 'ansible_job_id' in result
|
|
|
|
assert 'started' in result
|
|
|
|
assert 'started' in result
|
|
|
|
jid = result['ansible_job_id']
|
|
|
|
jid = result['ansible_job_id']
|
|
|
|
# no real chance of this op taking a while, but whatever
|
|
|
|
# no real chance of this op taking a while, but whatever
|
|
|
|
time.sleep(5)
|
|
|
|
time.sleep(5)
|
|
|
|
# CLI will abstract this (when polling), but this is how it works internally
|
|
|
|
# CLI will abstract this (when polling), but this is how it works internally
|
|
|
|
result = self._run('async_status', [ "jid=%s" % jid ])
|
|
|
|
result = self._run('async_status', ["jid=%s" % jid])
|
|
|
|
# TODO: would be nice to have tests for supervisory process
|
|
|
|
# TODO: would be nice to have tests for supervisory process
|
|
|
|
# killing job after X seconds
|
|
|
|
# killing job after X seconds
|
|
|
|
assert 'finished' in result
|
|
|
|
assert 'finished' in result
|
|
|
@ -263,7 +262,7 @@ class TestRunner(unittest.TestCase):
|
|
|
|
def test_fetch(self):
|
|
|
|
def test_fetch(self):
|
|
|
|
input_ = self._get_test_file('sample.j2')
|
|
|
|
input_ = self._get_test_file('sample.j2')
|
|
|
|
output = os.path.join(self.stage_dir, 'localhost', input_)
|
|
|
|
output = os.path.join(self.stage_dir, 'localhost', input_)
|
|
|
|
result = self._run('fetch', [ "src=%s" % input_, "dest=%s" % self.stage_dir ])
|
|
|
|
self._run('fetch', ["src=%s" % input_, "dest=%s" % self.stage_dir])
|
|
|
|
assert os.path.exists(output)
|
|
|
|
assert os.path.exists(output)
|
|
|
|
assert open(input_).read() == open(output).read()
|
|
|
|
assert open(input_).read() == open(output).read()
|
|
|
|
|
|
|
|
|
|
|
@ -280,7 +279,7 @@ class TestRunner(unittest.TestCase):
|
|
|
|
assert out.find("first") != -1
|
|
|
|
assert out.find("first") != -1
|
|
|
|
assert out.find("second") != -1
|
|
|
|
assert out.find("second") != -1
|
|
|
|
assert out.find("third") != -1
|
|
|
|
assert out.find("third") != -1
|
|
|
|
assert result['changed'] == True
|
|
|
|
assert result['changed'] is True
|
|
|
|
assert 'md5sum' in result
|
|
|
|
assert 'md5sum' in result
|
|
|
|
assert 'failed' not in result
|
|
|
|
assert 'failed' not in result
|
|
|
|
result = self._run('assemble', [
|
|
|
|
result = self._run('assemble', [
|
|
|
@ -288,13 +287,14 @@ class TestRunner(unittest.TestCase):
|
|
|
|
"dest=%s" % output,
|
|
|
|
"dest=%s" % output,
|
|
|
|
])
|
|
|
|
])
|
|
|
|
print result
|
|
|
|
print result
|
|
|
|
assert result['changed'] == False
|
|
|
|
assert result['changed'] is False
|
|
|
|
|
|
|
|
|
|
|
|
def test_lineinfile(self):
|
|
|
|
def test_lineinfile(self):
|
|
|
|
|
|
|
|
"""Unit tests for the lineinfile module, without backref features."""
|
|
|
|
sampleroot = 'rocannon'
|
|
|
|
sampleroot = 'rocannon'
|
|
|
|
sample_origin = self._get_test_file(sampleroot + '.txt')
|
|
|
|
sample_origin = self._get_test_file(sampleroot + '.txt')
|
|
|
|
sample = self._get_stage_file(sampleroot + '.out' + '.txt')
|
|
|
|
sample = self._get_stage_file(sampleroot + '.out' + '.txt')
|
|
|
|
shutil.copy( sample_origin, sample)
|
|
|
|
shutil.copy(sample_origin, sample)
|
|
|
|
# The order of the test cases is important
|
|
|
|
# The order of the test cases is important
|
|
|
|
|
|
|
|
|
|
|
|
# defaults to insertafter at the end of the file
|
|
|
|
# defaults to insertafter at the end of the file
|
|
|
@ -303,19 +303,19 @@ class TestRunner(unittest.TestCase):
|
|
|
|
"dest=%s" % sample,
|
|
|
|
"dest=%s" % sample,
|
|
|
|
"regexp='^First: '",
|
|
|
|
"regexp='^First: '",
|
|
|
|
"line='%s'" % testline
|
|
|
|
"line='%s'" % testline
|
|
|
|
])
|
|
|
|
])
|
|
|
|
result = self._run(*testcase)
|
|
|
|
result = self._run(*testcase)
|
|
|
|
assert result['changed'] == True
|
|
|
|
assert result['changed']
|
|
|
|
assert result['msg'] == 'line added'
|
|
|
|
assert result['msg'] == 'line added'
|
|
|
|
artifact = [ x.strip() for x in open(sample).readlines() ]
|
|
|
|
artifact = [x.strip() for x in open(sample)]
|
|
|
|
assert artifact[-1] == testline
|
|
|
|
assert artifact[-1] == testline
|
|
|
|
assert artifact.count(testline) == 1
|
|
|
|
assert artifact.count(testline) == 1
|
|
|
|
|
|
|
|
|
|
|
|
# run a second time, verify only one line has been added
|
|
|
|
# run a second time, verify only one line has been added
|
|
|
|
result = self._run(*testcase)
|
|
|
|
result = self._run(*testcase)
|
|
|
|
assert result['changed'] == False
|
|
|
|
assert not result['changed']
|
|
|
|
assert result['msg'] == ''
|
|
|
|
assert result['msg'] == ''
|
|
|
|
artifact = [ x.strip() for x in open(sample).readlines() ]
|
|
|
|
artifact = [x.strip() for x in open(sample)]
|
|
|
|
assert artifact.count(testline) == 1
|
|
|
|
assert artifact.count(testline) == 1
|
|
|
|
|
|
|
|
|
|
|
|
# insertafter with EOF
|
|
|
|
# insertafter with EOF
|
|
|
@ -325,48 +325,48 @@ class TestRunner(unittest.TestCase):
|
|
|
|
"insertafter=EOF",
|
|
|
|
"insertafter=EOF",
|
|
|
|
"regexp='^Second: '",
|
|
|
|
"regexp='^Second: '",
|
|
|
|
"line='%s'" % testline
|
|
|
|
"line='%s'" % testline
|
|
|
|
])
|
|
|
|
])
|
|
|
|
result = self._run(*testcase)
|
|
|
|
result = self._run(*testcase)
|
|
|
|
assert result['changed'] == True
|
|
|
|
assert result['changed']
|
|
|
|
assert result['msg'] == 'line added'
|
|
|
|
assert result['msg'] == 'line added'
|
|
|
|
artifact = [ x.strip() for x in open(sample).readlines() ]
|
|
|
|
artifact = [x.strip() for x in open(sample)]
|
|
|
|
assert artifact[-1] == testline
|
|
|
|
assert artifact[-1] == testline
|
|
|
|
assert artifact.count(testline) == 1
|
|
|
|
assert artifact.count(testline) == 1
|
|
|
|
|
|
|
|
|
|
|
|
# with invalid insertafter regex
|
|
|
|
# with invalid insertafter regex
|
|
|
|
|
|
|
|
# If the regexp doesn't match and the insertafter doesn't match,
|
|
|
|
|
|
|
|
# do nothing.
|
|
|
|
testline = 'Third: Line added with an invalid insertafter regex'
|
|
|
|
testline = 'Third: Line added with an invalid insertafter regex'
|
|
|
|
testcase = ('lineinfile', [
|
|
|
|
testcase = ('lineinfile', [
|
|
|
|
"dest=%s" % sample,
|
|
|
|
"dest=%s" % sample,
|
|
|
|
"insertafter='^abcdefgh'",
|
|
|
|
"insertafter='^abcdefgh'",
|
|
|
|
"regexp='^Third: '",
|
|
|
|
"regexp='^Third: '",
|
|
|
|
"line='%s'" % testline
|
|
|
|
"line='%s'" % testline
|
|
|
|
])
|
|
|
|
])
|
|
|
|
result = self._run(*testcase)
|
|
|
|
result = self._run(*testcase)
|
|
|
|
assert result['changed'] == True
|
|
|
|
assert not result['changed']
|
|
|
|
assert result['msg'] == 'line added'
|
|
|
|
|
|
|
|
artifact = [ x.strip() for x in open(sample).readlines() ]
|
|
|
|
|
|
|
|
assert artifact[-1] == testline
|
|
|
|
|
|
|
|
assert artifact.count(testline) == 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# with an insertafter regex
|
|
|
|
# with an insertafter regex
|
|
|
|
|
|
|
|
# The regexp doesn't match, but the insertafter is specified and does,
|
|
|
|
|
|
|
|
# so insert after insertafter.
|
|
|
|
testline = 'Fourth: Line added with a valid insertafter regex'
|
|
|
|
testline = 'Fourth: Line added with a valid insertafter regex'
|
|
|
|
testcase = ('lineinfile', [
|
|
|
|
testcase = ('lineinfile', [
|
|
|
|
"dest=%s" % sample,
|
|
|
|
"dest=%s" % sample,
|
|
|
|
"insertafter='^receive messages to '",
|
|
|
|
"insertafter='^receive messages to '",
|
|
|
|
"regexp='^Fourth: '",
|
|
|
|
"regexp='^Fourth: '",
|
|
|
|
"line='%s'" % testline
|
|
|
|
"line='%s'" % testline
|
|
|
|
])
|
|
|
|
])
|
|
|
|
result = self._run(*testcase)
|
|
|
|
result = self._run(*testcase)
|
|
|
|
assert result['changed'] == True
|
|
|
|
assert result['changed']
|
|
|
|
assert result['msg'] == 'line added'
|
|
|
|
assert result['msg'] == 'line added'
|
|
|
|
artifact = [ x.strip() for x in open(sample).readlines() ]
|
|
|
|
artifact = [x.strip() for x in open(sample)]
|
|
|
|
assert artifact.count(testline) == 1
|
|
|
|
assert artifact.count(testline) == 1
|
|
|
|
idx = artifact.index('receive messages to and from a corresponding device over any distance')
|
|
|
|
idx = artifact.index('receive messages to and from a corresponding device over any distance')
|
|
|
|
assert artifact[idx + 1] == testline
|
|
|
|
assert artifact[idx + 1] == testline
|
|
|
|
|
|
|
|
|
|
|
|
# replacement of a line from a regex
|
|
|
|
# replacement of a line from a regex
|
|
|
|
# we replace the line, so we need to get its idx before the run
|
|
|
|
# we replace the line, so we need to get its idx before the run
|
|
|
|
artifact = [ x.strip() for x in open(sample).readlines() ]
|
|
|
|
artifact = [x.strip() for x in open(sample)]
|
|
|
|
target_line = 'combination of microphone, speaker, keyboard and display. It can send and'
|
|
|
|
target_line = 'combination of microphone, speaker, keyboard and display. It can send and'
|
|
|
|
idx = artifact.index(target_line)
|
|
|
|
idx = artifact.index(target_line)
|
|
|
|
|
|
|
|
|
|
|
@ -375,18 +375,18 @@ class TestRunner(unittest.TestCase):
|
|
|
|
"dest=%s" % sample,
|
|
|
|
"dest=%s" % sample,
|
|
|
|
"regexp='combination of microphone'",
|
|
|
|
"regexp='combination of microphone'",
|
|
|
|
"line='%s'" % testline
|
|
|
|
"line='%s'" % testline
|
|
|
|
])
|
|
|
|
])
|
|
|
|
result = self._run(*testcase)
|
|
|
|
result = self._run(*testcase)
|
|
|
|
assert result['changed'] == True
|
|
|
|
assert result['changed']
|
|
|
|
assert result['msg'] == 'line replaced'
|
|
|
|
assert result['msg'] == 'line replaced'
|
|
|
|
artifact = [ x.strip() for x in open(sample).readlines() ]
|
|
|
|
artifact = [x.strip() for x in open(sample)]
|
|
|
|
assert artifact.count(testline) == 1
|
|
|
|
assert artifact.count(testline) == 1
|
|
|
|
assert artifact.index(testline) == idx
|
|
|
|
assert artifact.index(testline) == idx
|
|
|
|
assert target_line not in artifact
|
|
|
|
assert target_line not in artifact
|
|
|
|
|
|
|
|
|
|
|
|
# removal of a line
|
|
|
|
# removal of a line
|
|
|
|
# we replace the line, so we need to get its idx before the run
|
|
|
|
# we replace the line, so we need to get its idx before the run
|
|
|
|
artifact = [ x.strip() for x in open(sample).readlines() ]
|
|
|
|
artifact = [x.strip() for x in open(sample)]
|
|
|
|
target_line = 'receive messages to and from a corresponding device over any distance'
|
|
|
|
target_line = 'receive messages to and from a corresponding device over any distance'
|
|
|
|
idx = artifact.index(target_line)
|
|
|
|
idx = artifact.index(target_line)
|
|
|
|
|
|
|
|
|
|
|
@ -394,13 +394,12 @@ class TestRunner(unittest.TestCase):
|
|
|
|
"dest=%s" % sample,
|
|
|
|
"dest=%s" % sample,
|
|
|
|
"regexp='^receive messages to and from '",
|
|
|
|
"regexp='^receive messages to and from '",
|
|
|
|
"state=absent"
|
|
|
|
"state=absent"
|
|
|
|
])
|
|
|
|
])
|
|
|
|
result = self._run(*testcase)
|
|
|
|
result = self._run(*testcase)
|
|
|
|
assert result['changed'] == True
|
|
|
|
assert result['changed']
|
|
|
|
artifact = [ x.strip() for x in open(sample).readlines() ]
|
|
|
|
artifact = [x.strip() for x in open(sample)]
|
|
|
|
assert target_line not in artifact
|
|
|
|
assert target_line not in artifact
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# with both insertafter and insertbefore (should fail)
|
|
|
|
# with both insertafter and insertbefore (should fail)
|
|
|
|
testline = 'Seventh: this line should not be there'
|
|
|
|
testline = 'Seventh: this line should not be there'
|
|
|
|
testcase = ('lineinfile', [
|
|
|
|
testcase = ('lineinfile', [
|
|
|
@ -409,9 +408,9 @@ class TestRunner(unittest.TestCase):
|
|
|
|
"insertbefore='BOF'",
|
|
|
|
"insertbefore='BOF'",
|
|
|
|
"regexp='^communication. '",
|
|
|
|
"regexp='^communication. '",
|
|
|
|
"line='%s'" % testline
|
|
|
|
"line='%s'" % testline
|
|
|
|
])
|
|
|
|
])
|
|
|
|
result = self._run(*testcase)
|
|
|
|
result = self._run(*testcase)
|
|
|
|
assert result['failed'] == True
|
|
|
|
assert result['failed']
|
|
|
|
|
|
|
|
|
|
|
|
# insertbefore with BOF
|
|
|
|
# insertbefore with BOF
|
|
|
|
testline = 'Eighth: insertbefore BOF'
|
|
|
|
testline = 'Eighth: insertbefore BOF'
|
|
|
@ -420,11 +419,11 @@ class TestRunner(unittest.TestCase):
|
|
|
|
"insertbefore=BOF",
|
|
|
|
"insertbefore=BOF",
|
|
|
|
"regexp='^Eighth: '",
|
|
|
|
"regexp='^Eighth: '",
|
|
|
|
"line='%s'" % testline
|
|
|
|
"line='%s'" % testline
|
|
|
|
])
|
|
|
|
])
|
|
|
|
result = self._run(*testcase)
|
|
|
|
result = self._run(*testcase)
|
|
|
|
assert result['changed'] == True
|
|
|
|
assert result['changed']
|
|
|
|
assert result['msg'] == 'line added'
|
|
|
|
assert result['msg'] == 'line added'
|
|
|
|
artifact = [ x.strip() for x in open(sample).readlines() ]
|
|
|
|
artifact = [x.strip() for x in open(sample)]
|
|
|
|
assert artifact.count(testline) == 1
|
|
|
|
assert artifact.count(testline) == 1
|
|
|
|
assert artifact[0] == testline
|
|
|
|
assert artifact[0] == testline
|
|
|
|
|
|
|
|
|
|
|
@ -435,11 +434,11 @@ class TestRunner(unittest.TestCase):
|
|
|
|
"insertbefore='^communication. Typically '",
|
|
|
|
"insertbefore='^communication. Typically '",
|
|
|
|
"regexp='^Ninth: '",
|
|
|
|
"regexp='^Ninth: '",
|
|
|
|
"line='%s'" % testline
|
|
|
|
"line='%s'" % testline
|
|
|
|
])
|
|
|
|
])
|
|
|
|
result = self._run(*testcase)
|
|
|
|
result = self._run(*testcase)
|
|
|
|
assert result['changed'] == True
|
|
|
|
assert result['changed']
|
|
|
|
assert result['msg'] == 'line added'
|
|
|
|
assert result['msg'] == 'line added'
|
|
|
|
artifact = [ x.strip() for x in open(sample).readlines() ]
|
|
|
|
artifact = [x.strip() for x in open(sample)]
|
|
|
|
assert artifact.count(testline) == 1
|
|
|
|
assert artifact.count(testline) == 1
|
|
|
|
idx = artifact.index('communication. Typically it is depicted as a lunch-box sized object with some')
|
|
|
|
idx = artifact.index('communication. Typically it is depicted as a lunch-box sized object with some')
|
|
|
|
assert artifact[idx - 1] == testline
|
|
|
|
assert artifact[idx - 1] == testline
|
|
|
@ -447,4 +446,139 @@ class TestRunner(unittest.TestCase):
|
|
|
|
# cleanup
|
|
|
|
# cleanup
|
|
|
|
os.unlink(sample)
|
|
|
|
os.unlink(sample)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_lineinfile_backrefs(self):
|
|
|
|
|
|
|
|
"""Unit tests for the lineinfile module, with backref features."""
|
|
|
|
|
|
|
|
sampleroot = 'rocannon'
|
|
|
|
|
|
|
|
sample_origin = self._get_test_file(sampleroot + '.txt')
|
|
|
|
|
|
|
|
origin_lines = [line.strip() for line in open(sample_origin)]
|
|
|
|
|
|
|
|
sample = self._get_stage_file(sampleroot + '.out' + '.txt')
|
|
|
|
|
|
|
|
shutil.copy(sample_origin, sample)
|
|
|
|
|
|
|
|
# The order of the test cases is important
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# The regexp doesn't match, so the line will not be added anywhere.
|
|
|
|
|
|
|
|
testline = r'\\1: Line added by default at the end of the file.'
|
|
|
|
|
|
|
|
testcase = ('lineinfile', [
|
|
|
|
|
|
|
|
"dest=%s" % sample,
|
|
|
|
|
|
|
|
"regexp='^(First): '",
|
|
|
|
|
|
|
|
"line='%s'" % testline,
|
|
|
|
|
|
|
|
"backrefs=yes",
|
|
|
|
|
|
|
|
])
|
|
|
|
|
|
|
|
result = self._run(*testcase)
|
|
|
|
|
|
|
|
assert not result['changed']
|
|
|
|
|
|
|
|
assert result['msg'] == ''
|
|
|
|
|
|
|
|
artifact = [x.strip() for x in open(sample)]
|
|
|
|
|
|
|
|
assert artifact == origin_lines
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# insertafter with EOF
|
|
|
|
|
|
|
|
# The regexp doesn't match, so the line will not be added anywhere.
|
|
|
|
|
|
|
|
testline = r'\\1: Line added with insertafter=EOF'
|
|
|
|
|
|
|
|
testcase = ('lineinfile', [
|
|
|
|
|
|
|
|
"dest=%s" % sample,
|
|
|
|
|
|
|
|
"insertafter=EOF",
|
|
|
|
|
|
|
|
"regexp='^(Second): '",
|
|
|
|
|
|
|
|
"line='%s'" % testline,
|
|
|
|
|
|
|
|
"backrefs=yes",
|
|
|
|
|
|
|
|
])
|
|
|
|
|
|
|
|
result = self._run(*testcase)
|
|
|
|
|
|
|
|
assert not result['changed']
|
|
|
|
|
|
|
|
assert result['msg'] == ''
|
|
|
|
|
|
|
|
artifact = [x.strip() for x in open(sample)]
|
|
|
|
|
|
|
|
assert artifact == origin_lines
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# with invalid insertafter regex
|
|
|
|
|
|
|
|
# The regexp doesn't match, so do nothing.
|
|
|
|
|
|
|
|
testline = r'\\1: Line added with an invalid insertafter regex'
|
|
|
|
|
|
|
|
testcase = ('lineinfile', [
|
|
|
|
|
|
|
|
"dest=%s" % sample,
|
|
|
|
|
|
|
|
"insertafter='^abcdefgh'",
|
|
|
|
|
|
|
|
"regexp='^(Third): '",
|
|
|
|
|
|
|
|
"line='%s'" % testline,
|
|
|
|
|
|
|
|
"backrefs=yes",
|
|
|
|
|
|
|
|
])
|
|
|
|
|
|
|
|
result = self._run(*testcase)
|
|
|
|
|
|
|
|
assert not result['changed']
|
|
|
|
|
|
|
|
assert artifact == origin_lines
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# with an insertafter regex
|
|
|
|
|
|
|
|
# The regexp doesn't match, so do nothing.
|
|
|
|
|
|
|
|
testline = r'\\1: Line added with a valid insertafter regex'
|
|
|
|
|
|
|
|
testcase = ('lineinfile', [
|
|
|
|
|
|
|
|
"dest=%s" % sample,
|
|
|
|
|
|
|
|
"insertafter='^receive messages to '",
|
|
|
|
|
|
|
|
"regexp='^(Fourth): '",
|
|
|
|
|
|
|
|
"line='%s'" % testline,
|
|
|
|
|
|
|
|
"backrefs=yes",
|
|
|
|
|
|
|
|
])
|
|
|
|
|
|
|
|
result = self._run(*testcase)
|
|
|
|
|
|
|
|
assert not result['changed']
|
|
|
|
|
|
|
|
assert result['msg'] == ''
|
|
|
|
|
|
|
|
assert artifact == origin_lines
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# replacement of a line from a regex
|
|
|
|
|
|
|
|
# we replace the line, so we need to get its idx before the run
|
|
|
|
|
|
|
|
artifact = [x.strip() for x in open(sample)]
|
|
|
|
|
|
|
|
target_line = 'combination of microphone, speaker, keyboard and display. It can send and'
|
|
|
|
|
|
|
|
idx = artifact.index(target_line)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
testline = r'\\1 of megaphone'
|
|
|
|
|
|
|
|
testline_after = 'combination of megaphone'
|
|
|
|
|
|
|
|
testcase = ('lineinfile', [
|
|
|
|
|
|
|
|
"dest=%s" % sample,
|
|
|
|
|
|
|
|
"regexp='(combination) of microphone'",
|
|
|
|
|
|
|
|
"line='%s'" % testline,
|
|
|
|
|
|
|
|
"backrefs=yes",
|
|
|
|
|
|
|
|
])
|
|
|
|
|
|
|
|
result = self._run(*testcase)
|
|
|
|
|
|
|
|
assert result['changed']
|
|
|
|
|
|
|
|
assert result['msg'] == 'line replaced'
|
|
|
|
|
|
|
|
artifact = [x.strip() for x in open(sample)]
|
|
|
|
|
|
|
|
assert artifact.count(testline_after) == 1
|
|
|
|
|
|
|
|
assert artifact.index(testline_after) == idx
|
|
|
|
|
|
|
|
assert target_line not in artifact
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Go again, should be unchanged now.
|
|
|
|
|
|
|
|
testline = r'\\1 of megaphone'
|
|
|
|
|
|
|
|
testline_after = 'combination of megaphone'
|
|
|
|
|
|
|
|
testcase = ('lineinfile', [
|
|
|
|
|
|
|
|
"dest=%s" % sample,
|
|
|
|
|
|
|
|
"regexp='(combination) of megaphone'",
|
|
|
|
|
|
|
|
"line='%s'" % testline,
|
|
|
|
|
|
|
|
"backrefs=yes",
|
|
|
|
|
|
|
|
])
|
|
|
|
|
|
|
|
result = self._run(*testcase)
|
|
|
|
|
|
|
|
assert not result['changed']
|
|
|
|
|
|
|
|
assert result['msg'] == ''
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Try a numeric, named capture group example.
|
|
|
|
|
|
|
|
f = open(sample, 'a+')
|
|
|
|
|
|
|
|
f.write("1 + 1 = 3" + os.linesep)
|
|
|
|
|
|
|
|
f.close()
|
|
|
|
|
|
|
|
testline = r"2 + \\g<num> = 3"
|
|
|
|
|
|
|
|
testline_after = "2 + 1 = 3"
|
|
|
|
|
|
|
|
testcase = ('lineinfile', [
|
|
|
|
|
|
|
|
"dest=%s" % sample,
|
|
|
|
|
|
|
|
r"regexp='1 \\+ (?P<num>\\d) = 3'",
|
|
|
|
|
|
|
|
"line='%s'" % testline,
|
|
|
|
|
|
|
|
"backrefs=yes",
|
|
|
|
|
|
|
|
])
|
|
|
|
|
|
|
|
result = self._run(*testcase)
|
|
|
|
|
|
|
|
artifact = [x.strip() for x in open(sample)]
|
|
|
|
|
|
|
|
assert result['changed']
|
|
|
|
|
|
|
|
assert result['msg'] == 'line replaced'
|
|
|
|
|
|
|
|
artifact = [x.strip() for x in open(sample)]
|
|
|
|
|
|
|
|
assert '1 + 1 = 3' not in artifact
|
|
|
|
|
|
|
|
assert testline_after == artifact[-1]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# with both insertafter and insertbefore (should fail)
|
|
|
|
|
|
|
|
testline = 'Seventh: this line should not be there'
|
|
|
|
|
|
|
|
testcase = ('lineinfile', [
|
|
|
|
|
|
|
|
"dest=%s" % sample,
|
|
|
|
|
|
|
|
"insertafter='BOF'",
|
|
|
|
|
|
|
|
"insertbefore='BOF'",
|
|
|
|
|
|
|
|
"regexp='^communication. '",
|
|
|
|
|
|
|
|
"line='%s'" % testline
|
|
|
|
|
|
|
|
])
|
|
|
|
|
|
|
|
result = self._run(*testcase)
|
|
|
|
|
|
|
|
assert result['failed']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
os.unlink(sample)
|
|
|
|