diff --git a/library/lineinfile b/library/lineinfile index 2be9d83bceb..dab42e2a8ad 100644 --- a/library/lineinfile +++ b/library/lineinfile @@ -90,7 +90,7 @@ options: get the original file back if you somehow clobbered it incorrectly. others: description: - - All arguments accepted by the M(file) module also work here. + - All arguments accepted by the M(file) module also work here. required: false """ @@ -109,7 +109,7 @@ EXAMPLES = """ lineinfile dest=/tmp/grub.conf state=present regexp='^(splashimage=.*)$' line="#\\1" """ - + def check_file_attrs(module, changed, message): @@ -121,9 +121,11 @@ def check_file_attrs(module, changed, message): changed = True message += "ownership, perms or SE linux context changed" - return [ message, changed ] + return message, changed + -def present(module, dest, regexp, line, insertafter, insertbefore, create, backup): +def present(module, dest, regexp, line, insertafter, insertbefore, create, + backup, backrefs): if os.path.isdir(dest): module.fail_json(rc=256, msg='Destination %s is a directory !' % dest) @@ -142,10 +144,10 @@ def present(module, dest, regexp, line, insertafter, insertbefore, create, backu msg = "" mre = re.compile(regexp) - - if insertafter is not None and insertafter not in ('BOF', 'EOF'): + + if insertafter not in (None, 'BOF', 'EOF'): insre = re.compile(insertafter) - elif insertbefore is not None and insertbefore not in ('BOF'): + elif insertbefore not in (None, 'BOF'): insre = re.compile(insertbefore) else: insre = None @@ -154,12 +156,12 @@ def present(module, dest, regexp, line, insertafter, insertbefore, create, backu # index[1] is the line num where insertafter/inserbefore has been found index = [-1, -1] m = None - for lineno in range(0, len(lines)): - match_found = mre.search(lines[lineno]) + for lineno, cur_line in enumerate(lines): + match_found = mre.search(cur_line) if match_found: index[0] = lineno m = match_found - elif insre is not None and insre.search(lines[lineno]): + elif insre is not None and insre.search(cur_line): if insertafter: # + 1 for the next line index[1] = lineno + 1 @@ -167,15 +169,24 @@ def present(module, dest, regexp, line, insertafter, insertbefore, create, backu # + 1 for the previous line index[1] = lineno + msg = '' + changed = False # Regexp matched a line in the file if index[0] != -1: - if lines[index[0]] == m.expand(line) + os.linesep: - msg = '' - changed = False + if backrefs: + new_line = m.expand(line) else: - lines[index[0]] = m.expand(line) + os.linesep + # Don't do backref expansion if not asked. + new_line = line + + if lines[index[0]] != new_line + os.linesep: + lines[index[0]] = new_line + os.linesep msg = 'line replaced' changed = True + elif backrefs: + # Do absolutely nothing, since it's not safe generating the line + # without the regexp matching to populate the backrefs. + pass # Add it to the beginning of the file elif insertbefore == 'BOF' or insertafter == 'BOF': lines.insert(0, line + os.linesep) @@ -188,11 +199,10 @@ def present(module, dest, regexp, line, insertafter, insertbefore, create, backu lines.append(line + os.linesep) msg = 'line added' changed = True - # Do nothing if regexp didn't match + # Do nothing if insert* didn't match elif index[1] == -1: - msg = '' - changed = False - # insertafter/insertbefore= matched + pass + # insert* matched, but not the regexp else: lines.insert(index[1], line + os.linesep) msg = 'line added' @@ -205,9 +215,10 @@ def present(module, dest, regexp, line, insertafter, insertbefore, create, backu f.writelines(lines) f.close() - [ msg, changed ] = check_file_attrs(module, changed, msg) + msg, changed = check_file_attrs(module, changed, msg) module.exit_json(changed=changed, msg=msg) + def absent(module, dest, regexp, backup): if os.path.isdir(dest): @@ -242,36 +253,46 @@ def absent(module, dest, regexp, backup): if changed: msg = "%s line(s) removed" % len(found) - [ msg, changed ] = check_file_attrs(module, changed, msg) + msg, changed = check_file_attrs(module, changed, msg) module.exit_json(changed=changed, found=len(found), msg=msg) + def main(): module = AnsibleModule( - argument_spec = dict( + argument_spec=dict( dest=dict(required=True, aliases=['name', 'destfile']), state=dict(default='present', choices=['absent', 'present']), regexp=dict(required=True), line=dict(aliases=['value']), insertafter=dict(default=None), insertbefore=dict(default=None), + backrefs=dict(default=False, type='bool'), create=dict(default=False, type='bool'), backup=dict(default=False, type='bool'), ), - mutually_exclusive = [['insertbefore', 'insertafter']], - add_file_common_args = True, - supports_check_mode = True + mutually_exclusive=[['insertbefore', 'insertafter']], + add_file_common_args=True, + supports_check_mode=True ) params = module.params create = module.params['create'] backup = module.params['backup'] - dest = os.path.expanduser(params['dest']) + backrefs = module.params['backrefs'] + dest = os.path.expanduser(params['dest']) if params['state'] == 'present': if 'line' not in params: module.fail_json(msg='line= is required with state=present') + + # Deal with the insertafter default value manually, to avoid errors + # because of the mutually_exclusive mechanism. + ins_bef, ins_aft = params['insertbefore'], params['insertafter'] + if ins_bef is None and ins_aft is None: + ins_aft = 'EOF' + present(module, dest, params['regexp'], params['line'], - params['insertafter'], params['insertbefore'], create, backup) + ins_aft, ins_bef, create, backup, backrefs) else: absent(module, dest, params['regexp'], backup) @@ -279,4 +300,3 @@ def main(): #<> main() - diff --git a/test/TestRunner.py b/test/TestRunner.py index 65c780b2787..630f98922e7 100644 --- a/test/TestRunner.py +++ b/test/TestRunner.py @@ -10,10 +10,10 @@ import os import shutil import time import tempfile -import urllib2 from nose.plugins.skip import SkipTest + def get_binary(name): for directory in os.environ["PATH"].split(os.pathsep): path = os.path.join(directory, name) @@ -21,6 +21,7 @@ def get_binary(name): return path return None + class TestRunner(unittest.TestCase): def setUp(self): @@ -110,70 +111,70 @@ class TestRunner(unittest.TestCase): data_out = file(output).read() assert data_in == data_out assert 'failed' not in result - assert result['changed'] == True + assert result['changed'] is True assert 'md5sum' in result result = self._run('copy', [ "src=%s" % input_, "dest=%s" % output, ]) - assert result['changed'] == False + assert result['changed'] is False def test_command(self): # test command module, change trigger, etc - result = self._run('command', [ "/bin/echo", "hi" ]) + result = self._run('command', ["/bin/echo", "hi"]) assert "failed" not in result assert "msg" not in result assert result['rc'] == 0 assert result['stdout'] == 'hi' assert result['stderr'] == '' - result = self._run('command', [ "false" ]) + result = self._run('command', ["false"]) assert result['rc'] == 1 assert 'failed' not in result - result = self._run('command', [ "/usr/bin/this_does_not_exist", "splat" ]) + result = self._run('command', ["/usr/bin/this_does_not_exist", "splat"]) assert 'msg' in result assert 'failed' in result - result = self._run('shell', [ "/bin/echo", "$HOME" ]) + result = self._run('shell', ["/bin/echo", "$HOME"]) assert 'failed' not in result assert result['rc'] == 0 - result = self._run('command', [ "creates='/tmp/ansible command test'", "chdir=/tmp", "touch", "'ansible command test'" ]) + result = self._run('command', ["creates='/tmp/ansible command test'", "chdir=/tmp", "touch", "'ansible command test'"]) assert 'changed' in result assert result['rc'] == 0 - result = self._run('command', [ "creates='/tmp/ansible command test'", "false" ]) + result = self._run('command', ["creates='/tmp/ansible command test'", "false"]) assert 'skipped' in result - result = self._run('shell', [ "removes=/tmp/ansible\\ command\\ test", "chdir=/tmp", "rm -f 'ansible command test'; echo $?" ]) + result = self._run('shell', ["removes=/tmp/ansible\\ command\\ test", "chdir=/tmp", "rm -f 'ansible command test'; echo $?"]) assert 'changed' in result assert result['rc'] == 0 assert result['stdout'] == '0' - result = self._run('shell', [ "removes=/tmp/ansible\\ command\\ test", "false" ]) + result = self._run('shell', ["removes=/tmp/ansible\\ command\\ test", "false"]) assert 'skipped' in result def test_git(self): - self._run('file',['path=/tmp/gitdemo','state=absent']) - self._run('file',['path=/tmp/gd','state=absent']) - self._run('command',['git init gitdemo', 'chdir=/tmp']) - self._run('command',['touch a', 'chdir=/tmp/gitdemo']) - self._run('command',['git add *', 'chdir=/tmp/gitdemo']) - self._run('command',['git commit -m "test commit 2"', 'chdir=/tmp/gitdemo']) - self._run('command',['touch b', 'chdir=/tmp/gitdemo']) - self._run('command',['git add *', 'chdir=/tmp/gitdemo']) - self._run('command',['git commit -m "test commit 2"', 'chdir=/tmp/gitdemo']) - result = self._run('git', [ "repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd" ]) + self._run('file', ['path=/tmp/gitdemo', 'state=absent']) + self._run('file', ['path=/tmp/gd', 'state=absent']) + self._run('command', ['git init gitdemo', 'chdir=/tmp']) + self._run('command', ['touch a', 'chdir=/tmp/gitdemo']) + self._run('command', ['git add *', 'chdir=/tmp/gitdemo']) + self._run('command', ['git commit -m "test commit 2"', 'chdir=/tmp/gitdemo']) + self._run('command', ['touch b', 'chdir=/tmp/gitdemo']) + self._run('command', ['git add *', 'chdir=/tmp/gitdemo']) + self._run('command', ['git commit -m "test commit 2"', 'chdir=/tmp/gitdemo']) + result = self._run('git', ["repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd"]) assert result['changed'] # test the force option not set - self._run('file',['path=/tmp/gd/a','state=absent']) - result = self._run('git', [ "repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd", "force=no" ]) + self._run('file', ['path=/tmp/gd/a', 'state=absent']) + result = self._run('git', ["repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd", "force=no"]) assert result['failed'] # test the force option when set - result = self._run('git', [ "repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd", "force=yes" ]) + result = self._run('git', ["repo=\"file:///tmp/gitdemo\"", "dest=/tmp/gd", "force=yes"]) assert result['changed'] - + def test_file(self): filedemo = tempfile.mkstemp()[1] assert self._run('file', ['dest=' + filedemo, 'state=directory'])['failed'] @@ -191,7 +192,6 @@ class TestRunner(unittest.TestCase): assert not os.path.exists(filedemo) assert not self._run('file', ['dest=' + filedemo, 'state=absent'])['changed'] - filedemo = tempfile.mkdtemp() assert self._run('file', ['dest=' + filedemo, 'state=file'])['failed'] assert os.path.isdir(filedemo) @@ -209,7 +209,6 @@ class TestRunner(unittest.TestCase): assert not os.path.exists(filedemo) assert not self._run('file', ['dest=' + filedemo, 'state=absent'])['changed'] - tmp_dir = tempfile.mkdtemp() filedemo = os.path.join(tmp_dir, 'link') os.symlink('/dev/zero', filedemo) @@ -234,7 +233,7 @@ class TestRunner(unittest.TestCase): if not os.path.exists(large_path): raise SkipTest # Ensure reading a large amount of output from a command doesn't hang. - result = self._run('command', [ "/bin/cat", large_path ]) + result = self._run('command', ["/bin/cat", large_path]) assert "failed" not in result assert "msg" not in result assert result['rc'] == 0 @@ -244,14 +243,14 @@ class TestRunner(unittest.TestCase): def test_async(self): # test async launch and job status # of any particular module - result = self._run('command', [ get_binary("sleep"), "3" ], background=20) + result = self._run('command', [get_binary("sleep"), "3"], background=20) assert 'ansible_job_id' in result assert 'started' in result jid = result['ansible_job_id'] # no real chance of this op taking a while, but whatever time.sleep(5) # CLI will abstract this (when polling), but this is how it works internally - result = self._run('async_status', [ "jid=%s" % jid ]) + result = self._run('async_status', ["jid=%s" % jid]) # TODO: would be nice to have tests for supervisory process # killing job after X seconds assert 'finished' in result @@ -263,7 +262,7 @@ class TestRunner(unittest.TestCase): def test_fetch(self): input_ = self._get_test_file('sample.j2') output = os.path.join(self.stage_dir, 'localhost', input_) - result = self._run('fetch', [ "src=%s" % input_, "dest=%s" % self.stage_dir ]) + self._run('fetch', ["src=%s" % input_, "dest=%s" % self.stage_dir]) assert os.path.exists(output) assert open(input_).read() == open(output).read() @@ -280,7 +279,7 @@ class TestRunner(unittest.TestCase): assert out.find("first") != -1 assert out.find("second") != -1 assert out.find("third") != -1 - assert result['changed'] == True + assert result['changed'] is True assert 'md5sum' in result assert 'failed' not in result result = self._run('assemble', [ @@ -288,13 +287,14 @@ class TestRunner(unittest.TestCase): "dest=%s" % output, ]) print result - assert result['changed'] == False + assert result['changed'] is False def test_lineinfile(self): + """Unit tests for the lineinfile module, without backref features.""" sampleroot = 'rocannon' sample_origin = self._get_test_file(sampleroot + '.txt') sample = self._get_stage_file(sampleroot + '.out' + '.txt') - shutil.copy( sample_origin, sample) + shutil.copy(sample_origin, sample) # The order of the test cases is important # defaults to insertafter at the end of the file @@ -303,19 +303,19 @@ class TestRunner(unittest.TestCase): "dest=%s" % sample, "regexp='^First: '", "line='%s'" % testline - ]) + ]) result = self._run(*testcase) - assert result['changed'] == True + assert result['changed'] assert result['msg'] == 'line added' - artifact = [ x.strip() for x in open(sample).readlines() ] + artifact = [x.strip() for x in open(sample)] assert artifact[-1] == testline assert artifact.count(testline) == 1 # run a second time, verify only one line has been added result = self._run(*testcase) - assert result['changed'] == False + assert not result['changed'] assert result['msg'] == '' - artifact = [ x.strip() for x in open(sample).readlines() ] + artifact = [x.strip() for x in open(sample)] assert artifact.count(testline) == 1 # insertafter with EOF @@ -325,48 +325,48 @@ class TestRunner(unittest.TestCase): "insertafter=EOF", "regexp='^Second: '", "line='%s'" % testline - ]) + ]) result = self._run(*testcase) - assert result['changed'] == True + assert result['changed'] assert result['msg'] == 'line added' - artifact = [ x.strip() for x in open(sample).readlines() ] + artifact = [x.strip() for x in open(sample)] assert artifact[-1] == testline assert artifact.count(testline) == 1 # with invalid insertafter regex + # If the regexp doesn't match and the insertafter doesn't match, + # do nothing. testline = 'Third: Line added with an invalid insertafter regex' testcase = ('lineinfile', [ "dest=%s" % sample, "insertafter='^abcdefgh'", "regexp='^Third: '", "line='%s'" % testline - ]) + ]) result = self._run(*testcase) - assert result['changed'] == True - assert result['msg'] == 'line added' - artifact = [ x.strip() for x in open(sample).readlines() ] - assert artifact[-1] == testline - assert artifact.count(testline) == 1 + assert not result['changed'] # with an insertafter regex + # The regexp doesn't match, but the insertafter is specified and does, + # so insert after insertafter. testline = 'Fourth: Line added with a valid insertafter regex' testcase = ('lineinfile', [ "dest=%s" % sample, "insertafter='^receive messages to '", "regexp='^Fourth: '", "line='%s'" % testline - ]) + ]) result = self._run(*testcase) - assert result['changed'] == True + assert result['changed'] assert result['msg'] == 'line added' - artifact = [ x.strip() for x in open(sample).readlines() ] + artifact = [x.strip() for x in open(sample)] assert artifact.count(testline) == 1 idx = artifact.index('receive messages to and from a corresponding device over any distance') assert artifact[idx + 1] == testline # replacement of a line from a regex # we replace the line, so we need to get its idx before the run - artifact = [ x.strip() for x in open(sample).readlines() ] + artifact = [x.strip() for x in open(sample)] target_line = 'combination of microphone, speaker, keyboard and display. It can send and' idx = artifact.index(target_line) @@ -375,18 +375,18 @@ class TestRunner(unittest.TestCase): "dest=%s" % sample, "regexp='combination of microphone'", "line='%s'" % testline - ]) + ]) result = self._run(*testcase) - assert result['changed'] == True + assert result['changed'] assert result['msg'] == 'line replaced' - artifact = [ x.strip() for x in open(sample).readlines() ] + artifact = [x.strip() for x in open(sample)] assert artifact.count(testline) == 1 assert artifact.index(testline) == idx assert target_line not in artifact # removal of a line # we replace the line, so we need to get its idx before the run - artifact = [ x.strip() for x in open(sample).readlines() ] + artifact = [x.strip() for x in open(sample)] target_line = 'receive messages to and from a corresponding device over any distance' idx = artifact.index(target_line) @@ -394,13 +394,12 @@ class TestRunner(unittest.TestCase): "dest=%s" % sample, "regexp='^receive messages to and from '", "state=absent" - ]) + ]) result = self._run(*testcase) - assert result['changed'] == True - artifact = [ x.strip() for x in open(sample).readlines() ] + assert result['changed'] + artifact = [x.strip() for x in open(sample)] assert target_line not in artifact - # with both insertafter and insertbefore (should fail) testline = 'Seventh: this line should not be there' testcase = ('lineinfile', [ @@ -409,9 +408,9 @@ class TestRunner(unittest.TestCase): "insertbefore='BOF'", "regexp='^communication. '", "line='%s'" % testline - ]) + ]) result = self._run(*testcase) - assert result['failed'] == True + assert result['failed'] # insertbefore with BOF testline = 'Eighth: insertbefore BOF' @@ -420,11 +419,11 @@ class TestRunner(unittest.TestCase): "insertbefore=BOF", "regexp='^Eighth: '", "line='%s'" % testline - ]) + ]) result = self._run(*testcase) - assert result['changed'] == True + assert result['changed'] assert result['msg'] == 'line added' - artifact = [ x.strip() for x in open(sample).readlines() ] + artifact = [x.strip() for x in open(sample)] assert artifact.count(testline) == 1 assert artifact[0] == testline @@ -435,11 +434,11 @@ class TestRunner(unittest.TestCase): "insertbefore='^communication. Typically '", "regexp='^Ninth: '", "line='%s'" % testline - ]) + ]) result = self._run(*testcase) - assert result['changed'] == True + assert result['changed'] assert result['msg'] == 'line added' - artifact = [ x.strip() for x in open(sample).readlines() ] + artifact = [x.strip() for x in open(sample)] assert artifact.count(testline) == 1 idx = artifact.index('communication. Typically it is depicted as a lunch-box sized object with some') assert artifact[idx - 1] == testline @@ -447,4 +446,106 @@ class TestRunner(unittest.TestCase): # cleanup os.unlink(sample) + def test_lineinfile_backrefs(self): + """Unit tests for the lineinfile module, with backref features.""" + sampleroot = 'rocannon' + sample_origin = self._get_test_file(sampleroot + '.txt') + origin_lines = [line.strip() for line in open(sample_origin)] + sample = self._get_stage_file(sampleroot + '.out' + '.txt') + shutil.copy(sample_origin, sample) + # The order of the test cases is important + + # The regexp doesn't match, so the line will not be added anywhere. + testline = '\\1: Line added by default at the end of the file.' + testcase = ('lineinfile', [ + "dest=%s" % sample, + "regexp='^(First): '", + "line='%s'" % testline, + "backrefs=yes", + ]) + result = self._run(*testcase) + assert not result['changed'] + assert result['msg'] == '' + artifact = [x.strip() for x in open(sample)] + assert artifact == origin_lines + + # insertafter with EOF + # The regexp doesn't match, so the line will not be added anywhere. + testline = '\\1: Line added with insertafter=EOF' + testcase = ('lineinfile', [ + "dest=%s" % sample, + "insertafter=EOF", + "regexp='^(Second): '", + "line='%s'" % testline, + "backrefs=yes", + ]) + result = self._run(*testcase) + assert not result['changed'] + assert result['msg'] == '' + artifact = [x.strip() for x in open(sample)] + assert artifact == origin_lines + + # with invalid insertafter regex + # The regexp doesn't match, so do nothing. + testline = '\\1: Line added with an invalid insertafter regex' + testcase = ('lineinfile', [ + "dest=%s" % sample, + "insertafter='^abcdefgh'", + "regexp='^(Third): '", + "line='%s'" % testline, + "backrefs=yes", + ]) + result = self._run(*testcase) + assert not result['changed'] + assert artifact == origin_lines + + # with an insertafter regex + # The regexp doesn't match, so do nothing. + testline = '\\1: Line added with a valid insertafter regex' + testcase = ('lineinfile', [ + "dest=%s" % sample, + "insertafter='^receive messages to '", + "regexp='^(Fourth): '", + "line='%s'" % testline, + "backrefs=yes", + ]) + result = self._run(*testcase) + assert not result['changed'] + assert result['msg'] == '' + assert artifact == origin_lines + + # replacement of a line from a regex + # we replace the line, so we need to get its idx before the run + artifact = [x.strip() for x in open(sample)] + target_line = 'combination of microphone, speaker, keyboard and display. It can send and' + idx = artifact.index(target_line) + testline = '\\\\1 of microphone' + testline_after = 'combination of microphone' + testcase = ('lineinfile', [ + "dest=%s" % sample, + "regexp='(combination) of microphone'", + "line='%s'" % testline, + "backrefs=yes", + ]) + result = self._run(*testcase) + assert result['changed'] + assert result['msg'] == 'line replaced' + artifact = [x.strip() for x in open(sample)] + assert artifact.count(testline_after) == 1 + assert artifact.index(testline_after) == idx + assert target_line not in artifact + + # with both insertafter and insertbefore (should fail) + testline = 'Seventh: this line should not be there' + testcase = ('lineinfile', [ + "dest=%s" % sample, + "insertafter='BOF'", + "insertbefore='BOF'", + "regexp='^communication. '", + "line='%s'" % testline + ]) + result = self._run(*testcase) + assert result['failed'] + + os.unlink(sample)