Properly quote all needed components of shell commands (#83365)

* Properly quote all needed components of shell commands

* Use self.quote, add new self.join
pull/83404/head
Matt Martz 6 months ago committed by GitHub
parent 68638f4710
commit 93b8b86067
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,2 @@
bugfixes:
- shell plugin - properly quote all needed components of shell commands (https://github.com/ansible/ansible/issues/82535)

@ -85,7 +85,7 @@ class ShellBase(AnsiblePlugin):
return 'ansible-tmp-%s-%s-%s' % (time.time(), os.getpid(), random.randint(0, 2**48)) return 'ansible-tmp-%s-%s-%s' % (time.time(), os.getpid(), random.randint(0, 2**48))
def env_prefix(self, **kwargs): def env_prefix(self, **kwargs):
return ' '.join(['%s=%s' % (k, shlex.quote(text_type(v))) for k, v in kwargs.items()]) return ' '.join(['%s=%s' % (k, self.quote(text_type(v))) for k, v in kwargs.items()])
def join_path(self, *args): def join_path(self, *args):
return os.path.join(*args) return os.path.join(*args)
@ -101,41 +101,33 @@ class ShellBase(AnsiblePlugin):
def chmod(self, paths, mode): def chmod(self, paths, mode):
cmd = ['chmod', mode] cmd = ['chmod', mode]
cmd.extend(paths) cmd.extend(paths)
cmd = [shlex.quote(c) for c in cmd] return self.join(cmd)
return ' '.join(cmd)
def chown(self, paths, user): def chown(self, paths, user):
cmd = ['chown', user] cmd = ['chown', user]
cmd.extend(paths) cmd.extend(paths)
cmd = [shlex.quote(c) for c in cmd] return self.join(cmd)
return ' '.join(cmd)
def chgrp(self, paths, group): def chgrp(self, paths, group):
cmd = ['chgrp', group] cmd = ['chgrp', group]
cmd.extend(paths) cmd.extend(paths)
cmd = [shlex.quote(c) for c in cmd] return self.join(cmd)
return ' '.join(cmd)
def set_user_facl(self, paths, user, mode): def set_user_facl(self, paths, user, mode):
"""Only sets acls for users as that's really all we need""" """Only sets acls for users as that's really all we need"""
cmd = ['setfacl', '-m', 'u:%s:%s' % (user, mode)] cmd = ['setfacl', '-m', 'u:%s:%s' % (user, mode)]
cmd.extend(paths) cmd.extend(paths)
cmd = [shlex.quote(c) for c in cmd] return self.join(cmd)
return ' '.join(cmd)
def remove(self, path, recurse=False): def remove(self, path, recurse=False):
path = shlex.quote(path) path = self.quote(path)
cmd = 'rm -f ' cmd = 'rm -f '
if recurse: if recurse:
cmd += '-r ' cmd += '-r '
return cmd + "%s %s" % (path, self._SHELL_REDIRECT_ALLNULL) return cmd + "%s %s" % (path, self._SHELL_REDIRECT_ALLNULL)
def exists(self, path): def exists(self, path):
cmd = ['test', '-e', shlex.quote(path)] cmd = ['test', '-e', self.quote(path)]
return ' '.join(cmd) return ' '.join(cmd)
def mkdtemp(self, basefile=None, system=False, mode=0o700, tmpdir=None): def mkdtemp(self, basefile=None, system=False, mode=0o700, tmpdir=None):
@ -194,8 +186,7 @@ class ShellBase(AnsiblePlugin):
# Check that the user_path to expand is safe # Check that the user_path to expand is safe
if user_home_path != '~': if user_home_path != '~':
if not _USER_HOME_PATH_RE.match(user_home_path): if not _USER_HOME_PATH_RE.match(user_home_path):
# shlex.quote will make the shell return the string verbatim user_home_path = self.quote(user_home_path)
user_home_path = shlex.quote(user_home_path)
elif username: elif username:
# if present the user name is appended to resolve "that user's home" # if present the user name is appended to resolve "that user's home"
user_home_path += username user_home_path += username
@ -207,20 +198,20 @@ class ShellBase(AnsiblePlugin):
return 'echo %spwd%s' % (self._SHELL_SUB_LEFT, self._SHELL_SUB_RIGHT) return 'echo %spwd%s' % (self._SHELL_SUB_LEFT, self._SHELL_SUB_RIGHT)
def build_module_command(self, env_string, shebang, cmd, arg_path=None): def build_module_command(self, env_string, shebang, cmd, arg_path=None):
# don't quote the cmd if it's an empty string, because this will break pipelining mode env_string = env_string.strip()
if cmd.strip() != '': if env_string:
cmd = shlex.quote(cmd) env_string += ' '
cmd_parts = [] if shebang is None:
if shebang: shebang = ''
shebang = shebang.replace("#!", "").strip()
else: cmd_parts = [
shebang = "" shebang.removeprefix('#!').strip(),
cmd_parts.extend([env_string.strip(), shebang, cmd]) cmd.strip(),
if arg_path is not None: arg_path,
cmd_parts.append(arg_path) ]
new_cmd = " ".join(cmd_parts)
return new_cmd return f'{env_string}%s' % self.join(cps for cp in cmd_parts if cp and (cps := cp.strip()))
def append_command(self, cmd, cmd_to_append): def append_command(self, cmd, cmd_to_append):
"""Append an additional command if supported by the shell""" """Append an additional command if supported by the shell"""
@ -237,3 +228,7 @@ class ShellBase(AnsiblePlugin):
def quote(self, cmd): def quote(self, cmd):
"""Returns a shell-escaped string that can be safely used as one token in a shell command line""" """Returns a shell-escaped string that can be safely used as one token in a shell command line"""
return shlex.quote(cmd) return shlex.quote(cmd)
def join(self, cmd_parts):
"""Returns a shell-escaped string from a list that can be safely used in a shell command line"""
return shlex.join(cmd_parts)

@ -13,8 +13,6 @@ extends_documentation_fragment:
- shell_common - shell_common
''' '''
import shlex
from ansible.plugins.shell import ShellBase from ansible.plugins.shell import ShellBase
@ -66,7 +64,7 @@ class ShellModule(ShellBase):
# Quoting gets complex here. We're writing a python string that's # Quoting gets complex here. We're writing a python string that's
# used by a variety of shells on the remote host to invoke a python # used by a variety of shells on the remote host to invoke a python
# "one-liner". # "one-liner".
shell_escaped_path = shlex.quote(path) shell_escaped_path = self.quote(path)
test = "rc=flag; [ -r %(p)s ] %(shell_or)s rc=2; [ -f %(p)s ] %(shell_or)s rc=1; [ -d %(p)s ] %(shell_and)s rc=3; %(i)s -V 2>/dev/null %(shell_or)s rc=4; [ x\"$rc\" != \"xflag\" ] %(shell_and)s echo \"${rc} \"%(p)s %(shell_and)s exit 0" % dict(p=shell_escaped_path, i=python_interp, shell_and=self._SHELL_AND, shell_or=self._SHELL_OR) # NOQA test = "rc=flag; [ -r %(p)s ] %(shell_or)s rc=2; [ -f %(p)s ] %(shell_or)s rc=1; [ -d %(p)s ] %(shell_and)s rc=3; %(i)s -V 2>/dev/null %(shell_or)s rc=4; [ x\"$rc\" != \"xflag\" ] %(shell_and)s echo \"${rc} \"%(p)s %(shell_and)s exit 0" % dict(p=shell_escaped_path, i=python_interp, shell_and=self._SHELL_AND, shell_or=self._SHELL_OR) # NOQA
csums = [ csums = [
u"({0} -c 'import hashlib; BLOCKSIZE = 65536; hasher = hashlib.sha1();{2}afile = open(\"'{1}'\", \"rb\"){2}buf = afile.read(BLOCKSIZE){2}while len(buf) > 0:{2}\thasher.update(buf){2}\tbuf = afile.read(BLOCKSIZE){2}afile.close(){2}print(hasher.hexdigest())' 2>/dev/null)".format(python_interp, shell_escaped_path, self._SHELL_EMBEDDED_PY_EOL), # NOQA Python > 2.4 (including python3) u"({0} -c 'import hashlib; BLOCKSIZE = 65536; hasher = hashlib.sha1();{2}afile = open(\"'{1}'\", \"rb\"){2}buf = afile.read(BLOCKSIZE){2}while len(buf) > 0:{2}\thasher.update(buf){2}\tbuf = afile.read(BLOCKSIZE){2}afile.close(){2}print(hasher.hexdigest())' 2>/dev/null)".format(python_interp, shell_escaped_path, self._SHELL_EMBEDDED_PY_EOL), # NOQA Python > 2.4 (including python3)

@ -1,4 +1,4 @@
#!/usr/bin/env bash #!/bin/bash
set -eu set -eu

@ -2,10 +2,17 @@
hosts: localhost hosts: localhost
gather_facts: false gather_facts: false
tasks: tasks:
- name: where is bash
shell: command -v bash
register: bash
changed_when: false
- name: call module that spews extra stuff - name: call module that spews extra stuff
bad_json: bad_json:
register: clean_json register: clean_json
ignore_errors: true ignore_errors: true
vars:
ansible_bash_interpreter: '{{ bash.stdout }}'
- name: all expected is there - name: all expected is there
assert: assert:

@ -0,0 +1,3 @@
dependencies:
- prepare_tests
- setup_remote_tmp_dir

@ -0,0 +1,53 @@
- vars:
atd: '{{ remote_tmp_dir }}/shell/t m p'
api: '{{ remote_tmp_dir }}/shell/p y t h o n'
block:
- name: create test dir
file:
path: '{{ atd|dirname }}'
state: directory
- name: create tempdir with spaces
file:
path: '{{ atd }}'
state: directory
- name: create symlink for ansible_python_interpreter to file with spaces
file:
dest: '{{ api }}'
src: '{{ ansible_facts.python.executable }}'
state: link
- name: run simple test playbook
command: >-
ansible-playbook -vvv -i inventory
-e 'ansible_python_interpreter="{{ api }}"'
-e 'ansible_pipelining=0'
"{{ role_path }}/test-command-building-playbook.yml"
environment:
ANSIBLE_REMOTE_TMP: '{{ atd }}'
ANSIBLE_NOCOLOR: "1"
ANSIBLE_FORCE_COLOR: "0"
register: command_building
delegate_to: localhost
- debug:
var: command_building.stdout_lines
- block:
- debug:
var: py_cmd
- debug:
var: tmp_dir
- assert:
that:
- py_cmd in exec_line
- tmp_dir in exec_line
vars:
exec_line: '{{ command_building.stdout_lines | select("search", "EXEC.*p y t h o n") | first }}'
py_cmd: >-
'"'"'{{ api }}'"'"'
tmp_dir: >-
'"'"'{{ atd }}/ansible-tmp-

@ -34,3 +34,5 @@
with_items: with_items:
- powershell - powershell
- sh - sh
- import_tasks: command-building.yml

@ -0,0 +1,4 @@
- hosts: testhost
gather_facts: false
tasks:
- ping:
Loading…
Cancel
Save