Merge branch 'ansible:devel' into feature/ansible-test-proxy-vars

pull/84176/head
Chris Tran 7 months ago committed by GitHub
commit 7fe4933667
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -78,36 +78,44 @@ stages:
test: 2022/psrp/http
- name: 2022 SSH Key
test: 2022/ssh/key
- name: 2025 PSRP HTTP
test: 2025/psrp/http
- name: 2025 SSH Key
test: 2025/ssh/key
- stage: Remote
dependsOn: []
jobs:
- template: templates/matrix.yml # context/target
parameters:
targets:
- name: macOS 14.3
test: macos/14.3
- name: RHEL 9.4 py39
test: rhel/9.4@3.9
- name: RHEL 9.4 py312
test: rhel/9.4@3.12
- name: FreeBSD 13.3
test: freebsd/13.3
- name: FreeBSD 14.1
test: freebsd/14.1
- name: macOS 15.3
test: macos/15.3
- name: RHEL 9.5 py39
test: rhel/9.5@3.9
- name: RHEL 9.5 py312
test: rhel/9.5@3.12
- name: RHEL 10.0
test: rhel/10.0
- name: FreeBSD 13.5
test: freebsd/13.5
- name: FreeBSD 14.2
test: freebsd/14.2
groups:
- 1
- 2
- template: templates/matrix.yml # context/controller
parameters:
targets:
- name: macOS 14.3
test: macos/14.3
- name: RHEL 9.4
test: rhel/9.4
- name: FreeBSD 13.3
test: freebsd/13.3
- name: FreeBSD 14.1
test: freebsd/14.1
- name: macOS 15.3
test: macos/15.3
- name: RHEL 9.5
test: rhel/9.5
- name: RHEL 10.0
test: rhel/10.0
- name: FreeBSD 13.5
test: freebsd/13.5
- name: FreeBSD 14.2
test: freebsd/14.2
groups:
- 3
- 4
@ -115,12 +123,14 @@ stages:
- template: templates/matrix.yml # context/controller (ansible-test container management)
parameters:
targets:
- name: Alpine 3.20
test: alpine/3.20
- name: Fedora 40
test: fedora/40
- name: RHEL 9.4
test: rhel/9.4
- name: Alpine 3.21
test: alpine/3.21
- name: Fedora 41
test: fedora/41
- name: RHEL 9.5
test: rhel/9.5
- name: RHEL 10.0
test: rhel/10.0
- name: Ubuntu 24.04
test: ubuntu/24.04
groups:
@ -132,10 +142,10 @@ stages:
parameters:
testFormat: linux/{0}
targets:
- name: Alpine 3.20
test: alpine320
- name: Fedora 40
test: fedora40
- name: Alpine 3.21
test: alpine321
- name: Fedora 41
test: fedora41
- name: Ubuntu 22.04
test: ubuntu2204
- name: Ubuntu 24.04
@ -147,16 +157,24 @@ stages:
parameters:
testFormat: linux/{0}
targets:
- name: Alpine 3.20
test: alpine320
- name: Fedora 40
test: fedora40
- name: Alpine 3.21
test: alpine321
- name: Fedora 41
test: fedora41
- name: Ubuntu 24.04
test: ubuntu2404
groups:
- 3
- 4
- 5
- template: templates/matrix.yml # context/target (dnf-oldest, dnf-latest)
parameters:
testFormat: linux/{0}
targets:
- name: Fedora 41
test: fedora41
groups:
- 7
- stage: Galaxy
dependsOn: []
jobs:
@ -198,15 +216,10 @@ stages:
test: 2022/psrp/http
- name: 2022 SSH Key
test: 2022/ssh/key
- stage: Incidental
dependsOn: []
jobs:
- template: templates/matrix.yml
parameters:
testFormat: i/{0}/1
targets:
- name: IOS Python
test: ios/csr1000v/
- name: 2025 PSRP HTTP
test: 2025/psrp/http
- name: 2025 SSH Key
test: 2025/ssh/key
- stage: Summary
condition: succeededOrFailed()
dependsOn:
@ -218,6 +231,5 @@ stages:
- Galaxy
- Generic
- Incidental_Windows
- Incidental
jobs:
- template: templates/coverage.yml

@ -1,40 +0,0 @@
#!/usr/bin/env bash
# Run incidental network-integration tests for a remote platform.
# Invoked with a single argument of the form "platform/version[/python_version]",
# e.g. "ios/csr1000v/" — fields are split on '/' or ':' below.
set -o pipefail -eux
declare -a args
# Split the positional argument into platform / version / python_version.
IFS='/:' read -ra args <<< "$1"
platform="${args[0]}"
version="${args[1]}"
python_version="${args[2]}"
# Test target path understood by ansible-test.
target="shippable/${platform}/incidental/"
# Remote provisioning stage/provider, overridable via the S and P env vars.
stage="${S:-prod}"
provider="${P:-default}"
# python versions to test in order
# all versions run full tests
# Query ansible-test itself for the supported controller Python versions.
IFS=' ' read -r -a python_versions <<< \
"$(PYTHONPATH="${PWD}/test/lib" python -c 'from ansible_test._internal import constants; print(" ".join(constants.CONTROLLER_PYTHON_VERSIONS))')"
if [ "${python_version}" ]; then
# limit tests to a single python version
python_versions=("${python_version}")
fi
for python_version in "${python_versions[@]}"; do
# terminate remote instances on the final python version tested
if [ "${python_version}" = "${python_versions[-1]}" ]; then
terminate="always"
else
terminate="never"
fi
# COVERAGE/CHANGED/UNSTABLE are optional extra flags passed through from CI.
# shellcheck disable=SC2086
ansible-test network-integration --color -v --retry-on-error "${target}" ${COVERAGE:+"$COVERAGE"} ${CHANGED:+"$CHANGED"} ${UNSTABLE:+"$UNSTABLE"} \
--platform "${platform}/${version}" \
--docker default --python "${python_version}" \
--remote-terminate "${terminate}" --remote-stage "${stage}" --remote-provider "${provider}"
done

@ -9,11 +9,11 @@ from __future__ import annotations
import argparse
import dataclasses
import pathlib
import shutil
import shlex
import subprocess
import tempfile
import typing as t
import urllib.request
import venv
@dataclasses.dataclass(frozen=True)
@ -43,6 +43,27 @@ def parse_args() -> Args:
return Args(**kwargs)
def run(*args: str | pathlib.Path) -> None:
cmd = [str(arg) for arg in args]
print(f'==> {shlex.join(cmd)}', flush=True)
subprocess.run(cmd, check=True)
def install_codecov(dest: pathlib.Path) -> pathlib.Path:
    """Create a virtualenv under *dest* and install codecov-cli into it.

    Returns the path to the ``codecovcli`` executable inside the new venv.
    """
    venv_root = dest / 'venv'
    venv.create(venv_root, with_pip=True)
    # Pin the uploader version for reproducible CI runs.
    requirement = 'codecov-cli==11.0.3'
    run(venv_root / 'bin' / 'python', '-m', 'pip', 'install', requirement, '--disable-pip-version-check')
    return venv_root / 'bin' / 'codecovcli'
def process_files(directory: pathlib.Path) -> t.Tuple[CoverageFile, ...]:
processed = []
for file in directory.joinpath('reports').glob('coverage*.xml'):
@ -57,45 +78,43 @@ def process_files(directory: pathlib.Path) -> t.Tuple[CoverageFile, ...]:
return tuple(processed)
def upload_files(codecov_bin: pathlib.Path, files: t.Tuple[CoverageFile, ...], dry_run: bool = False) -> None:
def upload_files(codecov_bin: pathlib.Path, config_file: pathlib.Path, files: t.Tuple[CoverageFile, ...], dry_run: bool = False) -> None:
for file in files:
cmd = [
str(codecov_bin),
'--name', file.name,
'--file', str(file.path),
codecov_bin,
'--disable-telem',
'--codecov-yml-path',
config_file,
'upload-process',
'--disable-search',
'--disable-file-fixes',
'--plugin',
'noop',
'--name',
file.name,
'--file',
file.path,
]
for flag in file.flags:
cmd.extend(['--flags', flag])
cmd.extend(['--flag', flag])
if dry_run:
print(f'DRY-RUN: Would run command: {cmd}')
continue
subprocess.run(cmd, check=True)
cmd.append('--dry-run')
def download_file(url: str, dest: pathlib.Path, flags: int, dry_run: bool = False) -> None:
if dry_run:
print(f'DRY-RUN: Would download {url} to {dest} and set mode to {flags:o}')
return
run(*cmd)
with urllib.request.urlopen(url) as resp:
with dest.open('w+b') as f:
# Read data in chunks rather than all at once
shutil.copyfileobj(resp, f, 64 * 1024)
dest.chmod(flags)
def main():
def main() -> None:
args = parse_args()
url = 'https://ci-files.testing.ansible.com/codecov/linux/codecov'
with tempfile.TemporaryDirectory(prefix='codecov-') as tmpdir:
codecov_bin = pathlib.Path(tmpdir) / 'codecov'
download_file(url, codecov_bin, 0o755, args.dry_run)
config_file = pathlib.Path(tmpdir) / 'config.yml'
config_file.write_text('')
codecov_bin = install_codecov(pathlib.Path(tmpdir))
files = process_files(args.path)
upload_files(codecov_bin, files, args.dry_run)
upload_files(codecov_bin, config_file, files, args.dry_run)
if __name__ == '__main__':

@ -12,6 +12,6 @@ if ! ansible-test --help >/dev/null 2>&1; then
pip install https://github.com/ansible/ansible/archive/devel.tar.gz --disable-pip-version-check
fi
# Generate stubs using docker (if supported) otherwise fall back to using a virtual environment instead.
# The use of docker is required when Powershell code is present, but Ansible 2.12 was the first version to support --docker with coverage.
ansible-test coverage xml --group-by command --stub --docker --color -v || ansible-test coverage xml --group-by command --stub --venv --color -v
# Generate stubs using docker.
# The use of docker is mandatory when PowerShell code is present.
ansible-test coverage xml --group-by command --stub --docker --color -v

@ -47,23 +47,7 @@ body:
- type: dropdown
attributes:
label: Issue Type
description: >
Please select the single available option in the drop-down.
<details>
<summary>
<em>Why?</em>
</summary>
We would do it by ourselves but unfortunately, the current
edition of GitHub Issue Forms Alpha does not support this yet 🤷
_We will make it easier in the future, once GitHub
supports dropdown defaults. Promise!_
</details>
# FIXME: Once GitHub allows defining the default choice, update this
description: This is a marker for our automatic bot. Do not change it.
options:
- Bug Report
validations:
@ -152,7 +136,7 @@ body:
attributes:
label: Steps to Reproduce
description: |
Describe exactly how to reproduce the problem, using a minimal test-case. It would *really* help us understand your problem if you could also provide any playbooks, configs and commands you used.
Describe exactly how to reproduce the problem, using a minimal test-case. It would *really* help us understand your problem if you could also provide any playbooks, configs and commands you used.
**HINT:** You can paste https://gist.github.com links for larger files.
value: |

@ -16,10 +16,10 @@ contact_links:
url: https://docs.ansible.com/ansible/devel/community/code_of_conduct.html?utm_medium=github&utm_source=issue_template_chooser
about: ❤ Be nice to other members of the community. ☮ Behave.
- name: 💬 Talk to the community
url: https://docs.ansible.com/ansible/devel/community/communication.html?utm_medium=github&utm_source=issue_template_chooser#mailing-list-information
url: https://docs.ansible.com/ansible/devel/community/communication.html?utm_medium=github&utm_source=issue_template_chooser#forum
about: Please ask and answer usage questions here
- name: ⚡ Working groups
url: https://github.com/ansible/community/wiki
url: https://forum.ansible.com/g?utm_medium=github&utm_source=issue_template_chooser
about: Interested in improving a specific area? Become a part of a working group!
- name: 💼 For Enterprise
url: https://www.ansible.com/products/engine?utm_medium=github&utm_source=issue_template_chooser

@ -84,20 +84,7 @@ body:
- type: dropdown
attributes:
label: Issue Type
description: >
Please select the single available option in the drop-down.
<details>
<summary>
<em>Why?</em>
</summary>
_We will make it easier in the future, once GitHub
supports dropdown defaults. Promise!_
</details>
# FIXME: Once GitHub allows defining the default choice, update this
description: This is a marker for our automatic bot. Do not change it.
options:
- Documentation Report
validations:

@ -101,23 +101,7 @@ body:
- type: dropdown
attributes:
label: Issue Type
description: >
Please select the single available option in the drop-down.
<details>
<summary>
<em>Why?</em>
</summary>
We would do it by ourselves but unfortunately, the current
edition of GitHub Issue Forms Alpha does not support this yet 🤷
_We will make it easier in the future, once GitHub
supports dropdown defaults. Promise!_
</details>
# FIXME: Once GitHub allows defining the default choice, update this
description: This is a marker for our automatic bot. Do not change it.
options:
- Feature Idea
validations:

@ -0,0 +1,10 @@
---
name: Internal Issue
about: Free-form issue creation for core maintainer use only.
title: ''
labels: [core-internal]
assignees: ''
---
@ansibot bot_skip

@ -0,0 +1,42 @@
name: Pre-Release Bug Report
description: File a bug report against a pre-release version
labels:
- bug
- pre_release
assignees:
- nitzmahone
- mattclay
body:
- type: markdown
attributes:
value: |
## Bug Report
- type: textarea
attributes:
label: Ansible Version
description: Paste the full output from `ansible --version` below.
render: console
placeholder: $ ansible --version
validations:
required: true
- type: textarea
attributes:
label: Summary
description: Describe the issue with any relevant steps to reproduce.
validations:
required: true
- type: dropdown
attributes:
label: <!-- Bot instructions (ignore this) -->
options:
- |
<!--
### Component Name
bin/ansible
### Issue Type
Bug Report
### Configuration
### OS / Environment
-->
validations:
required: true

@ -2,19 +2,8 @@
<!--- Describe the change below, including rationale and design decisions -->
<!--- HINT: Include "Fixes #nnn" if you are fixing an existing issue -->
<!--- Add "Fixes #1234" or steps to reproduce the problem if there is no corresponding issue -->
##### ISSUE TYPE
- Bugfix Pull Request
##### ADDITIONAL INFORMATION
<!--- Include additional information to help people understand the change here -->
<!--- A step-by-step reproduction of the problem is helpful if there is no related issue -->
<!--- Paste verbatim command output below, e.g. before and after your change -->
```paste below
```

@ -2,18 +2,8 @@
<!--- Describe the change below, including rationale -->
<!--- HINT: Include "Closes #nnn" if you are fixing an existing issue -->
<!--- Add "Fixes #1234" if there is a corresponding issue -->
##### ISSUE TYPE
- Docs Pull Request
##### ADDITIONAL INFORMATION
<!--- Include additional information to help people understand the change here -->
<!--- Paste verbatim command output below, e.g. before and after your change -->
```paste below
```

@ -2,18 +2,8 @@
<!--- Describe the change below, including rationale and design decisions -->
<!--- HINT: Include "Resolves #nnn" if you are fixing an existing issue -->
<!--- Add "Fixes #1234" if there is a corresponding issue -->
##### ISSUE TYPE
- Feature Pull Request
##### ADDITIONAL INFORMATION
<!--- Include additional information to help people understand the change here -->
<!--- Paste verbatim command output below, e.g. before and after your change -->
```paste below
```

@ -2,19 +2,8 @@
<!--- Describe the change below, including rationale and design decisions -->
<!--- HINT: Include "Closes #nnn" if you are fixing an existing issue -->
<!--- Add "Fixes #1234" if there is a corresponding issue -->
##### ISSUE TYPE
- Test Pull Request
##### ADDITIONAL INFORMATION
<!--- Include additional information to help people understand the change here -->
<!--- A step-by-step reproduction of the problem is helpful if there is no related issue -->
<!--- Paste verbatim command output below, e.g. before and after your change -->
```paste below
```

@ -2,7 +2,7 @@
<!--- Describe the change below, including rationale and design decisions -->
<!--- HINT: Include "Fixes #nnn" if you are fixing an existing issue -->
<!--- Add "Fixes #1234" or steps to reproduce the problem if there is no corresponding issue -->
##### ISSUE TYPE
@ -12,14 +12,3 @@
- Docs Pull Request
- Feature Pull Request
- Test Pull Request
##### ADDITIONAL INFORMATION
<!--- Include additional information to help people understand the change here -->
<!--- A step-by-step reproduction of the problem is helpful if there is no related issue -->
<!--- Paste verbatim command output below, e.g. before and after your change -->
```paste below
```

@ -1,3 +1,4 @@
2.20.0 TBD
2.19.0 What Is and What Should Never Be
2.18.0 Fool in the Rain
2.17.0 Gallows Pole

3
.gitignore vendored

@ -97,6 +97,9 @@ Vagrantfile
# vendored lib dir
lib/ansible/_vendor/*
!lib/ansible/_vendor/__init__.py
# PowerShell signed hashlist
lib/ansible/config/powershell_signatures.psd1
*.authenticode
# test stuff
/test/integration/cloud-config-*.*
!/test/integration/cloud-config-*.*.template

@ -1,3 +0,0 @@
minor_changes:
- copy - parameter ``local_follow`` was incorrectly documented as having default value ``True`` (https://github.com/ansible/ansible/pull/83643).
- copy - fix sanity test failures (https://github.com/ansible/ansible/pull/83643).

@ -1,2 +0,0 @@
bugfixes:
- get_url - fix honoring ``filename`` from the ``content-disposition`` header even when the type is ``inline`` (https://github.com/ansible/ansible/issues/83690)

@ -1,2 +0,0 @@
minor_changes:
- "runtime-metadata sanity test - improve validation of ``action_groups`` (https://github.com/ansible/ansible/pull/83965)."

@ -1,3 +0,0 @@
minor_changes:
- Added a -vvvvv log message indicating when a host fails to produce output within the timeout period.
- SSH Escalation-related -vvv log messages now include the associated host information.

@ -0,0 +1,2 @@
minor_changes:
- stat module - add SELinux context as a return value, and add a new option to trigger this return, which is False by default. (https://github.com/ansible/ansible/issues/85217).

@ -0,0 +1,2 @@
minor_changes:
- ansible-doc - Return a more verbose error message when the ``description`` field is missing.

@ -0,0 +1,2 @@
minor_changes:
- ansible-test - Upgrade to ``coverage`` version 7.9.1 for Python 3.9 and later.

@ -0,0 +1,2 @@
minor_changes:
- ansible-test - Removed support for automatic provisioning of obsolete instances for network-integration tests.

@ -1,2 +0,0 @@
minor_changes:
- ansible-test - Update ``nios-test-container`` to version 6.0.0.

@ -1,3 +0,0 @@
minor_changes:
- ansible-test - Improve container runtime probe error handling.
When unexpected probe output is encountered, an error with more useful debugging information is provided.

@ -1,4 +0,0 @@
bugfixes:
- ansible-test - Enable the ``sys.unraisablehook`` work-around for the ``pylint`` sanity test on Python 3.11.
Previously the work-around was only enabled for Python 3.12 and later.
However, the same issue has been discovered on Python 3.11.

@ -1,5 +0,0 @@
minor_changes:
- ansible-test - Update ``pylint`` sanity test to use version 3.3.1.
- ansible-test - Default to Python 3.13 in the ``base`` and ``default`` containers.
- ansible-test - Disable the ``deprecated-`` prefixed ``pylint`` rules as their results vary by Python version.
- ansible-test - Update the ``base`` and ``default`` containers.

@ -0,0 +1,3 @@
---
bugfixes:
- apt - mark dependencies installed as part of deb file installation as auto (https://github.com/ansible/ansible/issues/78123).

@ -1,3 +0,0 @@
---
minor_changes:
- cron - Provide additional error information while writing cron file (https://github.com/ansible/ansible/issues/83223).

@ -1,3 +0,0 @@
---
bugfixes:
- debconf - set empty password values (https://github.com/ansible/ansible/issues/83214).

@ -0,0 +1,2 @@
removed_features:
- dnf/dnf5 - remove deprecated ``install_repoquery`` option.

@ -1,2 +0,0 @@
bugfixes:
- "dnf5 - fix traceback when ``enable_plugins``/``disable_plugins`` is used on ``python3-libdnf5`` versions that do not support this functionality"

@ -1,3 +0,0 @@
---
minor_changes:
- file - make code more readable and simple.

@ -1,2 +0,0 @@
minor_changes:
- find - add a checksum_algorithm parameter to specify which type of checksum the module will return

@ -1,2 +0,0 @@
bugfixes:
- Fix disabling SSL verification when installing collections and roles from git repositories. If ``--ignore-certs`` isn't provided, the value for the ``GALAXY_IGNORE_CERTS`` configuration option will be used (https://github.com/ansible/ansible/issues/83326).

@ -1,2 +0,0 @@
bugfixes:
- Use the requested error message in the ansible.module_utils.facts.timeout timeout function instead of hardcoding one.

@ -1,2 +0,0 @@
bugfixes:
- Errors now preserve stacked error messages even when YAML is involved.

@ -1,2 +0,0 @@
minor_changes:
- module_utils - Add ``NoReturn`` type annotations to functions which never return.

@ -1,3 +0,0 @@
---
bugfixes:
- facts - skip if distribution file path is directory, instead of raising error (https://github.com/ansible/ansible/issues/84006).

@ -1,2 +0,0 @@
bugfixes:
- "``package``/``dnf`` action plugins - provide the reason behind the failure to gather the ``ansible_pkg_mgr`` fact to identify the package backend"

@ -0,0 +1,2 @@
removed_features:
- paramiko - Removed the ``PARAMIKO_HOST_KEY_AUTO_ADD`` and ``PARAMIKO_LOOK_FOR_KEYS`` configuration keys, which were previously deprecated.

@ -1,2 +0,0 @@
minor_changes:
- INVENTORY_IGNORE_EXTS config, removed ``ini`` from the default list, inventory scripts using a corresponding .ini configuration are rare now and inventory.ini files are more common. Those that need to ignore the ini files for inventory scripts can still add it to configuration.

@ -1,3 +0,0 @@
---
bugfixes:
- selector - remove deprecated compat.selector related files (https://github.com/ansible/ansible/pull/84155).

@ -1,2 +0,0 @@
minor_changes:
- service_facts module got freebsd support added.

@ -1,2 +0,0 @@
bugfixes:
- "Do not run implicit ``flush_handlers`` meta tasks when the whole play is excluded from the run due to tags specified."

@ -1,2 +0,0 @@
bugfixes:
- "Improve performance on large inventories by reducing the number of implicit meta tasks."

@ -1,2 +0,0 @@
minor_changes:
- PlayIterator - do not return tasks from already executed roles so specific strategy plugins do not have to do the filtering of such tasks themselves

@ -1,2 +0,0 @@
bugfixes:
- user module now avoids changing ownership of files symlinked in provided home dir skeleton

@ -1,4 +0,0 @@
bugfixes:
- user action will now require O(force) to overwrite the public part of an ssh key when generating ssh keys, as was already the case for the private part.
security_fixes:
- user action won't allow ssh-keygen, chown and chmod to run on existing ssh public key file, avoiding traversal on existing symlinks (CVE-2024-9902).

@ -0,0 +1,2 @@
removed_features:
- yum_repository - remove deprecated ``keepcache`` option.

@ -35,6 +35,7 @@ class Issue:
body: str
project: str
labels: list[str] | None = None
assignee: str | None = None
def create(self) -> str:
cmd = ['gh', 'issue', 'create', '--title', self.title, '--body', self.body, '--project', self.project]
@ -43,8 +44,18 @@ class Issue:
for label in self.labels:
cmd.extend(('--label', label))
process = subprocess.run(cmd, capture_output=True, check=True)
url = process.stdout.decode().strip()
if self.assignee:
cmd.extend(('--assignee', self.assignee))
try:
process = subprocess.run(cmd, capture_output=True, check=True, text=True)
except subprocess.CalledProcessError as ex:
print('>>> Note')
print(f"You may need to run 'gh auth refresh -s project' if 'gh' reports it cannot find the project {self.project!r} when it exists.")
print(f'>>> Standard Output\n{ex.stdout.strip()}\n>>> Standard Error\n{ex.stderr.strip()}\n>>> Exception')
raise
url = process.stdout.strip()
return url
@ -54,6 +65,7 @@ class Feature:
summary: str
component: str
labels: list[str] | None = None
assignee: str | None = None
@staticmethod
def from_dict(data: dict[str, t.Any]) -> Feature:
@ -61,6 +73,7 @@ class Feature:
summary = data.get('summary')
component = data.get('component')
labels = data.get('labels')
assignee = data.get('assignee')
if not isinstance(title, str):
raise RuntimeError(f'`title` is not `str`: {title}')
@ -71,6 +84,9 @@ class Feature:
if not isinstance(component, str):
raise RuntimeError(f'`component` is not `str`: {component}')
if not isinstance(assignee, (str, type(None))):
raise RuntimeError(f'`assignee` is not `str`: {assignee}')
if not isinstance(labels, list) or not all(isinstance(item, str) for item in labels):
raise RuntimeError(f'`labels` is not `list[str]`: {labels}')
@ -79,6 +95,7 @@ class Feature:
summary=summary,
component=component,
labels=labels,
assignee=assignee,
)
def create_issue(self, project: str) -> Issue:
@ -102,6 +119,7 @@ Feature Idea
body=body.strip(),
project=project,
labels=self.labels,
assignee=self.assignee,
)
@ -297,7 +315,21 @@ def create_deprecation_parser(subparser) -> None:
def create_feature_parser(subparser) -> None:
parser: argparse.ArgumentParser = subparser.add_parser('feature')
epilog = """
Example source YAML:
default:
component: ansible-test
labels:
- ansible-test
- feature
assignee: "@me"
features:
- title: Some title goes here
summary: A summary goes here.
"""
parser: argparse.ArgumentParser = subparser.add_parser('feature', epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.set_defaults(type=FeatureArgs)
parser.set_defaults(command=feature_command)

@ -3,9 +3,23 @@
# Description: Modifies the environment for running Ansible from a checkout
# Usage: . ./hacking/env-setup [-q]
# Set PYTHON_BIN
if not set -q PYTHON_BIN
for exe in python3 python
if command -v $exe > /dev/null
set -gx PYTHON_BIN (command -v $exe)
break
end
end
if not set -q PYTHON_BIN
echo "No valid Python found"
exit 1
end
end
# Retrieve the path of the current directory where the script resides
set HACKING_DIR (dirname (status -f))
set FULL_PATH (python -c "import os; print(os.path.realpath('$HACKING_DIR'))")
set FULL_PATH ($PYTHON_BIN -c "import os; print(os.path.realpath('$HACKING_DIR'))")
set ANSIBLE_HOME (dirname $FULL_PATH)
# Set quiet flag
@ -50,20 +64,6 @@ else if not string match -qr $PREFIX_MANPATH'($|:)' $MANPATH
set -gx MANPATH "$PREFIX_MANPATH:$MANPATH"
end
# Set PYTHON_BIN
if not set -q PYTHON_BIN
for exe in python3 python
if command -v $exe > /dev/null
set -gx PYTHON_BIN (command -v $exe)
break
end
end
if not set -q PYTHON_BIN
echo "No valid Python found"
exit 1
end
end
pushd $ANSIBLE_HOME
if test -n "$QUIET"
# Remove any .pyc files found

@ -43,7 +43,6 @@ from pathlib import Path
from ansible.release import __version__
import ansible.utils.vars as utils_vars
from ansible.parsing.dataloader import DataLoader
from ansible.parsing.utils.jsonify import jsonify
from ansible.parsing.splitter import parse_kv
from ansible.plugins.loader import init_plugin_loader
from ansible.executor import module_common
@ -89,6 +88,22 @@ def parse():
return options, args
def jsonify(result, format=False):
    """Serialize *result* as JSON text: compact by default, 4-space indented when *format* is true."""
    if result is None:
        return "{}"
    indent = 4 if format else None
    try:
        return json.dumps(result, sort_keys=True, indent=indent, ensure_ascii=False)
    except UnicodeDecodeError:
        # Fall back to ASCII-escaped output when the data cannot be decoded.
        return json.dumps(result, sort_keys=True, indent=indent)
def write_argsfile(argstring, json=False):
""" Write args to a file for old-style module's use. """
argspath = Path("~/.ansible_test_module_arguments").expanduser()
@ -152,16 +167,20 @@ def boilerplate_module(modfile, args, interpreters, check, destfile):
if check:
complex_args['_ansible_check_mode'] = True
modfile = os.path.abspath(modfile)
modname = os.path.basename(modfile)
modname = os.path.splitext(modname)[0]
(module_data, module_style, shebang) = module_common.modify_module(
modname,
modfile,
complex_args,
Templar(loader=loader),
built_module = module_common.modify_module(
module_name=modname,
module_path=modfile,
module_args=complex_args,
templar=Templar(loader=loader),
task_vars=task_vars
)
module_data, module_style = built_module.b_module_data, built_module.module_style
if module_style == 'new' and '_ANSIBALLZ_WRAPPER = True' in to_native(module_data):
module_style = 'ansiballz'
@ -198,10 +217,11 @@ def ansiballz_setup(modfile, modname, interpreters):
# All the directories in an AnsiBallZ that modules can live
core_dirs = glob.glob(os.path.join(debug_dir, 'ansible/modules'))
non_core_dirs = glob.glob(os.path.join(debug_dir, 'ansible/legacy'))
collection_dirs = glob.glob(os.path.join(debug_dir, 'ansible_collections/*/*/plugins/modules'))
# There's only one module in an AnsiBallZ payload so look for the first module and then exit
for module_dir in core_dirs + collection_dirs:
for module_dir in core_dirs + collection_dirs + non_core_dirs:
for dirname, directories, filenames in os.walk(module_dir):
for filename in filenames:
if filename == modname + '.py':

@ -9,14 +9,14 @@ as such this is not considered a bug unless it causes an issue with Ansible comm
(`ansible`, `ansible-playbook`, `ansible-doc`, etc).
We do support the provided API for use in developing plugins (modules, dynamic inventories, callbacks, strategies, etc),
but this does not seem to match that case.
but this does not match that case.
If you really need a stable API target to use Ansible, consider using ansible-runner:
If you need a stable API target to use Ansible, consider using ansible-runner:
* <https://github.com/ansible/ansible-runner>
Because this project is very active, we're unlikely to see comments made on closed tickets and we lock them after some time.
If you or anyone else has any further questions, please let us know by using any of the communication methods listed in the page below:
If you or anyone else has any further questions, please let us know by using any of the methods listed on the communication page:
* <https://docs.ansible.com/ansible/latest/community/communication.html>

@ -8,9 +8,9 @@ However, we recommend looking into providing this functionality through Ansible
* <https://docs.ansible.com/ansible/devel/dev_guide/developing_collections.html>.
Because this project is very active, we're unlikely to see comments made on closed tickets and we lock them after some time.
The mailing list and irc are great ways to ask questions, or post if you don't think this particular issue is resolved.
The Ansible Forum is the best place to start a discussion about this particular issue.
See this page for a complete and up to date list of communication channels and their purposes:
See this page for a complete and up-to-date list of communication channels and their purposes:
* <https://docs.ansible.com/ansible/latest/community/communication.html>

@ -1,11 +1,11 @@
@{{ paste.handle.here }} it seems to me that you are new to GitHub and
@{{ paste.handle.here }} It seems to me that you are new to GitHub and
have created this
[PR](https://help.github.com/articles/about-pull-requests/)
accidentally. That's why I'm closing it.
But no worries! Welcome to the Ansible community :)
Assuming that you wanted to create actual contribution, I think that
Assuming that you wanted to create an actual contribution, I think that
you may want to learn and read through the following articles I've
gathered for you:
@ -14,7 +14,7 @@ gathered for you:
Because this project is very active, we're unlikely to see comments made on closed tickets and we lock them after some time.
If you or anyone else has any further questions, please let us know by using any of the communication methods listed in the page below:
If you or anyone else has any further questions, please let us know by using any of the communication methods listed on the page below:
<https://docs.ansible.com/ansible/latest/community/communication.html>

@ -1,19 +1,19 @@
Hi!
Thanks very much for your submission to Ansible. It means a lot to us that you've taken time to contribute.
Thank you very much for your submission to Ansible. It means a lot to us that you've taken the time to contribute.
Unfortunately, we're not sure if we want this feature in the program, and I don't want this to seem confrontational.
Our reasons for this are:
Unfortunately, we're not sure if we want this feature in the program, our reasons are:
* (A) INSERT ITEM HERE
However, we're absolutely always up for discussion.
However, we're always up for discussion.
Because this project is very active, we're unlikely to see comments made on closed tickets and we lock them after some time.
If you or anyone else has any further questions, please let us know by using any of the communication methods listed in the page below:
If you or anyone else has any further questions, please let us know by using any of the communication methods listed on the page below:
* <https://docs.ansible.com/ansible/latest/community/communication.html>
In the future, sometimes starting a discussion on the development list prior to implementing
In the future, sometimes starting a discussion on the Ansible Forum before implementing
a feature can make getting things included a little easier, but it's not always necessary.
Thank you once again for this and your interest in Ansible!

@ -0,0 +1,21 @@
Hi!
Thank you very much for your submission to Ansible. It means a lot to us that you've taken the time to contribute.
Unfortunately, we're not currently accepting most outside contributions consisting only of minor changes such as:
* Spelling, grammar or typo fixes in comments, code or tests.
* Type annotations for existing code.
* Code style changes.
However, we're always up for discussion.
Because this project is very active, we're unlikely to see comments made on closed tickets and we lock them after some time.
If you or anyone else has any further questions, please let us know by using any of the communication methods listed on the page below:
* <https://docs.ansible.com/ansible/latest/community/communication.html>
In the future, sometimes starting a discussion on the Ansible Forum before implementing
a feature can make getting things included a little easier, but it's not always necessary.
Thank you once again for this and your interest in Ansible!

@ -1,6 +1,6 @@
Hi!
Thanks very much for your submission to Ansible. It means a lot to us that you've taken time to contribute.
Thanks very much for your submission to Ansible. It means a lot to us that you've taken the time to contribute.
It looks like the work from this pull request is a duplicate of the following PR(s):
@ -8,12 +8,12 @@ It looks like the work from this pull request is a duplicate of the following PR
Based on this, we are going to close this PR in favor of the above as a consolidated location to keep track of the issue.
However, we're absolutely always up for discussion.
In the future, sometimes starting a discussion on the development list prior to implementing a feature
However, we're always up for discussion.
In the future, sometimes starting a discussion on the Ansible Forum before implementing a feature
can make getting things included a little easier, but it's not always necessary.
Because this project is very active, we're unlikely to see comments made on closed tickets and we lock them after some time.
If you or anyone else has any further questions, please let us know by using any of the communication methods listed in the page below:
If you or anyone else has any further questions, please let us know by using any of the communication methods listed on the page below:
* <https://docs.ansible.com/ansible/latest/community/communication.html>

@ -6,9 +6,9 @@ For more info on our process see <https://docs.ansible.com/ansible/devel/referen
If you or anyone else has any further questions, please let us know by stopping by one of the mailing lists or chat channels, as appropriate.
Because this project is very active, we're unlikely to see comments made on closed tickets and we lock them after some time.
The mailing list and irc are great ways to ask questions, or post if you don't think this particular issue is resolved.
The Ansible Forum is the best place to start a discussion about this particular issue.
See this page for a complete and up to date list of communication channels and their purposes:
See this page for a complete and up-to-date list of communication channels and their purposes:
* <https://docs.ansible.com/ansible/latest/community/communication.html>

@ -6,7 +6,7 @@ If you are still interested in seeing this new feature get into Ansible, please
<https://github.com/ansible/proposals/blob/master/proposals_process_proposal.md>
Because this project is very active, we're unlikely to see comments made on closed tickets and we lock them after some time.
The Forum is the best ways to ask questions, or post if you don't think this particular issue is resolved.
The Ansible Forum is the best place to start a discussion about this particular issue.
* <https://forum.ansible.com/tag/ansible-core>

@ -2,16 +2,18 @@ Hi!
Thanks very much for your interest in Ansible. It means a lot to us.
This appears to be a user question, and we'd like to direct these topic to the Ansible Forum.
This appears to be a user question, and we'd like to direct this topic to the Ansible Forum.
* [Ansible Forum](https://forum.ansible.com)
See this page for a complete and up to date list of communication channels and their purposes:
See this page for a complete and up-to-date list of communication channels and their purposes:
* <https://docs.ansible.com/ansible/latest/community/communication.html>
Because this project is very active, we're unlikely to see comments made on closed tickets and we lock them after some time.
If don't you think this particular issue is resolved, you should still stop by there first, we'd appreciate it.
If you don't think this particular issue is resolved, you should still stop by there first, we'd appreciate it.
This allows us to keep the issue tracker for bugs, pull requests, RFEs and the like.
Thank you once again and we look forward to seeing you on the list or IRC. Thanks!
Thank you once again, and we look forward to seeing you on the Ansible Forum!
Thanks!

@ -4,10 +4,10 @@ We have ascertained that the following PR/commits should resolve this question o
<< INSERT SHA/PR LINK HERE >>
This should be included newer releases starting with << RELEASE/the next [major] release(s) >>.
This is included in newer releases starting with << RELEASE/the next [major] release(s) >>.
Because this project is very active, we're unlikely to see comments made on closed tickets and we lock them after some time.
The mailing list and irc are great ways to ask questions, or post if you don't think this particular issue is resolved.
The Ansible Forum is the best place to start a discussion about this particular issue.
See this page for a complete list of communication channels and their purposes:

@ -1,6 +1,6 @@
Hi!
Thanks very much for your submission to Ansible. It means a lot to us.
Thanks very much for your submission to Ansible. It means a lot to us.
We are interested in this idea and would like to see a wider discussion on it on one of our lists.
Reasons for this include:
@ -8,10 +8,9 @@ Reasons for this include:
* INSERT REASONS!
Because this project is very active, we're unlikely to see comments made on closed tickets and we lock them after some time.
Can you please post Ansible Forum so we can talk about this idea with the wider group?
Please post your idea on the Ansible Forum so we can discuss it with the wider group.
* [Ansible Core on the Ansible Forum](https://forum.ansible.com/tag/ansible-core)
* Matrix: [#devel:ansible.im](https://matrix.to/#/#devel:ansible.im)
For other alternatives, check this page for a more complete list of communication channels and their purposes:

@ -0,0 +1,53 @@
from __future__ import annotations
import importlib
import typing as t
from ansible.module_utils import _internal
from ansible.module_utils._internal._json import _profiles
def get_controller_serialize_map() -> dict[type, t.Callable]:
    """
    Provide the controller-only type serialization map injected into module_utils code.

    Replaces the no-op implementation in module_utils._internal when running in a controller context.
    """
    # deferred imports: these controller-only modules are not available on targets
    from ansible._internal._templating import _lazy_containers
    from ansible.parsing.vault import EncryptedString

    discard_tags = _profiles._JSONSerializationProfile.discard_tags

    serialize_map: dict[type, t.Callable] = {
        _lazy_containers._AnsibleLazyTemplateDict: discard_tags,
        _lazy_containers._AnsibleLazyTemplateList: discard_tags,
        # serialize as str, preserving tags since this is an instance of EncryptedString;
        # if tags should be discarded from str, another entry will handle it
        EncryptedString: str,
    }

    return serialize_map
def import_controller_module(module_name: str, /) -> t.Any:
    """
    Import `module_name` and return the resulting module object.

    Injected into module_utils code; replaces the no-op version in module_utils._internal in controller contexts.
    """
    module = importlib.import_module(module_name)

    return module
_T = t.TypeVar('_T')


def experimental(obj: _T) -> _T:
    """
    Mark a type or method outside the `_internal` package as experimental because it accepts or exposes internal types.

    Experimental APIs, like internal ones, may change at any time without notice.
    This decorator is purely informational: it returns its argument unmodified.
    """
    return obj
def setup() -> None:
    """
    Patch module_utils._internal so module_utils code sees controller-side behavior.

    Also serves as a no-op entry point so that side-effect only imports of this module are not flagged/removed as 'unused'.
    """
    # DTFIX-FUTURE: this is really fragile- disordered/incorrect imports (among other things) can mess it up. Consider a hosting-env-managed context
    # with an enum with at least Controller/Target/Unknown values, and possibly using lazy-init module shims or some other mechanism to allow controller-side
    # notification/augmentation of this kind of metadata.
    # Replace the module_utils no-op implementations with the controller-side versions defined above.
    _internal.get_controller_serialize_map = get_controller_serialize_map
    _internal.import_controller_module = import_controller_module
    _internal.is_controller = True

@ -0,0 +1,101 @@
from __future__ import annotations
import dataclasses
import json
import typing as t
from ansible.module_utils._internal._ansiballz import _extensions
from ansible.module_utils._internal._ansiballz._extensions import _pydevd, _coverage
from ansible.constants import config
_T = t.TypeVar('_T')


class ExtensionManager:
    """Manages AnsiballZ extensions and the serialization of their options."""

    def __init__(
        self,
        debugger: _pydevd.Options | None = None,
        coverage: _coverage.Options | None = None,
    ) -> None:
        configured = dict(_pydevd=debugger, _coverage=coverage)

        self._debugger = debugger
        self._coverage = coverage
        self._extension_names = tuple(name for name, value in configured.items() if value)
        self._module_names = tuple(f'{_extensions.__name__}.{name}' for name in self._extension_names)
        # local-to-foreign source path mapping; populated externally after the payload is assembled
        self.source_mapping: dict[str, str] = {}

    @property
    def debugger_enabled(self) -> bool:
        """True if the debugger extension is enabled, otherwise False."""
        return bool(self._debugger)

    @property
    def extension_names(self) -> tuple[str, ...]:
        """Names of the extensions to include in the AnsiballZ payload."""
        return self._extension_names

    @property
    def module_names(self) -> tuple[str, ...]:
        """Fully qualified Python module names of the extensions to include in the AnsiballZ payload."""
        return self._module_names

    def get_extensions(self) -> dict[str, dict[str, object]]:
        """Return a serializable mapping of configured extension names to their option dictionaries."""
        configured: dict[str, t.Any] = {}

        if self._debugger:
            # inject the (possibly path-translated) source mapping into the debugger options
            configured['_pydevd'] = dataclasses.replace(self._debugger, source_mapping=self._get_source_mapping())

        if self._coverage:
            configured['_coverage'] = self._coverage

        return {name: dataclasses.asdict(options) for name, options in configured.items()}

    def _get_source_mapping(self) -> dict[str, str]:
        """Return the source mapping, translating source roots when the debugger provides its own path mapping."""
        if not self._debugger.source_mapping:
            return self.source_mapping

        return {self._translate_path(local_path): payload_path for local_path, payload_path in self.source_mapping.items()}

    def _translate_path(self, path: str) -> str:
        """Translate a local path to a foreign path using the debugger's source mapping."""
        for foreign_prefix, local_prefix in self._debugger.source_mapping.items():
            if path.startswith(local_prefix):
                return foreign_prefix + path[len(local_prefix):]

        return path

    @classmethod
    def create(cls, task_vars: dict[str, object]) -> t.Self:
        """Create an instance configured from the provided task vars."""
        return cls(
            debugger=cls._get_options('_ANSIBALLZ_DEBUGGER_CONFIG', _pydevd.Options, task_vars),
            coverage=cls._get_options('_ANSIBALLZ_COVERAGE_CONFIG', _coverage.Options, task_vars),
        )

    @classmethod
    def _get_options(cls, name: str, config_type: type[_T], task_vars: dict[str, object]) -> _T | None:
        """Parse configuration from the named config setting as the specified type, or None if not configured."""
        value = config.get_config_value(name, variables=task_vars)

        if value is None:
            return None

        if isinstance(value, str):
            value = json.loads(value)

        return config_type(**value)

@ -0,0 +1,262 @@
# shebang placeholder
from __future__ import annotations
import datetime
# For the test-module.py script to tell this is an ANSIBALLZ_WRAPPER
_ANSIBALLZ_WRAPPER = True
# This code is part of Ansible, but is an independent component.
# The code in this particular templatable string, and this templatable string
# only, is BSD licensed. Modules which end up using this snippet, which is
# dynamically combined together by Ansible still belong to the author of the
# module, and they may assign their own license to the complete work.
#
# Copyright (c), James Cammarata, 2016
# Copyright (c), Toshio Kuratomi, 2016
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
def _ansiballz_main(
    zip_data: str,
    ansible_module: str,
    module_fqn: str,
    params: str,
    profile: str,
    date_time: datetime.datetime,
    extensions: dict[str, dict[str, object]],
    rlimit_nofile: int,
) -> None:
    """
    Entry point of the AnsiballZ wrapper: extract the embedded payload and run the requested module.

    :param zip_data: Base64-encoded zip archive containing the module and its dependencies.
    :param ansible_module: Short module name, used to build the temporary payload file names.
    :param module_fqn: Fully qualified name of the module to execute.
    :param params: JSON-encoded module parameters (encoded to bytes before use).
    :param profile: Serialization profile name, passed through to the payload loader.
    :param date_time: Timestamp applied to generated zip entries (avoids zipfile's pre-1980 limitation).
    :param extensions: Mapping of extension names to their option dictionaries, passed to the loader.
    :param rlimit_nofile: When non-zero, the requested soft limit for RLIMIT_NOFILE.
    """
    import os
    import os.path

    # Access to the working directory is required by Python when using pipelining, as well as for the coverage module.
    # Some platforms, such as macOS, may not allow querying the working directory when using become to drop privileges.
    try:
        os.getcwd()
    except OSError:
        # fall back to the home directory, then the filesystem root
        try:
            os.chdir(os.path.expanduser('~'))
        except OSError:
            os.chdir('/')

    if rlimit_nofile:
        import resource

        existing_soft, existing_hard = resource.getrlimit(resource.RLIMIT_NOFILE)

        # adjust soft limit subject to existing hard limit
        requested_soft = min(existing_hard, rlimit_nofile)

        if requested_soft != existing_soft:
            try:
                resource.setrlimit(resource.RLIMIT_NOFILE, (requested_soft, existing_hard))
            except ValueError:
                # some platforms (eg macOS) lie about their hard limit
                pass

    import sys
    import __main__

    # For some distros and python versions we pick up this script in the temporary
    # directory. This leads to problems when the ansible module masks a python
    # library that another import needs. We have not figured out what about the
    # specific distros and python versions causes this to behave differently.
    #
    # Tested distros:
    # Fedora23 with python3.4 Works
    # Ubuntu15.10 with python2.7 Works
    # Ubuntu15.10 with python3.4 Fails without this
    # Ubuntu16.04.1 with python3.5 Fails without this
    # To test on another platform:
    # * use the copy module (since this shadows the stdlib copy module)
    # * Turn off pipelining
    # * Make sure that the destination file does not exist
    # * ansible ubuntu16-test -m copy -a 'src=/etc/motd dest=/var/tmp/m'
    # This will traceback in shutil. Looking at the complete traceback will show
    # that shutil is importing copy which finds the ansible module instead of the
    # stdlib module
    scriptdir = None
    try:
        scriptdir = os.path.dirname(os.path.realpath(__main__.__file__))
    except (AttributeError, OSError):
        # Some platforms don't set __file__ when reading from stdin
        # OSX raises OSError if using abspath() in a directory we don't have
        # permission to read (realpath calls abspath)
        pass

    # Strip cwd from sys.path to avoid potential permissions issues
    excludes = {'', '.', scriptdir}
    sys.path = [p for p in sys.path if p not in excludes]

    import base64
    import shutil
    import tempfile
    import zipfile

    def invoke_module(modlib_path: str, json_params: bytes) -> None:
        """Inject a sitecustomize.py into the payload zip, put the payload first on sys.path, then run the module via the payload's loader."""
        # When installed via setuptools (including python setup.py install),
        # ansible may be installed with an easy-install.pth file. That file
        # may load the system-wide install of ansible rather than the one in
        # the module. sitecustomize is the only way to override that setting.
        z = zipfile.ZipFile(modlib_path, mode='a')

        # py3: modlib_path will be text, py2: it's bytes. Need bytes at the end
        sitecustomize = u'import sys\\nsys.path.insert(0,"%s")\\n' % modlib_path
        sitecustomize = sitecustomize.encode('utf-8')

        # Use a ZipInfo to work around zipfile limitation on hosts with
        # clocks set to a pre-1980 year (for instance, Raspberry Pi)
        zinfo = zipfile.ZipInfo()
        zinfo.filename = 'sitecustomize.py'
        zinfo.date_time = date_time.utctimetuple()[:6]
        z.writestr(zinfo, sitecustomize)
        z.close()

        # Put the zipped up module_utils we got from the controller first in the python path so that we
        # can monkeypatch the right basic
        sys.path.insert(0, modlib_path)

        from ansible.module_utils._internal._ansiballz import _loader

        _loader.run_module(
            json_params=json_params,
            profile=profile,
            module_fqn=module_fqn,
            modlib_path=modlib_path,
            extensions=extensions,
        )

    def debug(command: str, modlib_path: str, json_params: bytes) -> None:
        """Debug helper: `explode` extracts the payload into source files; `execute` runs the previously exploded code."""
        # The code here normally doesn't run. It's only used for debugging on the
        # remote machine.
        #
        # The subcommands in this function make it easier to debug ansiballz
        # modules. Here's the basic steps:
        #
        # Run ansible with the environment variable: ANSIBLE_KEEP_REMOTE_FILES=1 and -vvv
        # to save the module file remotely::
        #   $ ANSIBLE_KEEP_REMOTE_FILES=1 ansible host1 -m ping -a 'data=october' -vvv
        #
        # Part of the verbose output will tell you where on the remote machine the
        # module was written to::
        #   [...]
        #   <host1> SSH: EXEC ssh -C -q -o ControlMaster=auto -o ControlPersist=60s -o KbdInteractiveAuthentication=no -o
        #   PreferredAuthentications=gssapi-with-mic,gssapi-keyex,hostbased,publickey -o PasswordAuthentication=no -o ConnectTimeout=10 -o
        #   ControlPath=/home/badger/.ansible/cp/ansible-ssh-%h-%p-%r -tt rhel7 '/bin/sh -c '"'"'LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8
        #   LC_MESSAGES=en_US.UTF-8 /usr/bin/python /home/badger/.ansible/tmp/ansible-tmp-1461173013.93-9076457629738/ping'"'"''
        #   [...]
        #
        # Login to the remote machine and run the module file via from the previous
        # step with the explode subcommand to extract the module payload into
        # source files::
        #   $ ssh host1
        #   $ /usr/bin/python /home/badger/.ansible/tmp/ansible-tmp-1461173013.93-9076457629738/ping explode
        #   Module expanded into:
        #   /home/badger/.ansible/tmp/ansible-tmp-1461173408.08-279692652635227/ansible
        #
        # You can now edit the source files to instrument the code or experiment with
        # different parameter values.  When you're ready to run the code you've modified
        # (instead of the code from the actual zipped module), use the execute subcommand like this::
        #   $ /usr/bin/python /home/badger/.ansible/tmp/ansible-tmp-1461173013.93-9076457629738/ping execute

        # Okay to use __file__ here because we're running from a kept file
        basedir = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'debug_dir')
        args_path = os.path.join(basedir, 'args')

        if command == 'explode':
            # transform the ZIPDATA into an exploded directory of code and then
            # print the path to the code.  This is an easy way for people to look
            # at the code on the remote machine for debugging it in that
            # environment
            z = zipfile.ZipFile(modlib_path)
            for filename in z.namelist():
                if filename.startswith('/'):
                    raise Exception('Something wrong with this module zip file: should not contain absolute paths')

                dest_filename = os.path.join(basedir, filename)
                if dest_filename.endswith(os.path.sep) and not os.path.exists(dest_filename):
                    os.makedirs(dest_filename)
                else:
                    directory = os.path.dirname(dest_filename)
                    if not os.path.exists(directory):
                        os.makedirs(directory)
                    with open(dest_filename, 'wb') as writer:
                        writer.write(z.read(filename))

            # write the args file
            with open(args_path, 'wb') as writer:
                writer.write(json_params)

            print('Module expanded into:')
            print(basedir)

        elif command == 'execute':
            # Execute the exploded code instead of executing the module from the
            # embedded ZIPDATA.  This allows people to easily run their modified
            # code on the remote machine to see how changes will affect it.

            # Set pythonpath to the debug dir
            sys.path.insert(0, basedir)

            # read in the args file which the user may have modified
            with open(args_path, 'rb') as reader:
                json_params = reader.read()

            from ansible.module_utils._internal._ansiballz import _loader

            _loader.run_module(
                json_params=json_params,
                profile=profile,
                module_fqn=module_fqn,
                modlib_path=modlib_path,
                extensions=extensions,
            )
        else:
            print(f'FATAL: Unknown debug command {command!r}. Doing nothing.')

    #
    # See comments in the debug() method for information on debugging
    #

    encoded_params = params.encode()

    # There's a race condition with the controller removing the
    # remote_tmpdir and this module executing under async.  So we cannot
    # store this in remote_tmpdir (use system tempdir instead)
    # Only need to use [ansible_module]_payload_ in the temp_path until we move to zipimport
    # (this helps ansible-test produce coverage stats)
    # IMPORTANT: The real path must be used here to ensure a remote debugger such as PyCharm (using pydevd) can resolve paths correctly.
    temp_path = os.path.realpath(tempfile.mkdtemp(prefix='ansible_' + ansible_module + '_payload_'))

    try:
        zipped_mod = os.path.join(temp_path, 'ansible_' + ansible_module + '_payload.zip')

        with open(zipped_mod, 'wb') as modlib:
            modlib.write(base64.b64decode(zip_data))

        # a single extra CLI argument selects a debug subcommand; normal module execution passes none
        if len(sys.argv) == 2:
            debug(sys.argv[1], zipped_mod, encoded_params)
        else:
            invoke_module(zipped_mod, encoded_params)
    finally:
        shutil.rmtree(temp_path, ignore_errors=True)

@ -0,0 +1,47 @@
from __future__ import annotations as _annotations
import collections.abc as _c
import typing as _t
_T_co = _t.TypeVar('_T_co', covariant=True)


class SequenceProxy(_c.Sequence[_T_co]):
    """An immutable view wrapping an underlying sequence."""

    # DTFIX5: needs unit test coverage
    __slots__ = ('__wrapped',)

    def __init__(self, value: _c.Sequence[_T_co]) -> None:
        self.__wrapped = value

    @_t.overload
    def __getitem__(self, index: int) -> _T_co: ...

    @_t.overload
    def __getitem__(self, index: slice) -> _c.Sequence[_T_co]: ...

    def __getitem__(self, index: int | slice) -> _T_co | _c.Sequence[_T_co]:
        result = self.__wrapped[index]

        # slicing yields another read-only proxy instead of exposing the underlying sequence type
        if isinstance(index, slice):
            return self.__class__(result)

        return result

    def __len__(self) -> int:
        return len(self.__wrapped)

    def __contains__(self, item: object) -> bool:
        return item in self.__wrapped

    def __iter__(self) -> _t.Iterator[_T_co]:
        for item in self.__wrapped:
            yield item

    def __reversed__(self) -> _c.Iterator[_T_co]:
        return reversed(self.__wrapped)

    def index(self, *args) -> int:
        # delegate as-is so optional start/stop arguments behave exactly like the underlying sequence
        return self.__wrapped.index(*args)

    def count(self, value: object) -> int:
        return self.__wrapped.count(value)

@ -0,0 +1,130 @@
from __future__ import annotations
import dataclasses
import os
import types
import typing as t
from ansible.module_utils._internal._datatag import _tag_dataclass_kwargs, AnsibleDatatagBase, AnsibleSingletonTagBase
@dataclasses.dataclass(**_tag_dataclass_kwargs)
class Origin(AnsibleDatatagBase):
    """
    A tag that stores origin metadata for a tagged value, intended for forensic/diagnostic use.
    Origin metadata should not be used to make runtime decisions, as it is not guaranteed to be present or accurate.
    Setting both `path` and `line_num` can result in diagnostic display of referenced file contents.
    Either `path` or `description` must be present.
    """

    path: str | None = None
    """The path from which the tagged content originated."""

    description: str | None = None
    """A description of the origin, for display to users."""

    line_num: int | None = None
    """An optional line number, starting at 1."""

    col_num: int | None = None
    """An optional column number, starting at 1."""

    UNKNOWN: t.ClassVar[t.Self]

    @classmethod
    def get_or_create_tag(cls, value: t.Any, path: str | os.PathLike | None) -> Origin:
        """Return the tag from the given value, creating a tag from the provided path if no tag was found."""
        if not (origin := cls.get_tag(value)):
            if path:
                origin = Origin(path=str(path))  # convert tagged strings and path-like values to a native str
            else:
                origin = Origin.UNKNOWN

        return origin

    def replace(
        self,
        path: str | types.EllipsisType = ...,
        description: str | types.EllipsisType = ...,
        line_num: int | None | types.EllipsisType = ...,
        col_num: int | None | types.EllipsisType = ...,
    ) -> t.Self:
        """Return a new origin based on an existing one, with the given fields replaced."""
        # `...` is used as the "not provided" sentinel so that None remains a valid replacement value
        return dataclasses.replace(
            self,
            **{
                key: value
                for key, value in dict(
                    path=path,
                    description=description,
                    line_num=line_num,
                    col_num=col_num,
                ).items()
                if value is not ...
            },  # type: ignore[arg-type]
        )

    def _post_validate(self) -> None:
        # NOTE: messages must reference the actual dataclass fields (`path`/`description`),
        # since those are the names callers set; the previous messages referred to a nonexistent `src` field.
        if self.path:
            if not self.path.startswith('/'):
                raise RuntimeError('The `path` field must be an absolute path.')
        elif not self.description:
            raise RuntimeError('The `path` or `description` field must be specified.')

    def __str__(self) -> str:
        """Renders the origin in the form of path:line_num:col_num, omitting missing/invalid elements from the right."""
        if self.path:
            value = self.path
        else:
            value = self.description

        if self.line_num and self.line_num > 0:
            value += f':{self.line_num}'

            if self.col_num and self.col_num > 0:
                value += f':{self.col_num}'

        if self.path and self.description:
            value += f' ({self.description})'

        return value


Origin.UNKNOWN = Origin(description='<unknown>')
@dataclasses.dataclass(**_tag_dataclass_kwargs)
class VaultedValue(AnsibleDatatagBase):
    """Tag for vault-encrypted strings that carries the original ciphertext for round-tripping."""

    ciphertext: str

    def _get_tag_to_propagate(self, src: t.Any, value: object, *, value_type: t.Optional[type] = None) -> t.Self | None:
        # VaultedValue records the ciphertext of the exact value it tags; copying it onto a value that
        # differs from the original would cause serialization to emit ciphertext that no longer matches
        # the plaintext it is attached to.
        # Comparisons which can trigger an exception are indicative of a bug and should not be handled here.
        # For example:
        # * When `src` is an undecryptable `EncryptedString` -- it is not valid to apply this tag to that type.
        # * When `value` is a `Marker` -- this requires a templating, but vaulted values do not support templating.
        if src != value:
            # different value: preserve whatever tag (if any) the new value already carries
            return self.get_tag(value)

        return self  # same plaintext value (assuming the tag was correctly applied to src), so reusing the ciphertext is safe
@dataclasses.dataclass(**_tag_dataclass_kwargs)
class TrustedAsTemplate(AnsibleSingletonTagBase):
    """
    Indicates the tagged string is trusted to parse and render as a template.
    Do *NOT* apply this tag to data from untrusted sources, as this would allow code injection during templating.
    """

    # marker tag: declares no fields of its own; behavior comes entirely from AnsibleSingletonTagBase
@dataclasses.dataclass(**_tag_dataclass_kwargs)
class SourceWasEncrypted(AnsibleSingletonTagBase):
    """
    For internal use only.
    Indicates the tagged value was sourced from an encrypted file.
    Currently applied only by DataLoader.get_text_file_contents() and by extension DataLoader.load_from_file().
    """

    # marker tag: declares no fields of its own; behavior comes entirely from AnsibleSingletonTagBase

@ -0,0 +1,19 @@
from __future__ import annotations
from ansible.module_utils._internal._datatag import AnsibleTagHelper
def str_problematic_strip(value: str) -> str:
    """
    Return a copy of `value` with leading and trailing whitespace removed.

    Used where `str.strip` is needed, but tags must be preserved *AND* the stripping behavior likely shouldn't exist.
    If the stripping behavior is non-problematic, use `AnsibleTagHelper.tag_copy` around `str.strip` instead.
    """
    stripped_value = value.strip()

    if stripped_value == value:
        # nothing was removed; return the original (tagged) value untouched
        return value

    # FUTURE: consider deprecating some/all usages of this method; they generally imply a code smell or pattern we shouldn't be supporting
    return AnsibleTagHelper.tag_copy(value, stripped_value)

@ -0,0 +1,33 @@
from __future__ import annotations
import io
import typing as _t
from .._wrapt import ObjectProxy
from ...module_utils._internal import _datatag
class TaggedStreamWrapper(ObjectProxy):
    """
    Janky proxy around IOBase to allow streams to carry tags and support basic interrogation by the tagging API.
    Most tagging operations will have undefined behavior for this type.
    """

    # NOTE: per wrapt's ObjectProxy convention, `_self_`-prefixed attributes live on the proxy
    # itself rather than being forwarded to the wrapped stream.
    _self__ansible_tags_mapping: _datatag._AnsibleTagsMapping

    def __init__(self, stream: io.IOBase, tags: _datatag.AnsibleDatatagBase | _t.Iterable[_datatag.AnsibleDatatagBase]) -> None:
        """Wrap `stream`, attaching the given tag instance or iterable of tag instances."""
        super().__init__(stream)

        tag_list: list[_datatag.AnsibleDatatagBase]

        # exact type check to distinguish a single tag from an iterable of tags
        # noinspection PyProtectedMember
        if type(tags) in _datatag._known_tag_types:
            tag_list = [tags]  # type: ignore[list-item]
        else:
            tag_list = list(tags)  # type: ignore[arg-type]

        self._self__ansible_tags_mapping = _datatag._AnsibleTagsMapping((type(tag), tag) for tag in tag_list)

    @property
    def _ansible_tags_mapping(self) -> _datatag._AnsibleTagsMapping:
        # exposes the stored tags under the attribute name the tagging API interrogates
        return self._self__ansible_tags_mapping

@ -0,0 +1,66 @@
from __future__ import annotations
import contextlib
import signal
import types
import typing as _t
from ansible.module_utils import datatag
class AnsibleTimeoutError(BaseException):
    """A general purpose timeout."""

    # NOTE: derives from BaseException, so ordinary `except Exception` handlers will not swallow it

    _MAX_TIMEOUT = 100_000_000
    """
    The maximum supported timeout value.
    This value comes from BSD's alarm limit, which is due to that function using setitimer.
    """

    def __init__(self, timeout: int) -> None:
        # the timeout (in seconds) that elapsed before this error was raised
        self.timeout = timeout

        super().__init__(f"Timed out after {timeout} second(s).")

    @classmethod
    @contextlib.contextmanager
    def alarm_timeout(cls, timeout: int | None) -> _t.Iterator[None]:
        """
        Context for running code under an optional timeout.
        Raises an instance of this class if the timeout occurs.
        New usages of this timeout mechanism are discouraged.

        A `timeout` of None or 0 runs the body with no timeout at all.
        Relies on SIGALRM, so only one such timeout can be active at a time (and per the `signal` docs, only in the main thread).
        """
        if timeout is not None:
            if not isinstance(timeout, int):
                raise TypeError(f"Timeout requires 'int' argument, not {datatag.native_type_name(timeout)!r}.")

            if timeout < 0 or timeout > cls._MAX_TIMEOUT:
                # On BSD based systems, alarm is implemented using setitimer.
                # If out-of-bounds values are passed to alarm, they will return -1, which would be interpreted as an existing timer being set.
                # To avoid that, bounds checking is performed in advance.
                raise ValueError(f'Timeout {timeout} is invalid, it must be between 0 and {cls._MAX_TIMEOUT}.')

        if not timeout:
            yield  # execute the context manager's body
            return  # no timeout to deal with, exit immediately

        def on_alarm(_signal: int, _frame: types.FrameType | None) -> None:
            # raised inside whatever frame is executing when the alarm fires
            raise cls(timeout)

        # signal.signal returns the previous handler; any truthy previous handler means someone else is already using SIGALRM
        if signal.signal(signal.SIGALRM, on_alarm):
            raise RuntimeError("An existing alarm handler was present.")

        try:
            try:
                # signal.alarm returns the number of seconds remaining on a previously scheduled alarm, if any
                if signal.alarm(timeout):
                    raise RuntimeError("An existing alarm was set.")

                yield  # execute the context manager's body
            finally:
                # Disable the alarm.
                # If the alarm fires inside this finally block, the alarm is still disabled.
                # This guarantees the cleanup code in the outer finally block runs without risk of encountering the timeout error from the alarm.
                signal.alarm(0)
        finally:
            # restore the default SIGALRM disposition
            signal.signal(signal.SIGALRM, signal.SIG_DFL)

@ -0,0 +1,123 @@
from __future__ import annotations
import collections.abc as _c
import dataclasses
import typing as t
from ansible._internal._errors import _error_utils
from ansible.errors import AnsibleRuntimeError
from ansible.module_utils._internal import _messages
class AnsibleCapturedError(AnsibleRuntimeError):
    """An exception representing error detail captured in another context, where the error detail must be serialized to be preserved."""

    # the context in which the error was captured (e.g. 'action'); set by derived types
    context: t.ClassVar[str]

    def __init__(self, *, obj: t.Any = None, event: _messages.Event) -> None:
        """Record the captured `event`, optionally associating the originating `obj`."""
        super().__init__(obj=obj)

        self._event = event
class AnsibleResultCapturedError(AnsibleCapturedError, _error_utils.ContributesToTaskResult):
    """
    An exception representing error detail captured in a foreign context where an action/module result dictionary is involved.
    This exception provides a result dictionary via the ContributesToTaskResult mixin.
    """

    def __init__(self, event: _messages.Event, result: dict[str, t.Any]) -> None:
        super().__init__(event=event)

        self._result = result

    @property
    def result_contribution(self) -> _c.Mapping[str, object]:
        """The result dictionary captured alongside the error."""
        return self._result

    @classmethod
    def maybe_raise_on_result(cls, result: dict[str, t.Any]) -> None:
        """Normalize the result and raise an exception if the result indicated failure."""
        if error_summary := cls.normalize_result_exception(result):
            raise error_summary.error_type(error_summary.event, result)

    @classmethod
    def normalize_result_exception(cls, result: dict[str, t.Any]) -> CapturedErrorSummary | None:
        """
        Normalize the result `exception`, if any, to be a `CapturedErrorSummary` instance.
        If a new `CapturedErrorSummary` was created, the `error_type` will be `cls`.
        The `exception` key will be removed if falsey.
        A `CapturedErrorSummary` instance will be returned if `failed` is truthy.
        """
        # Must be invoked on a derived type so the `error_type` recorded below is meaningful.
        # BUGFIX: compare `cls` itself, not `type(cls)` -- `type(cls)` is the metaclass and could never equal this class,
        # which made the guard unreachable.
        if cls is AnsibleResultCapturedError:
            raise TypeError('The normalize_result_exception method cannot be called on the AnsibleResultCapturedError base type, use a derived type.')

        if not isinstance(result, dict):
            raise TypeError(f'Malformed result. Received {type(result)} instead of {dict}.')

        failed = result.get('failed')  # DTFIX-FUTURE: warn if failed is present and not a bool, or exception is present without failed being True
        exception = result.pop('exception', None)

        if not failed and not exception:
            return None

        if isinstance(exception, CapturedErrorSummary):
            error_summary = exception
        elif isinstance(exception, _messages.ErrorSummary):
            # wrap an existing ErrorSummary, recording this class as the error type
            error_summary = CapturedErrorSummary(
                event=exception.event,
                error_type=cls,
            )
        else:
            # translate non-ErrorSummary errors (e.g. a plain traceback string) into a CapturedErrorSummary
            error_summary = CapturedErrorSummary(
                event=_messages.Event(
                    msg=str(result.get('msg', 'Unknown error.')),
                    formatted_traceback=cls._normalize_traceback(exception),
                ),
                error_type=cls,
            )

        result.update(exception=error_summary)

        return error_summary if failed else None  # even though error detail was normalized, only return it if the result indicated failure

    @classmethod
    def _normalize_traceback(cls, value: object | None) -> str | None:
        """Normalize the provided traceback value, returning None if it is falsey."""
        if not value:
            return None

        value = str(value).rstrip()

        if not value:
            return None

        return value + '\n'
class AnsibleActionCapturedError(AnsibleResultCapturedError):
    """An exception representing error detail sourced directly by an action in its result dictionary."""
    _default_message = 'Action failed.'  # fallback message when the result provides none
    context = 'action'  # fulfills AnsibleCapturedError.context
class AnsibleModuleCapturedError(AnsibleResultCapturedError):
    """An exception representing error detail captured in a module context and returned from an action's result dictionary."""
    _default_message = 'Module failed.'  # fallback message when the result provides none
    context = 'target'  # fulfills AnsibleCapturedError.context
@dataclasses.dataclass(**_messages._dataclass_kwargs)
class CapturedErrorSummary(_messages.ErrorSummary):
    """An `ErrorSummary` that also records which `AnsibleResultCapturedError` subclass performed the normalization."""
    # None when the summary has not been associated with a capturing error type.
    error_type: type[AnsibleResultCapturedError] | None = None

@ -0,0 +1,89 @@
from __future__ import annotations as _annotations
from ansible.module_utils._internal import _errors, _messages
class ControllerEventFactory(_errors.EventFactory):
    """Factory for creating `Event` instances from `BaseException` instances on the controller."""
    def _get_msg(self, exception: BaseException) -> str | None:
        # AnsibleError carries its own original (unaugmented) message; defer to the base factory otherwise.
        from ansible.errors import AnsibleError
        if not isinstance(exception, AnsibleError):
            return super()._get_msg(exception)
        return exception._original_message.strip()
    def _get_formatted_source_context(self, exception: BaseException) -> str | None:
        # AnsibleError carries pre-formatted source context; defer to the base factory otherwise.
        from ansible.errors import AnsibleError
        if not isinstance(exception, AnsibleError):
            return super()._get_formatted_source_context(exception)
        return exception._formatted_source_context
    def _get_help_text(self, exception: BaseException) -> str | None:
        # AnsibleError carries its own help text; defer to the base factory otherwise.
        from ansible.errors import AnsibleError
        if not isinstance(exception, AnsibleError):
            return super()._get_help_text(exception)
        return exception._help_text
    def _get_chain(self, exception: BaseException) -> _messages.EventChain | None:
        from ansible._internal._errors import _captured  # avoid circular import due to AnsibleError import
        if isinstance(exception, _captured.AnsibleCapturedError):
            # a captured error provides its own cause event, it never has a normal __cause__
            return _messages.EventChain(
                msg_reason=_errors.MSG_REASON_DIRECT_CAUSE,
                traceback_reason=f'The above {exception.context} exception was the direct cause of the following controller exception:',
                event=exception._event,
            )
        return super()._get_chain(exception)
    def _follow_cause(self, exception: BaseException) -> bool:
        # Non-AnsibleError causes are always followed; AnsibleError can opt out via _include_cause_message.
        from ansible.errors import AnsibleError
        return not isinstance(exception, AnsibleError) or exception._include_cause_message
    def _get_cause(self, exception: BaseException) -> BaseException | None:
        # deprecated: description='remove support for orig_exc (deprecated in 2.23)' core_version='2.27'
        cause = super()._get_cause(exception)
        from ansible.errors import AnsibleError
        if not isinstance(exception, AnsibleError):
            return cause
        try:
            from ansible.utils.display import _display
        except Exception:  # pylint: disable=broad-except # if config is broken, this can raise things other than ImportError
            _display = None
        if cause:
            # An explicit `raise ... from` wins over `orig_exc`; warn when both were given and disagree.
            if exception.orig_exc and exception.orig_exc is not cause and _display:
                _display.warning(
                    msg=f"The `orig_exc` argument to `{type(exception).__name__}` was given, but differed from the cause given by `raise ... from`.",
                )
            return cause
        if exception.orig_exc:
            if _display:
                # encourage the use of `raise ... from` before deprecating `orig_exc`
                _display.warning(
                    msg=f"The `orig_exc` argument to `{type(exception).__name__}` was given without using `raise ... from orig_exc`.",
                )
            return exception.orig_exc
        return None
    def _get_events(self, exception: BaseException) -> tuple[_messages.Event, ...] | None:
        # Exception groups fan out into one converted event per grouped exception.
        if isinstance(exception, BaseExceptionGroup):
            return tuple(self._convert_exception(ex) for ex in exception.exceptions)
        return None

@ -0,0 +1,240 @@
from __future__ import annotations
import abc
import collections.abc as _c
import dataclasses
import itertools
import pathlib
import textwrap
import typing as t
from ansible._internal._datatag._tags import Origin
from ansible._internal._errors import _error_factory
from ansible.module_utils._internal import _ambient_context, _event_utils, _messages, _traceback
class ContributesToTaskResult(metaclass=abc.ABCMeta):
    """Exceptions may include this mixin to contribute task result dictionary data directly to the final result."""
    @property
    @abc.abstractmethod
    def result_contribution(self) -> _c.Mapping[str, object]:
        """Mapping of results to apply to the task result."""
    @property
    def omit_exception_key(self) -> bool:
        """Non-error exceptions (e.g., `AnsibleActionSkip`) must return `True` to ensure omission of the `exception` key."""
        return False
    @property
    def omit_failed_key(self) -> bool:
        """Exceptions representing non-failure scenarios (e.g., `skipped`, `unreachable`) must return `True` to ensure omission of the `failed` key."""
        return False
class RedactAnnotatedSourceContext(_ambient_context.AmbientContextBase):
    """When active, this context will redact annotated source lines, showing only the origin."""
    # Checked by SourceContext.from_value/from_origin, which return '(source not shown: content redacted)' while active.
@dataclasses.dataclass(kw_only=True, frozen=True)
class SourceContext:
    """An error-location rendering: the origin plus annotated source lines pointing at the offending line/column."""
    origin: Origin  # where the content came from (path/line/column)
    annotated_source_lines: list[str]  # pre-rendered, numbered source lines (possibly with a column marker line)
    target_line: str | None  # the raw (un-annotated) line the origin points at, when available
    def __str__(self) -> str:
        """Render the origin header followed by the annotated source lines, if any."""
        msg_lines = [f'Origin: {self.origin}']
        if self.annotated_source_lines:
            msg_lines.append('')
            msg_lines.extend(self.annotated_source_lines)
        return '\n'.join(msg_lines)
    @classmethod
    def from_value(cls, value: t.Any) -> SourceContext | None:
        """Attempt to retrieve source and render a contextual indicator from the value's origin (if any)."""
        if value is None:
            return None
        if isinstance(value, Origin):
            # an Origin was passed directly; there is no value to render inline
            origin = value
            value = None
        else:
            origin = Origin.get_tag(value)
        if RedactAnnotatedSourceContext.current(optional=True):
            return cls.error('content redacted')
        if origin and origin.path:
            return cls.from_origin(origin)
        if value is None:
            truncated_value = None
            annotated_source_lines = []
        else:
            # no backing file; fall back to showing (a shortened form of) the value itself
            # DTFIX-FUTURE: cleanup/share width
            try:
                value = str(value)
            except Exception as ex:
                value = f'<< context unavailable: {ex} >>'
            truncated_value = textwrap.shorten(value, width=120)
            annotated_source_lines = [truncated_value]
        return SourceContext(
            origin=origin or Origin.UNKNOWN,
            annotated_source_lines=annotated_source_lines,
            target_line=truncated_value,
        )
    @staticmethod
    def error(message: str | None, origin: Origin | None = None) -> SourceContext:
        """Return a SourceContext carrying a placeholder message instead of source lines."""
        return SourceContext(
            origin=origin,
            annotated_source_lines=[f'(source not shown: {message})'] if message else [],
            target_line=None,
        )
    @classmethod
    def from_origin(cls, origin: Origin) -> SourceContext:
        """Attempt to retrieve source and render a contextual indicator of an error location."""
        from ansible.parsing.vault import is_encrypted  # avoid circular import
        # DTFIX-FUTURE: support referencing the column after the end of the target line, so we can indicate where a missing character (quote) needs to be added
        # this is also useful for cases like end-of-stream reported by the YAML parser
        # DTFIX-FUTURE: Implement line wrapping and match annotated line width to the terminal display width.
        context_line_count: t.Final = 2
        max_annotated_line_width: t.Final = 120
        truncation_marker: t.Final = '...'
        target_line_num = origin.line_num
        if RedactAnnotatedSourceContext.current(optional=True):
            return cls.error('content redacted', origin)
        if not target_line_num or target_line_num < 1:
            return cls.error(None, origin)  # message omitted since lack of line number is obvious from pos
        start_line_idx = max(0, (target_line_num - 1) - context_line_count)  # if near start of file
        target_col_num = origin.col_num
        try:
            # read only up to the target line; the first line is retained separately for the vault-header check below
            with pathlib.Path(origin.path).open() as src:
                first_line = src.readline()
                lines = list(itertools.islice(itertools.chain((first_line,), src), start_line_idx, target_line_num))
        except Exception as ex:
            return cls.error(type(ex).__name__, origin)
        if is_encrypted(first_line):
            return cls.error('content encrypted', origin)
        if len(lines) != target_line_num - start_line_idx:
            return cls.error('file truncated', origin)
        annotated_source_lines = []
        line_label_width = len(str(target_line_num))
        max_src_line_len = max_annotated_line_width - line_label_width - 1
        usable_line_len = max_src_line_len
        for line_num, line in enumerate(lines, start_line_idx + 1):
            line = line.rstrip('\n')  # universal newline default mode on `open` ensures we'll never see anything but \n
            line = line.replace('\t', ' ')  # mixed tab/space handling is intentionally disabled since we're both format and display config agnostic
            if len(line) > max_src_line_len:
                line = line[: max_src_line_len - len(truncation_marker)] + truncation_marker
                usable_line_len = max_src_line_len - len(truncation_marker)
            annotated_source_lines.append(f'{str(line_num).rjust(line_label_width)}{" " if line else ""}{line}')
        # render a caret marker under the target column when it falls within the displayed portion of the line
        if target_col_num and usable_line_len >= target_col_num >= 1:
            column_marker = f'column {target_col_num}'
            target_col_idx = target_col_num - 1
            if target_col_idx + 2 + len(column_marker) > max_src_line_len:
                # label would overflow to the right; place it to the left of the caret instead
                column_marker = f'{" " * (target_col_idx - len(column_marker) - 1)}{column_marker} ^'
            else:
                column_marker = f'{" " * target_col_idx}^ {column_marker}'
            column_marker = f'{" " * line_label_width} {column_marker}'
            annotated_source_lines.append(column_marker)
        elif target_col_num is None:
            # no column known: underline the entire rendered target line
            underline_length = len(annotated_source_lines[-1]) - line_label_width - 1
            annotated_source_lines.append(f'{" " * line_label_width} {"^" * underline_length}')
        return SourceContext(
            origin=origin,
            annotated_source_lines=annotated_source_lines,
            target_line=lines[-1].rstrip('\n'),  # universal newline default mode on `open` ensures we'll never see anything but \n
        )
def format_exception_message(exception: BaseException) -> str:
    """Return the full chain of exception messages by concatenating the cause(s) until all are exhausted."""
    # Convert without traceback capture (False), then flatten the resulting event chain into a brief message.
    event = _error_factory.ControllerEventFactory.from_exception(exception, False)
    return _event_utils.format_event_brief_message(event)
def result_dict_from_exception(exception: BaseException, accept_result_contribution: bool = False) -> dict[str, object]:
    """Return a failed task result dict from the given exception."""
    event = _error_factory.ControllerEventFactory.from_exception(exception, _traceback.is_traceback_enabled(_traceback.TracebackEvent.ERROR))
    # Locate the nearest exception in the cause chain that contributes result data, when allowed.
    contributor: ContributesToTaskResult | None = None
    if accept_result_contribution:
        current = exception
        while current:
            if isinstance(current, ContributesToTaskResult):
                contributor = current
                break
            current = current.__cause__
    result: dict[str, object] = dict(contributor.result_contribution) if contributor else {}
    # The contributor may opt out of the `failed` and/or `exception` keys (e.g. skip/unreachable results).
    if contributor is not None and contributor.omit_failed_key:
        result.pop('failed', None)
    else:
        result['failed'] = True
    if contributor is not None and contributor.omit_exception_key:
        result.pop('exception', None)
    else:
        result['exception'] = _messages.ErrorSummary(event=event)
    if 'msg' not in result:
        # if nothing contributed `msg`, generate one from the exception messages
        result['msg'] = _event_utils.format_event_brief_message(event)
    return result
def result_dict_from_captured_errors(
    msg: str,
    *,
    errors: list[_messages.ErrorSummary] | None = None,
) -> dict[str, object]:
    """Return a failed task result dict from the given error message and captured errors."""
    # NOTE(review): assigned but never read locally; presumably a frame-local sentinel inspected by
    # `_traceback.maybe_capture_traceback` to skip this frame during stack capture — confirm before removing.
    _skip_stackwalk = True
    event = _messages.Event(
        msg=msg,
        formatted_traceback=_traceback.maybe_capture_traceback(msg, _traceback.TracebackEvent.ERROR),
        # each captured error contributes its event as a sub-event of this failure
        events=tuple(error.event for error in errors) if errors else None,
    )
    result = dict(
        failed=True,
        exception=_messages.ErrorSummary(
            event=event,
        ),
        msg=_event_utils.format_event_brief_message(event),
    )
    return result

@ -0,0 +1,91 @@
from __future__ import annotations
import contextlib
import enum
import typing as t
from ansible.utils.display import Display
from ansible.constants import config
# Module-level display instance used by ErrorHandler to emit warnings.
display = Display()
# FUTURE: add sanity test to detect use of skip_on_ignore without Skippable (and vice versa)
class ErrorAction(enum.Enum):
    """Action to take when an error is encountered."""
    IGNORE = enum.auto()  # swallow the error silently
    WARNING = enum.auto()  # report the error as a warning
    ERROR = enum.auto()  # let the error propagate
    @classmethod
    def from_config(cls, setting: str, variables: dict[str, t.Any] | None = None) -> t.Self:
        """Return an `ErrorAction` enum from the specified Ansible config setting."""
        configured = config.get_config_value(setting, variables=variables)
        return cls[configured.upper()]
class _SkipException(BaseException):
    """Internal flow control exception for skipping code blocks within a `Skippable` context manager."""
    # Derives from BaseException (not Exception) so ordinary `except Exception` handlers cannot swallow it.
    def __init__(self) -> None:
        super().__init__('Skipping ignored action due to use of `skip_on_ignore`. It is a bug to encounter this message outside of debugging.')
class _SkippableContextManager:
    """Internal context manager to support flow control for skipping code blocks."""
    def __enter__(self) -> None:
        return None
    def __exit__(self, exc_type, _exc_val, _exc_tb) -> bool:
        """Mask only `_SkipException`; exiting without any exception means the skip handler was never used."""
        if exc_type is not None:
            # Suppress the internal skip signal; every other exception propagates unchanged.
            return exc_type is _SkipException
        raise RuntimeError('A `Skippable` context manager was entered, but a `skip_on_ignore` handler was never invoked.')
# Shared singleton; the class is stateless, so one instance serves all call sites.
Skippable = _SkippableContextManager()
"""Context manager singleton required to enclose `ErrorHandler.handle` invocations when `skip_on_ignore` is `True`."""
class ErrorHandler:
    """
    Provides a configurable error handler context manager for a specific list of exception types.
    Unhandled errors leaving the context manager can be ignored, treated as warnings, or allowed to raise by setting `ErrorAction`.
    """
    def __init__(self, action: ErrorAction) -> None:
        self.action = action
    @contextlib.contextmanager
    def handle(self, *args: type[BaseException], skip_on_ignore: bool = False) -> t.Iterator[None]:
        """
        Handle the specified exception(s) using the defined error action.
        If `skip_on_ignore` is `True`, the body of the context manager will be skipped for `ErrorAction.IGNORE`.
        Use of `skip_on_ignore` requires enclosure within the `Skippable` context manager.
        """
        if not args:
            raise ValueError('At least one exception type is required.')
        if skip_on_ignore and self.action == ErrorAction.IGNORE:
            # bypass the body entirely; the surrounding `Skippable` context masks this signal
            raise _SkipException()
        try:
            yield
        except args as ex:
            if self.action == ErrorAction.ERROR:
                raise
            if self.action == ErrorAction.WARNING:
                display.error_as_warning(msg=None, exception=ex)
            # ErrorAction.IGNORE: swallow the handled exception
        if skip_on_ignore:
            # completing a skippable action still raises, ensuring the `Skippable` context was used
            raise _SkipException()
    @classmethod
    def from_config(cls, setting: str, variables: dict[str, t.Any] | None = None) -> t.Self:
        """Return an `ErrorHandler` instance configured using the specified Ansible config setting."""
        action = ErrorAction.from_config(setting, variables=variables)
        return cls(action)

@ -0,0 +1,28 @@
from __future__ import annotations
from collections import abc as _c
from ansible._internal._errors._alarm_timeout import AnsibleTimeoutError
from ansible._internal._errors._error_utils import ContributesToTaskResult
from ansible.module_utils.datatag import deprecate_value
class TaskTimeoutError(AnsibleTimeoutError, ContributesToTaskResult):
    """
    A task-specific timeout.
    This exception provides a result dictionary via the ContributesToTaskResult mixin.
    """
    @property
    def result_contribution(self) -> _c.Mapping[str, object]:
        """Return the legacy `timedout` payload; its `frame` key is deprecated and now carries guidance text."""
        guidance = "Configure `DISPLAY_TRACEBACK` to see a traceback on timeout errors."
        deprecated_frame = deprecate_value(
            value=guidance,
            msg="The `timedout.frame` task result key is deprecated.",
            help_text=guidance,
            version="2.23",
        )
        return {'timedout': {'frame': deprecated_frame, 'period': self.timeout}}

@ -0,0 +1,127 @@
from __future__ import annotations as _annotations
import collections.abc as _c
import textwrap as _textwrap
from ansible.module_utils._internal import _event_utils, _messages
def format_event(event: _messages.Event, include_traceback: bool) -> str:
    """Format an event into a verbose message and traceback."""
    parts = [format_event_verbose_message(event)]
    if include_traceback:
        parts.append('\n' + format_event_traceback(event))
    text = ''.join(parts).strip()
    # Multi-line output ends with a blank separator line; single-line output ends with a bare newline.
    return text + ('\n\n' if '\n' in text else '\n')
def format_event_traceback(event: _messages.Event) -> str:
    """Format an event into a traceback."""
    pieces: list[str] = []
    current = event
    while current:
        part = current.formatted_traceback or '(traceback missing)\n'
        if current.events:
            # sub-events get their tracebacks rendered recursively as framed children
            part += _format_event_children("Sub-Traceback", [format_event_traceback(sub) for sub in current.events])
        pieces.append(part)
        link = current.chain
        if link:
            pieces.append(f'\n{link.traceback_reason}\n\n')
            current = link.event
        else:
            current = None
    # chains are collected cause-last, but tracebacks read cause-first
    return ''.join(reversed(pieces))
def format_event_verbose_message(event: _messages.Event) -> str:
    """
    Format an event into a verbose message.
    Help text, contextual information and sub-events will be included.
    """
    segments: list[str] = []
    original_event = event
    while event:
        # Collapse chained events into one message where doing so loses no detail (see break conditions below).
        messages = [event.msg]
        chain: _messages.EventChain = event.chain
        while chain and chain.follow:
            if chain.event.events:
                break  # do not collapse a chained event with sub-events, since they would be lost
            if chain.event.formatted_source_context or chain.event.help_text:
                if chain.event.formatted_source_context != event.formatted_source_context or chain.event.help_text != event.help_text:
                    break  # do not collapse a chained event with different details, since they would be lost
            if chain.event.chain and chain.msg_reason != chain.event.chain.msg_reason:
                break  # do not collapse a chained event which has a chain with a different msg_reason
            messages.append(chain.event.msg)
            chain = chain.event.chain
        msg = _event_utils.deduplicate_message_parts(messages)
        segment = '\n'.join(_get_message_lines(msg, event.help_text, event.formatted_source_context)) + '\n'
        if event.events:
            # sub-events are rendered recursively as framed children
            child_msgs = [format_event_verbose_message(child) for child in event.events]
            segment += _format_event_children("Sub-Event", child_msgs)
        segments.append(segment)
        if chain and chain.follow:
            segments.append(f'\n{chain.msg_reason}\n\n')
            event = chain.event
        else:
            event = None
    if len(segments) > 1:
        # multi-segment output is prefaced with a one-line summary of the entire chain
        segments.insert(0, _event_utils.format_event_brief_message(original_event) + '\n\n')
    return ''.join(segments)
def _format_event_children(label: str, children: _c.Iterable[str]) -> str:
    """Format the given list of child messages into a single string."""
    rendered = list(children)
    total = len(rendered)
    out: list[str] = ['\n']
    for position, child in enumerate(rendered, 1):
        # frame each child with numbered begin/end rules and a `| ` gutter on every line (even blank ones)
        out.append(f'+--[ {label} {position} of {total} ]---\n')
        out.append(_textwrap.indent(f"\n{child}\n", "| ", lambda value: True))
        out.append(f'+--[ End {label} ]---\n')
    return ''.join(out)
def _get_message_lines(message: str, help_text: str | None, formatted_source_context: str | None) -> list[str]:
    """Return a list of message lines constructed from the given message, help text and formatted source context."""
    collapsible = help_text and not formatted_source_context and '\n' not in message and '\n' not in help_text
    if collapsible:
        # no source context and both parts are single-line: join message and help text on one line
        return [f'{message} {help_text}']
    lines = [message]
    if formatted_source_context:
        lines.append(formatted_source_context)
    if help_text:
        # help text is separated from the preceding content by a blank line
        lines.extend(['', help_text])
    return lines

@ -0,0 +1,214 @@
"""Internal utilities for serialization and deserialization."""
# DTFIX-FUTURE: most of this isn't JSON specific, find a better home
from __future__ import annotations
import enum
import json
import typing as t
from ansible.errors import AnsibleVariableTypeError
from ansible.module_utils._internal._datatag import (
_ANSIBLE_ALLOWED_MAPPING_VAR_TYPES,
_ANSIBLE_ALLOWED_NON_SCALAR_COLLECTION_VAR_TYPES,
_ANSIBLE_ALLOWED_VAR_TYPES,
_AnsibleTaggedStr,
AnsibleTagHelper,
)
from ansible.module_utils._internal._json._profiles import _tagless
from ansible.parsing.vault import EncryptedString
from ansible._internal._datatag._tags import Origin, TrustedAsTemplate
from ansible._internal._templating import _transform
from ansible.module_utils import _internal
from ansible.module_utils._internal import _datatag
_T = t.TypeVar('_T')
_sentinel = object()
class HasCurrent(t.Protocol):
    """Utility protocol for mixin type safety."""
    # The key currently being visited; maintained by AnsibleVariableVisitor._visit.
    _current: t.Any
class StateTrackingMixIn(HasCurrent):
    """Mixin for use with `AnsibleVariableVisitor` to track current visitation context."""
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self._stack: list[t.Any] = []
    def __enter__(self) -> None:
        # entering a container scope: remember the key that led here
        self._stack.append(self._current)
    def __exit__(self, *_args, **_kwargs) -> None:
        self._stack.pop()
    def _get_stack(self) -> list[t.Any]:
        """Return the active key path: ancestors (excluding the outermost entry) plus the current key."""
        if self._stack:
            return self._stack[1:] + [self._current]
        return []
class EncryptedStringBehavior(enum.Enum):
    """How `AnsibleVariableVisitor` will handle instances of `EncryptedString`."""
    PRESERVE = enum.auto()  # keep the `EncryptedString` instance unmodified
    DECRYPT = enum.auto()  # substitute the decrypted plaintext
    REDACT = enum.auto()  # substitute a placeholder string
    FAIL = enum.auto()  # raise an `AnsibleVariableTypeError` error
class AnsibleVariableVisitor:
    """Utility visitor base class to recursively apply various behaviors and checks to variable object graphs."""
    def __init__(
        self,
        *,
        trusted_as_template: bool = False,
        origin: Origin | None = None,
        convert_mapping_to_dict: bool = False,
        convert_sequence_to_list: bool = False,
        convert_custom_scalars: bool = False,
        convert_to_native_values: bool = False,
        apply_transforms: bool = False,
        visit_keys: bool = False,
        encrypted_string_behavior: EncryptedStringBehavior = EncryptedStringBehavior.DECRYPT,
    ):
        """Each keyword flag enables one optional normalization pass; see `_visit` for where each applies."""
        super().__init__()  # supports StateTrackingMixIn
        self.trusted_as_template = trusted_as_template
        self.origin = origin
        self.convert_mapping_to_dict = convert_mapping_to_dict
        self.convert_sequence_to_list = convert_sequence_to_list
        self.convert_custom_scalars = convert_custom_scalars
        self.convert_to_native_values = convert_to_native_values
        self.apply_transforms = apply_transforms
        self.visit_keys = visit_keys
        self.encrypted_string_behavior = encrypted_string_behavior
        if apply_transforms:
            # deferred import; only needed when transforms are requested
            from ansible._internal._templating import _engine
            self._template_engine = _engine.TemplateEngine()
        else:
            self._template_engine = None
        self._current: t.Any = None  # supports StateTrackingMixIn
    def __enter__(self) -> t.Any:
        """No-op context manager dispatcher (delegates to mixin behavior if present)."""
        if func := getattr(super(), '__enter__', None):
            func()
    def __exit__(self, *args, **kwargs) -> t.Any:
        """No-op context manager dispatcher (delegates to mixin behavior if present)."""
        if func := getattr(super(), '__exit__', None):
            func(*args, **kwargs)
    def visit(self, value: _T) -> _T:
        """
        Enforces Ansible's variable type system restrictions before a var is accepted in inventory. Also, conditionally implements template trust
        compatibility, depending on the plugin's declared understanding (or lack thereof). This always recursively copies inputs to fully isolate
        inventory data from what the plugin provided, and prevent any later mutation.
        """
        return self._visit(None, value)
    def _early_visit(self, value, value_type) -> t.Any:
        """Overridable hook point to allow custom string handling in derived visitors."""
        if value_type in (str, _AnsibleTaggedStr):
            # apply compatibility behavior
            if self.trusted_as_template:
                result = TrustedAsTemplate().tag(value)
            else:
                result = value
        else:
            # _sentinel signals "not handled here"; _visit falls through to the generic handling
            result = _sentinel
        return result
    def _visit_key(self, key: t.Any) -> t.Any:
        """Internal implementation to recursively visit a key if visit_keys is enabled."""
        if not self.visit_keys:
            return key
        return self._visit(None, key)  # key=None prevents state tracking from seeing the key as value
    def _visit(self, key: t.Any, value: _T) -> _T:
        """Internal implementation to recursively visit a data structure's contents."""
        self._current = key  # supports StateTrackingMixIn
        value_type: type = type(value)
        # handle EncryptedString conversion before more generic transformation and native conversions
        if value_type is EncryptedString:  # pylint: disable=unidiomatic-typecheck
            match self.encrypted_string_behavior:
                case EncryptedStringBehavior.DECRYPT:
                    value = str(value)  # type: ignore[assignment]
                    value_type = str
                case EncryptedStringBehavior.REDACT:
                    value = "<redacted>"  # type: ignore[assignment]
                    value_type = str
                case EncryptedStringBehavior.FAIL:
                    raise AnsibleVariableTypeError.from_value(obj=value)
        elif self.apply_transforms and value_type in _transform._type_transform_mapping:
            value = self._template_engine.transform(value)
            value_type = type(value)
        if self.convert_to_native_values and isinstance(value, _datatag.AnsibleTaggedObject):
            value = value._native_copy()
            value_type = type(value)
        result: _T
        # DTFIX-FUTURE: Visitor generally ignores dict/mapping keys by default except for debugging and schema-aware checking.
        #  It could be checking keys destined for variable storage to apply more strict rules about key shape and type.
        if (result := self._early_visit(value, value_type)) is not _sentinel:
            pass
        # DTFIX7: de-duplicate and optimize; extract inline generator expressions and fallback function or mapping for native type calculation?
        elif value_type in _ANSIBLE_ALLOWED_MAPPING_VAR_TYPES:  # check mappings first, because they're also collections
            with self:  # supports StateTrackingMixIn
                result = AnsibleTagHelper.tag_copy(value, ((self._visit_key(k), self._visit(k, v)) for k, v in value.items()), value_type=value_type)
        elif value_type in _ANSIBLE_ALLOWED_NON_SCALAR_COLLECTION_VAR_TYPES:
            with self:  # supports StateTrackingMixIn
                result = AnsibleTagHelper.tag_copy(value, (self._visit(k, v) for k, v in enumerate(t.cast(t.Iterable, value))), value_type=value_type)
        elif self.convert_mapping_to_dict and _internal.is_intermediate_mapping(value):
            with self:  # supports StateTrackingMixIn
                result = {self._visit_key(k): self._visit(k, v) for k, v in value.items()}  # type: ignore[assignment]
        elif self.convert_sequence_to_list and _internal.is_intermediate_iterable(value):
            with self:  # supports StateTrackingMixIn
                result = [self._visit(k, v) for k, v in enumerate(t.cast(t.Iterable, value))]  # type: ignore[assignment]
        elif self.convert_custom_scalars and isinstance(value, str):
            # str subclasses are flattened to plain str
            result = str(value)  # type: ignore[assignment]
        elif self.convert_custom_scalars and isinstance(value, float):
            result = float(value)  # type: ignore[assignment]
        elif self.convert_custom_scalars and isinstance(value, int) and not isinstance(value, bool):
            # bool is excluded: it is an int subclass but must not be coerced to int
            result = int(value)  # type: ignore[assignment]
        elif value_type in _ANSIBLE_ALLOWED_VAR_TYPES:
            # supported scalar type that requires no special handling, just return as-is
            result = value
        elif self.encrypted_string_behavior is EncryptedStringBehavior.PRESERVE and isinstance(value, EncryptedString):
            result = value  # type: ignore[assignment]
        else:
            raise AnsibleVariableTypeError.from_value(obj=value)
        if self.origin and not Origin.is_tagged_on(result):
            # apply shared instance default origin tag
            result = self.origin.tag(result)
        return result
def json_dumps_formatted(value: object) -> str:
    """Return a JSON dump of `value` with formatting and keys sorted."""
    # The tagless encoder strips data tags during serialization.
    return json.dumps(
        value,
        cls=_tagless.Encoder,
        sort_keys=True,
        indent=4,
    )

@ -0,0 +1,34 @@
from __future__ import annotations as _annotations
import typing as _t
from ansible.module_utils._internal._json import _profiles
from ansible._internal._json._profiles import _legacy
from ansible.parsing import vault as _vault
class LegacyControllerJSONEncoder(_legacy.Encoder):
    """Compatibility wrapper over `legacy` profile JSON encoder to support trust stripping and vault value plaintext conversion."""
    def __init__(self, preprocess_unsafe: bool = False, vault_to_text: bool = False, _decode_bytes: bool = False, **kwargs) -> None:
        self._preprocess_unsafe = preprocess_unsafe
        self._vault_to_text = vault_to_text
        self._decode_bytes = _decode_bytes
        super().__init__(**kwargs)
    def default(self, o: _t.Any) -> _t.Any:
        """Hooked default that can conditionally bypass base encoder behavior based on this instance's config."""
        if type(o) is _profiles._WrappedValue:  # pylint: disable=unidiomatic-typecheck
            o = o.wrapped
        if type(o) is _legacy._Untrusted and not self._preprocess_unsafe:  # pylint: disable=unidiomatic-typecheck
            # not emitting unsafe markers: bypass custom unsafe serialization and return the raw value
            return o.value
        if type(o) is _vault.EncryptedString and self._vault_to_text:  # pylint: disable=unidiomatic-typecheck
            # decrypt and return the plaintext (or fail trying)
            return str(o)
        if isinstance(o, bytes) and self._decode_bytes:
            # backward compatibility with `ansible.module_utils.basic.jsonify`
            return o.decode(errors='surrogateescape')
        return super().default(o)

@ -0,0 +1,57 @@
from __future__ import annotations
import datetime as _datetime
from ansible.module_utils._internal import _datatag
from ansible.module_utils._internal._json import _profiles
from ansible.parsing import vault as _vault
from ansible._internal._datatag import _tags
class _Profile(_profiles._JSONSerializationProfile):
    """Profile for external cache persistence of inventory/fact data that preserves most tags."""
    serialize_map = {}  # placeholder; populated in post_init
    schema_id = 1  # bump when the persisted representation changes incompatibly
    @classmethod
    def post_init(cls, **kwargs):
        """Register the types and per-type serializers this profile accepts."""
        # common module types plus all tagged containers/scalars, tags, and vault value types
        cls.allowed_ansible_serializable_types = (
            _profiles._common_module_types
            | _profiles._common_module_response_types
            | {
                _datatag._AnsibleTaggedDate,
                _datatag._AnsibleTaggedTime,
                _datatag._AnsibleTaggedDateTime,
                _datatag._AnsibleTaggedStr,
                _datatag._AnsibleTaggedInt,
                _datatag._AnsibleTaggedFloat,
                _datatag._AnsibleTaggedList,
                _datatag._AnsibleTaggedSet,
                _datatag._AnsibleTaggedTuple,
                _datatag._AnsibleTaggedDict,
                _tags.SourceWasEncrypted,
                _tags.Origin,
                _tags.TrustedAsTemplate,
                _vault.EncryptedString,
                _vault.VaultedValue,
            }
        )
        # JSON has no set/tuple/date types: sets and tuples become lists, date/time values use serializable wrappers
        cls.serialize_map = {
            set: cls.serialize_as_list,
            tuple: cls.serialize_as_list,
            _datetime.date: _datatag.AnsibleSerializableDate,
            _datetime.time: _datatag.AnsibleSerializableTime,
            _datetime.datetime: _datatag.AnsibleSerializableDateTime,
        }
        cls.handle_key = cls._handle_key_str_fallback  # legacy stdlib-compatible key behavior
class Encoder(_profiles.AnsibleProfileJSONEncoder):
    # JSON encoder bound to the cache persistence profile above.
    _profile = _Profile
class Decoder(_profiles.AnsibleProfileJSONDecoder):
    # JSON decoder bound to the cache persistence profile above.
    _profile = _Profile

@ -0,0 +1,40 @@
"""
Backwards compatibility profile for serialization for persisted ansible-inventory output.
Behavior is equivalent to pre 2.18 `AnsibleJSONEncoder` with vault_to_text=True.
"""
from __future__ import annotations
from ... import _json
from . import _legacy
class _InventoryVariableVisitor(_legacy._LegacyVariableVisitor, _json.StateTrackingMixIn):
    """State-tracking visitor implementation that only applies trust to `_meta.hostvars` and `vars` inventory values."""
    # DTFIX5: does the variable visitor need to support conversion of sequence/mapping for inventory?
    @property
    def _allow_trust(self) -> bool:
        """True when the current key path is under `_meta.hostvars` (depth >= 4) or a second-level `vars` key (depth >= 3)."""
        path = self._get_stack()
        under_hostvars = len(path) >= 4 and path[:2] == ['_meta', 'hostvars']
        under_vars = len(path) >= 3 and path[1] == 'vars'
        return under_hostvars or under_vars
class _Profile(_legacy._Profile):
    """Legacy serialization profile variant for persisted `ansible-inventory` output."""
    visitor_type = _InventoryVariableVisitor  # schema-aware trust application (only hostvars/group vars)
    encode_strings_as_utf8 = True
class Encoder(_legacy.Encoder):
    """JSON encoder using the inventory legacy profile."""
    _profile = _Profile
class Decoder(_legacy.Decoder):
    """JSON decoder using the inventory legacy profile."""
    _profile = _Profile

@ -0,0 +1,189 @@
"""
Backwards compatibility profile for serialization other than inventory (which should use inventory_legacy for backward-compatible trust behavior).
Behavior is equivalent to pre 2.19 `AnsibleJSONEncoder` with vault_to_text=True.
"""
from __future__ import annotations as _annotations
import datetime as _datetime
import typing as _t
from ansible._internal import _json
from ansible._internal._datatag import _tags
from ansible.module_utils._internal import _datatag
from ansible.module_utils._internal._json import _profiles
from ansible.parsing import vault as _vault
class _Untrusted:
    """
    Temporarily wraps strings which are not trusted for templating.
    Used before serialization of strings not tagged TrustedAsTemplate when trust inversion is enabled and trust is allowed in the string's context.
    Used during deserialization of `__ansible_unsafe` strings to indicate they should not be tagged TrustedAsTemplate.
    """
    __slots__ = ('value',)
    def __init__(self, value: str) -> None:
        self.value = value  # the wrapped plain string; unwrapped by `_early_visit`/`serialize_untrusted`
class _LegacyVariableVisitor(_json.AnsibleVariableVisitor):
    """Variable visitor that supports optional trust inversion for legacy serialization."""
    def __init__(
        self,
        *,
        trusted_as_template: bool = False,
        invert_trust: bool = False,
        origin: _tags.Origin | None = None,
        convert_mapping_to_dict: bool = False,
        convert_sequence_to_list: bool = False,
        convert_custom_scalars: bool = False,
    ):
        """
        :param trusted_as_template: tag visited strings TrustedAsTemplate (deserialization direction).
        :param invert_trust: wrap strings lacking TrustedAsTemplate in `_Untrusted` (serialization direction).
        """
        super().__init__(
            trusted_as_template=trusted_as_template,
            origin=origin,
            convert_mapping_to_dict=convert_mapping_to_dict,
            convert_sequence_to_list=convert_sequence_to_list,
            convert_custom_scalars=convert_custom_scalars,
            encrypted_string_behavior=_json.EncryptedStringBehavior.PRESERVE,
        )
        self.invert_trust = invert_trust
        # the two trust behaviors apply to opposite directions and cannot be combined
        if trusted_as_template and invert_trust:
            raise ValueError('trusted_as_template is mutually exclusive with invert_trust')
    @property
    def _allow_trust(self) -> bool:
        """
        This profile supports trust application in all contexts.
        Derived implementations can override this behavior for application-dependent/schema-aware trust.
        """
        return True
    def _early_visit(self, value, value_type) -> _t.Any:
        """Similar to base implementation, but supports an intermediate wrapper for trust inversion."""
        if value_type in (str, _datatag._AnsibleTaggedStr):
            # apply compatibility behavior
            if self.trusted_as_template and self._allow_trust:
                result = _tags.TrustedAsTemplate().tag(value)
            elif self.invert_trust and not _tags.TrustedAsTemplate.is_tagged_on(value) and self._allow_trust:
                result = _Untrusted(value)
            else:
                result = value
        elif value_type is _Untrusted:
            # unwrap; the wrapper only exists to influence serialization/deserialization
            result = value.value
        else:
            # sentinel defers to the base implementation's handling
            result = _json._sentinel
        return result
class _Profile(_profiles._JSONSerializationProfile["Encoder", "Decoder"]):
    """Legacy (pre-2.19 compatible) serialization profile using `__ansible_unsafe`/`__ansible_vault` JSON wrappers."""
    visitor_type = _LegacyVariableVisitor
    @classmethod
    def serialize_untrusted(cls, value: _Untrusted) -> dict[str, str] | str:
        """Serialize an untrusted string as a `__ansible_unsafe` wrapper dict."""
        return dict(
            __ansible_unsafe=_datatag.AnsibleTagHelper.untag(value.value),
        )
    @classmethod
    def serialize_tagged_str(cls, value: _datatag.AnsibleTaggedObject) -> _t.Any:
        """Serialize a tagged str; vaulted values become `__ansible_vault` wrappers, others lose their tags."""
        if ciphertext := _vault.VaultHelper.get_ciphertext(value, with_tags=False):
            return dict(
                __ansible_vault=ciphertext,
            )
        return _datatag.AnsibleTagHelper.untag(value)
    @classmethod
    def deserialize_unsafe(cls, value: dict[str, _t.Any]) -> _Untrusted:
        """Deserialize a `__ansible_unsafe` wrapper into an `_Untrusted` marker for the visitor."""
        ansible_unsafe = value['__ansible_unsafe']
        if type(ansible_unsafe) is not str:  # pylint: disable=unidiomatic-typecheck
            raise TypeError(f"__ansible_unsafe is {type(ansible_unsafe)} not {str}")
        return _Untrusted(ansible_unsafe)
    @classmethod
    def deserialize_vault(cls, value: dict[str, _t.Any]) -> _vault.EncryptedString:
        """Deserialize a `__ansible_vault` wrapper into an `EncryptedString` holding the ciphertext."""
        ansible_vault = value['__ansible_vault']
        if type(ansible_vault) is not str:  # pylint: disable=unidiomatic-typecheck
            raise TypeError(f"__ansible_vault is {type(ansible_vault)} not {str}")
        encrypted_string = _vault.EncryptedString(ciphertext=ansible_vault)
        return encrypted_string
    @classmethod
    def serialize_encrypted_string(cls, value: _vault.EncryptedString) -> dict[str, str]:
        """Serialize an `EncryptedString` as a `__ansible_vault` wrapper dict (ciphertext only, tags dropped)."""
        return dict(
            __ansible_vault=_vault.VaultHelper.get_ciphertext(value, with_tags=False),
        )
    @classmethod
    def post_init(cls) -> None:
        """Register serialize/deserialize hooks for legacy-compatible JSON output."""
        cls.serialize_map = {
            set: cls.serialize_as_list,
            tuple: cls.serialize_as_list,
            _datetime.date: cls.serialize_as_isoformat,  # existing devel behavior
            _datetime.time: cls.serialize_as_isoformat,  # always failed pre-2.18, so okay to include for consistency
            _datetime.datetime: cls.serialize_as_isoformat,  # existing devel behavior
            _datatag._AnsibleTaggedDate: cls.discard_tags,
            _datatag._AnsibleTaggedTime: cls.discard_tags,
            _datatag._AnsibleTaggedDateTime: cls.discard_tags,
            _vault.EncryptedString: cls.serialize_encrypted_string,
            _datatag._AnsibleTaggedStr: cls.serialize_tagged_str,  # for VaultedValue tagged str
            _datatag._AnsibleTaggedInt: cls.discard_tags,
            _datatag._AnsibleTaggedFloat: cls.discard_tags,
            _datatag._AnsibleTaggedList: cls.discard_tags,
            _datatag._AnsibleTaggedSet: cls.discard_tags,
            _datatag._AnsibleTaggedTuple: cls.discard_tags,
            _datatag._AnsibleTaggedDict: cls.discard_tags,
            _Untrusted: cls.serialize_untrusted,  # equivalent to AnsibleJSONEncoder(preprocess_unsafe=True) in devel
        }
        cls.deserialize_map = {
            '__ansible_unsafe': cls.deserialize_unsafe,
            '__ansible_vault': cls.deserialize_vault,
        }
        cls.handle_key = cls._handle_key_str_fallback  # type: ignore[method-assign] # legacy stdlib-compatible key behavior
    @classmethod
    def pre_serialize(cls, encoder: Encoder, o: _t.Any) -> _t.Any:
        """Walk the value before encoding, wrapping untrusted strings via trust inversion."""
        # DTFIX7: these conversion args probably aren't needed
        avv = cls.visitor_type(invert_trust=True, convert_mapping_to_dict=True, convert_sequence_to_list=True, convert_custom_scalars=True)
        return avv.visit(o)
    @classmethod
    def post_deserialize(cls, decoder: Decoder, o: _t.Any) -> _t.Any:
        """Walk the decoded value, applying origin/trust tags sampled by the decoder from the source string."""
        avv = cls.visitor_type(trusted_as_template=decoder._trusted_as_template, origin=decoder._origin)
        return avv.visit(o)
class Encoder(_profiles.AnsibleProfileJSONEncoder):
    """JSON encoder using the legacy profile."""
    _profile = _Profile
class Decoder(_profiles.AnsibleProfileJSONDecoder):
    """JSON decoder using the legacy profile; samples origin/trust tags from the source string."""
    _profile = _Profile
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        # NB: these can only be sampled properly when loading strings, eg, `json.loads`; the global `json.load` function does not expose the file-like to us
        self._origin: _tags.Origin | None = None
        self._trusted_as_template: bool = False
    def raw_decode(self, s: str, idx: int = 0) -> tuple[_t.Any, int]:
        """Capture tags from the input string, then delegate to the stdlib decoder; `post_deserialize` applies them."""
        self._origin = _tags.Origin.get_tag(s)
        self._trusted_as_template = _tags.TrustedAsTemplate.is_tagged_on(s)
        return super().raw_decode(s, idx)

@ -0,0 +1,21 @@
from __future__ import annotations
import contextlib
import fcntl
import typing as t
@contextlib.contextmanager
def named_mutex(path: str) -> t.Iterator[None]:
    """
    Lightweight context manager wrapper over `fcntl.flock` to provide IPC locking via a shared filename.
    Entering the context manager blocks until the lock is acquired.
    The lock file will be created automatically, but creation of the parent directory and deletion of the lockfile are the caller's responsibility.
    """
    lock_file = open(path, 'a')  # append mode creates the file without truncating concurrent users
    try:
        fcntl.flock(lock_file, fcntl.LOCK_EX)
        try:
            yield
        finally:
            fcntl.flock(lock_file, fcntl.LOCK_UN)
    finally:
        lock_file.close()

@ -0,0 +1,57 @@
from __future__ import annotations
import functools
import json
import json.encoder
import json.decoder
import typing as t
from .._wrapt import ObjectProxy
from .._json._profiles import _cache_persistence
class PluginInterposer(ObjectProxy):
    """Proxies a Cache plugin instance to implement transparent encapsulation of serialized Ansible internal data types."""
    _PAYLOAD_KEY = '__payload__'
    """The key used to store the serialized payload."""
    def get(self, key: str) -> dict[str, object]:
        """Fetch and deserialize the payload stored under the schema-prefixed key."""
        return self._decode(self.__wrapped__.get(self._get_key(key)))
    def set(self, key: str, value: dict[str, object]) -> None:
        """Serialize the value and store it under the schema-prefixed key."""
        self.__wrapped__.set(self._get_key(key), self._encode(value))
    def keys(self) -> t.Sequence[str]:
        """Return keys belonging to the current schema only, with the schema prefix stripped."""
        return [k for k in (self._restore_key(k) for k in self.__wrapped__.keys()) if k is not None]
    def contains(self, key: t.Any) -> bool:
        """True if the schema-prefixed key exists in the wrapped cache."""
        return self.__wrapped__.contains(self._get_key(key))
    def delete(self, key: str) -> None:
        """Delete the schema-prefixed key from the wrapped cache."""
        self.__wrapped__.delete(self._get_key(key))
    @classmethod
    def _restore_key(cls, wrapped_key: str) -> str | None:
        """Strip the schema prefix from a stored key, or return None if the key belongs to a different schema."""
        prefix = cls._get_wrapped_key_prefix()
        if not wrapped_key.startswith(prefix):
            return None
        return wrapped_key[len(prefix) :]
    @classmethod
    @functools.cache
    def _get_wrapped_key_prefix(cls) -> str:
        # derived from the persistence profile's schema_id; cached since it never changes at runtime
        return f's{_cache_persistence._Profile.schema_id}_'
    @classmethod
    def _get_key(cls, key: str) -> str:
        """Augment the supplied key with a schema identifier to allow for side-by-side caching across incompatible schemas."""
        return f'{cls._get_wrapped_key_prefix()}{key}'
    def _encode(self, value: dict[str, object]) -> dict[str, object]:
        """Collapse the value into a single JSON-string payload using the cache persistence profile."""
        return {self._PAYLOAD_KEY: json.dumps(value, cls=_cache_persistence.Encoder)}
    def _decode(self, value: dict[str, t.Any]) -> dict[str, object]:
        """Expand a payload dict produced by `_encode` back into the original value."""
        return json.loads(value[self._PAYLOAD_KEY], cls=_cache_persistence.Decoder)

@ -0,0 +1,91 @@
from __future__ import annotations
import atexit
import os
import subprocess
from ansible import constants as C
from ansible._internal._errors import _alarm_timeout
from ansible._internal._ssh._ssh_agent import SshAgentClient
from ansible.cli import display
from ansible.errors import AnsibleError
from ansible.module_utils.common.process import get_bin_path
_SSH_AGENT_STDOUT_READ_TIMEOUT = 5 # seconds
def launch_ssh_agent() -> None:
    """If configured via `SSH_AGENT`, launch an ssh-agent for Ansible's use and/or verify access to an existing one."""
    try:
        _launch_ssh_agent()
    except Exception as ex:
        # any setup failure is surfaced as a single user-facing error; the cause is chained for detail
        raise AnsibleError("Failed to launch ssh agent.") from ex
def _launch_ssh_agent() -> None:
    """Dispatch on the `SSH_AGENT` setting: 'none' (disabled), 'auto' (launch a private agent), or a path to an existing agent socket."""
    ssh_agent_cfg = C.config.get_config_value('SSH_AGENT')
    match ssh_agent_cfg:
        case 'none':
            display.debug('SSH_AGENT set to none')
            return
        case 'auto':
            try:
                ssh_agent_bin = get_bin_path(C.config.get_config_value('SSH_AGENT_EXECUTABLE'))
            except ValueError as e:
                raise AnsibleError('SSH_AGENT set to auto, but cannot find ssh-agent binary.') from e
            # bind the agent socket under the run-private tmp dir, restricted to the current user
            ssh_agent_dir = os.path.join(C.DEFAULT_LOCAL_TMP, 'ssh_agent')
            os.mkdir(ssh_agent_dir, 0o700)
            sock = os.path.join(ssh_agent_dir, 'agent.sock')
            display.vvv('SSH_AGENT: starting...')
            try:
                # -D foreground, -s bourne-style output, -a explicit socket path (see ssh-agent(1))
                p = subprocess.Popen(
                    [ssh_agent_bin, '-D', '-s', '-a', sock],
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,
                )
            except OSError as e:
                raise AnsibleError('Could not start ssh-agent.') from e
            atexit.register(p.terminate)  # ensure the agent does not outlive this process
            help_text = f'The ssh-agent {ssh_agent_bin!r} might be an incompatible agent.'
            # a compatible agent announces readiness by printing SSH_AUTH_SOCK... on stdout
            expected_stdout = 'SSH_AUTH_SOCK'
            try:
                with _alarm_timeout.AnsibleTimeoutError.alarm_timeout(_SSH_AGENT_STDOUT_READ_TIMEOUT):
                    stdout = p.stdout.read(len(expected_stdout))
            except _alarm_timeout.AnsibleTimeoutError as e:
                # non-fatal: warn and fall through to the liveness check below
                display.error_as_warning(
                    msg=f'Timed out waiting for expected stdout {expected_stdout!r} from ssh-agent.',
                    exception=e,
                    help_text=help_text,
                )
            else:
                if stdout != expected_stdout:
                    display.warning(
                        msg=f'The ssh-agent output {stdout!r} did not match expected {expected_stdout!r}.',
                        help_text=help_text,
                    )
            # an exited agent at this point means startup failed; surface its return code and stderr
            if p.poll() is not None:
                raise AnsibleError(
                    message='The ssh-agent terminated prematurely.',
                    help_text=f'{help_text}\n\nReturn Code: {p.returncode}\nStandard Error:\n{p.stderr.read()}',
                )
            display.vvv(f'SSH_AGENT: ssh-agent[{p.pid}] started and bound to {sock}')
        case _:
            # any other value is treated as the path to an existing agent socket; verify it is reachable
            sock = ssh_agent_cfg
            try:
                with SshAgentClient(sock) as client:
                    client.list()
            except Exception as e:
                raise AnsibleError(f'Could not communicate with ssh-agent using auth sock {sock!r}.') from e
    os.environ['SSH_AUTH_SOCK'] = os.environ['ANSIBLE_SSH_AGENT'] = sock

@ -0,0 +1,619 @@
# Copyright: Contributors to the Ansible project
# BSD 3 Clause License (see licenses/BSD-3-Clause.txt or https://opensource.org/license/bsd-3-clause/)
from __future__ import annotations
import binascii
import copy
import dataclasses
import enum
import functools
import hashlib
import socket
import types
import typing as t
try:
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.dsa import (
DSAParameterNumbers,
DSAPrivateKey,
DSAPublicKey,
DSAPublicNumbers,
)
from cryptography.hazmat.primitives.asymmetric.ec import (
EllipticCurve,
EllipticCurvePrivateKey,
EllipticCurvePublicKey,
SECP256R1,
SECP384R1,
SECP521R1,
)
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from cryptography.hazmat.primitives.asymmetric.rsa import (
RSAPrivateKey,
RSAPublicKey,
RSAPublicNumbers,
)
except ImportError:
HAS_CRYPTOGRAPHY = False
else:
HAS_CRYPTOGRAPHY = True
CryptoPublicKey = t.Union[
DSAPublicKey,
EllipticCurvePublicKey,
Ed25519PublicKey,
RSAPublicKey,
]
CryptoPrivateKey = t.Union[
DSAPrivateKey,
EllipticCurvePrivateKey,
Ed25519PrivateKey,
RSAPrivateKey,
]
if t.TYPE_CHECKING:
from cryptography.hazmat.primitives.asymmetric.dsa import DSAPrivateNumbers
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateNumbers
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateNumbers
_SSH_AGENT_CLIENT_SOCKET_TIMEOUT = 10
class ProtocolMsgNumbers(enum.IntEnum):
    """Message, constraint and request numbers for the SSH agent protocol."""
    # Responses
    SSH_AGENT_FAILURE = 5
    SSH_AGENT_SUCCESS = 6
    SSH_AGENT_IDENTITIES_ANSWER = 12
    SSH_AGENT_SIGN_RESPONSE = 14
    SSH_AGENT_EXTENSION_FAILURE = 28
    SSH_AGENT_EXTENSION_RESPONSE = 29
    # Constraints
    SSH_AGENT_CONSTRAIN_LIFETIME = 1
    SSH_AGENT_CONSTRAIN_CONFIRM = 2
    SSH_AGENT_CONSTRAIN_EXTENSION = 255
    # Requests
    SSH_AGENTC_REQUEST_IDENTITIES = 11
    SSH_AGENTC_SIGN_REQUEST = 13
    SSH_AGENTC_ADD_IDENTITY = 17
    SSH_AGENTC_REMOVE_IDENTITY = 18
    SSH_AGENTC_REMOVE_ALL_IDENTITIES = 19
    SSH_AGENTC_ADD_SMARTCARD_KEY = 20
    SSH_AGENTC_REMOVE_SMARTCARD_KEY = 21
    SSH_AGENTC_LOCK = 22
    SSH_AGENTC_UNLOCK = 23
    SSH_AGENTC_ADD_ID_CONSTRAINED = 25
    SSH_AGENTC_ADD_SMARTCARD_KEY_CONSTRAINED = 26
    SSH_AGENTC_EXTENSION = 27
    def to_blob(self) -> bytes:
        # message numbers are encoded as a single byte on the wire
        return bytes([self])
class SshAgentFailure(RuntimeError):
    """Server failure or unexpected response."""
# NOTE: Classes below somewhat represent "Data Type Representations Used in the SSH Protocols"
# as specified by RFC4251
@t.runtime_checkable
class SupportsToBlob(t.Protocol):
    """Structural type for values that can serialize themselves to wire bytes."""
    def to_blob(self) -> bytes: ...
@t.runtime_checkable
class SupportsFromBlob(t.Protocol):
    """Structural type for values that can deserialize themselves from wire bytes."""
    @classmethod
    def from_blob(cls, blob: memoryview | bytes) -> t.Self: ...
    @classmethod
    def consume_from_blob(cls, blob: memoryview | bytes) -> tuple[t.Self, memoryview | bytes]: ...
def _split_blob(blob: memoryview | bytes, length: int) -> tuple[memoryview | bytes, memoryview | bytes]:
if len(blob) < length:
raise ValueError("_split_blob: unexpected data length")
return blob[:length], blob[length:]
class VariableSized:
    """Base for wire types whose encoding carries a uint32 length prefix."""
    @classmethod
    def from_blob(cls, blob: memoryview | bytes) -> t.Self:
        raise NotImplementedError
    @classmethod
    def consume_from_blob(cls, blob: memoryview | bytes) -> tuple[t.Self, memoryview | bytes]:
        """Read the 4-byte length prefix, parse that many payload bytes, and return (value, remainder)."""
        size = uint32.from_blob(blob[:4])
        payload, remainder = _split_blob(blob[4:], size)
        return cls.from_blob(payload), remainder
class uint32(int):
    """RFC 4251 uint32: four bytes, big-endian, unsigned."""
    def to_blob(self) -> bytes:
        """Serialize to exactly four big-endian bytes."""
        return self.to_bytes(4, 'big')
    @classmethod
    def from_blob(cls, blob: memoryview | bytes) -> t.Self:
        """Deserialize from big-endian bytes."""
        return cls.from_bytes(blob, 'big')
    @classmethod
    def consume_from_blob(cls, blob: memoryview | bytes) -> tuple[t.Self, memoryview | bytes]:
        """Take the leading four bytes as a uint32 and return it with the remainder."""
        head, tail = _split_blob(blob, uint32(4))
        return cls.from_blob(head), tail
class mpint(int, VariableSized):
    """
    RFC 4251 `mpint`: uint32 length prefix followed by big-endian magnitude bytes.
    Only non-negative values are supported here; zero serializes to an empty blob.
    """
    def to_blob(self) -> bytes:
        """Serialize to a length-prefixed big-endian byte string (immutable bytes, matching the annotation)."""
        if self < 0:
            raise ValueError("negative mpint not allowed")
        if not self:
            return b""
        # the extra bit forces a leading zero byte whenever the high bit is set, keeping the value non-negative
        nbytes = (self.bit_length() + 8) // 8
        payload = self.to_bytes(length=nbytes, byteorder='big')
        # build and return immutable bytes; previously a mutable working bytearray leaked to callers despite the `-> bytes` annotation
        return uint32(len(payload)).to_blob() + payload
    @classmethod
    def from_blob(cls, blob: memoryview | bytes) -> t.Self:
        """Deserialize from big-endian bytes; values with the sign bit set are rejected."""
        if blob and blob[0] > 127:
            raise ValueError("Invalid data")
        return cls.from_bytes(blob, byteorder='big')
class constraints(bytes):
    """Pre-encoded key constraint bytes; serialized verbatim with no length prefix."""
    def to_blob(self) -> bytes:
        return self
class binary_string(bytes, VariableSized):
    """RFC 4251 `string` of raw bytes: uint32 length prefix + data."""
    def to_blob(self) -> bytes:
        if length := len(self):
            return uint32(length).to_blob() + self
        else:
            # NOTE(review): an empty value omits even the length prefix; trailing optional fields appear to rely on this
            return b""
    @classmethod
    def from_blob(cls, blob: memoryview | bytes) -> t.Self:
        return cls(blob)
class unicode_string(str, VariableSized):
    """RFC 4251 `string` holding UTF-8 text: uint32 length prefix + encoded data."""
    def to_blob(self) -> bytes:
        val = self.encode('utf-8')
        if length := len(val):
            return uint32(length).to_blob() + val
        else:
            # empty text serializes to nothing, mirroring binary_string
            return b""
    @classmethod
    def from_blob(cls, blob: memoryview | bytes) -> t.Self:
        return cls(bytes(blob).decode('utf-8'))
class KeyAlgo(str, VariableSized, enum.Enum):
    """SSH public key algorithm identifiers, serialized on the wire as length-prefixed strings."""
    RSA = "ssh-rsa"
    DSA = "ssh-dss"
    ECDSA256 = "ecdsa-sha2-nistp256"
    SKECDSA256 = "sk-ecdsa-sha2-nistp256@openssh.com"
    ECDSA384 = "ecdsa-sha2-nistp384"
    ECDSA521 = "ecdsa-sha2-nistp521"
    ED25519 = "ssh-ed25519"
    SKED25519 = "sk-ssh-ed25519@openssh.com"
    RSASHA256 = "rsa-sha2-256"
    RSASHA512 = "rsa-sha2-512"
    @property
    def main_type(self) -> str:
        """Collapse the algorithm to its key family name (RSA/DSA/ECDSA/ED25519); raises for SK/signature-only members."""
        match self:
            case self.RSA:
                return 'RSA'
            case self.DSA:
                return 'DSA'
            case self.ECDSA256 | self.ECDSA384 | self.ECDSA521:
                return 'ECDSA'
            case self.ED25519:
                return 'ED25519'
            case _:
                raise NotImplementedError(self.name)
    def to_blob(self) -> bytes:
        # no empty-value special case needed (member values are never empty)
        b_self = self.encode('utf-8')
        return uint32(len(b_self)).to_blob() + b_self
    @classmethod
    def from_blob(cls, blob: memoryview | bytes) -> t.Self:
        return cls(bytes(blob).decode('utf-8'))
if HAS_CRYPTOGRAPHY:
    # maps ECDSA algorithm identifiers to their NIST curve implementations; only defined when cryptography is importable
    _ECDSA_KEY_TYPE: dict[KeyAlgo, type[EllipticCurve]] = {
        KeyAlgo.ECDSA256: SECP256R1,
        KeyAlgo.ECDSA384: SECP384R1,
        KeyAlgo.ECDSA521: SECP521R1,
    }
@dataclasses.dataclass
class Msg:
    """Base wire message: fields are serialized and deserialized in dataclass declaration order."""
    def to_blob(self) -> bytes:
        """Concatenate the blob encodings of all fields, in declaration order."""
        rv = bytearray()
        for field in dataclasses.fields(self):
            fv = getattr(self, field.name)
            if isinstance(fv, SupportsToBlob):
                rv.extend(fv.to_blob())
            else:
                raise NotImplementedError(field.type)
        # return immutable bytes to match the declared return type; previously the mutable working bytearray leaked to callers
        return bytes(rv)
    @classmethod
    def from_blob(cls, blob: memoryview | bytes) -> t.Self:
        """Consume each field's wire representation from `blob`, in declaration order, then construct the message."""
        args: list[t.Any] = []
        for _field_name, field_type in t.get_type_hints(cls).items():
            if isinstance(field_type, SupportsFromBlob):
                fv, blob = field_type.consume_from_blob(blob)
                args.append(fv)
            else:
                raise NotImplementedError(str(field_type))
        return cls(*args)
@dataclasses.dataclass
class PrivateKeyMsg(Msg):
    """Base for private key payloads sent with SSH_AGENTC_ADD_IDENTITY/SSH_AGENTC_ADD_ID_CONSTRAINED."""
    @staticmethod
    def from_private_key(private_key: CryptoPrivateKey) -> PrivateKeyMsg:
        """Build the concrete wire message for a `cryptography` private key object."""
        match private_key:
            case RSAPrivateKey():
                rsa_pn: RSAPrivateNumbers = private_key.private_numbers()
                return RSAPrivateKeyMsg(
                    KeyAlgo.RSA,
                    mpint(rsa_pn.public_numbers.n),
                    mpint(rsa_pn.public_numbers.e),
                    mpint(rsa_pn.d),
                    mpint(rsa_pn.iqmp),
                    mpint(rsa_pn.p),
                    mpint(rsa_pn.q),
                )
            case DSAPrivateKey():
                dsa_pn: DSAPrivateNumbers = private_key.private_numbers()
                return DSAPrivateKeyMsg(
                    KeyAlgo.DSA,
                    mpint(dsa_pn.public_numbers.parameter_numbers.p),
                    mpint(dsa_pn.public_numbers.parameter_numbers.q),
                    mpint(dsa_pn.public_numbers.parameter_numbers.g),
                    mpint(dsa_pn.public_numbers.y),
                    mpint(dsa_pn.x),
                )
            case EllipticCurvePrivateKey():
                ecdsa_pn: EllipticCurvePrivateNumbers = private_key.private_numbers()
                # key_size selects the matching KeyAlgo member and curve name (e.g. 256 -> ECDSA256/nistp256)
                key_size = private_key.key_size
                return EcdsaPrivateKeyMsg(
                    getattr(KeyAlgo, f'ECDSA{key_size}'),
                    unicode_string(f'nistp{key_size}'),
                    binary_string(
                        private_key.public_key().public_bytes(
                            encoding=serialization.Encoding.X962,
                            format=serialization.PublicFormat.UncompressedPoint,
                        )
                    ),
                    mpint(ecdsa_pn.private_value),
                )
            case Ed25519PrivateKey():
                public_bytes = private_key.public_key().public_bytes(
                    encoding=serialization.Encoding.Raw,
                    format=serialization.PublicFormat.Raw,
                )
                private_bytes = private_key.private_bytes(
                    encoding=serialization.Encoding.Raw,
                    format=serialization.PrivateFormat.Raw,
                    encryption_algorithm=serialization.NoEncryption(),
                )
                # the wire format carries the private bytes concatenated with the public bytes
                return Ed25519PrivateKeyMsg(
                    KeyAlgo.ED25519,
                    binary_string(public_bytes),
                    binary_string(private_bytes + public_bytes),
                )
            case _:
                raise NotImplementedError(private_key)
@dataclasses.dataclass(order=True, slots=True)
class RSAPrivateKeyMsg(PrivateKeyMsg):
    """RSA private key wire message; field order matches the agent protocol's add-identity payload."""
    type: KeyAlgo
    n: mpint
    e: mpint
    d: mpint
    iqmp: mpint
    p: mpint
    q: mpint
    comments: unicode_string = dataclasses.field(default=unicode_string(''), compare=False)
    constraints: constraints = dataclasses.field(default=constraints(b''))
@dataclasses.dataclass(order=True, slots=True)
class DSAPrivateKeyMsg(PrivateKeyMsg):
    """DSA private key wire message; field order matches the agent protocol's add-identity payload."""
    type: KeyAlgo
    p: mpint
    q: mpint
    g: mpint
    y: mpint
    x: mpint
    comments: unicode_string = dataclasses.field(default=unicode_string(''), compare=False)
    constraints: constraints = dataclasses.field(default=constraints(b''))
@dataclasses.dataclass(order=True, slots=True)
class EcdsaPrivateKeyMsg(PrivateKeyMsg):
    """ECDSA private key wire message; field order matches the agent protocol's add-identity payload."""
    type: KeyAlgo
    ecdsa_curve_name: unicode_string
    Q: binary_string
    d: mpint
    comments: unicode_string = dataclasses.field(default=unicode_string(''), compare=False)
    constraints: constraints = dataclasses.field(default=constraints(b''))
@dataclasses.dataclass(order=True, slots=True)
class Ed25519PrivateKeyMsg(PrivateKeyMsg):
    """Ed25519 private key wire message; field order matches the agent protocol's add-identity payload."""
    type: KeyAlgo
    enc_a: binary_string
    k_env_a: binary_string
    comments: unicode_string = dataclasses.field(default=unicode_string(''), compare=False)
    constraints: constraints = dataclasses.field(default=constraints(b''))
@dataclasses.dataclass
class PublicKeyMsg(Msg):
    """Base for public key wire messages; converts to/from `cryptography` public key objects."""
    @staticmethod
    def get_dataclass(type: KeyAlgo) -> type[
        t.Union[
            RSAPublicKeyMsg,
            EcdsaPublicKeyMsg,
            Ed25519PublicKeyMsg,
            DSAPublicKeyMsg,
        ]
    ]:
        """Select the concrete message dataclass for a key algorithm."""
        match type:
            case KeyAlgo.RSA:
                return RSAPublicKeyMsg
            case KeyAlgo.ECDSA256 | KeyAlgo.ECDSA384 | KeyAlgo.ECDSA521:
                return EcdsaPublicKeyMsg
            case KeyAlgo.ED25519:
                return Ed25519PublicKeyMsg
            case KeyAlgo.DSA:
                return DSAPublicKeyMsg
            case _:
                raise NotImplementedError(type)
    @functools.cached_property
    def public_key(self) -> CryptoPublicKey:
        """Reconstruct the `cryptography` public key object from the message fields."""
        type: KeyAlgo = self.type
        match type:
            case KeyAlgo.RSA:
                return RSAPublicNumbers(self.e, self.n).public_key()
            case KeyAlgo.ECDSA256 | KeyAlgo.ECDSA384 | KeyAlgo.ECDSA521:
                curve = _ECDSA_KEY_TYPE[KeyAlgo(type)]
                return EllipticCurvePublicKey.from_encoded_point(curve(), self.Q)
            case KeyAlgo.ED25519:
                return Ed25519PublicKey.from_public_bytes(self.enc_a)
            case KeyAlgo.DSA:
                return DSAPublicNumbers(self.y, DSAParameterNumbers(self.p, self.q, self.g)).public_key()
            case _:
                raise NotImplementedError(type)
    @staticmethod
    def from_public_key(public_key: CryptoPublicKey) -> PublicKeyMsg:
        """Build the concrete wire message for a `cryptography` public key object."""
        match public_key:
            case DSAPublicKey():
                dsa_pn: DSAPublicNumbers = public_key.public_numbers()
                return DSAPublicKeyMsg(
                    KeyAlgo.DSA,
                    mpint(dsa_pn.parameter_numbers.p),
                    mpint(dsa_pn.parameter_numbers.q),
                    mpint(dsa_pn.parameter_numbers.g),
                    mpint(dsa_pn.y),
                )
            case EllipticCurvePublicKey():
                return EcdsaPublicKeyMsg(
                    getattr(KeyAlgo, f'ECDSA{public_key.curve.key_size}'),
                    unicode_string(f'nistp{public_key.curve.key_size}'),
                    binary_string(
                        public_key.public_bytes(
                            encoding=serialization.Encoding.X962,
                            format=serialization.PublicFormat.UncompressedPoint,
                        )
                    ),
                )
            case Ed25519PublicKey():
                return Ed25519PublicKeyMsg(
                    KeyAlgo.ED25519,
                    binary_string(
                        public_key.public_bytes(
                            encoding=serialization.Encoding.Raw,
                            format=serialization.PublicFormat.Raw,
                        )
                    ),
                )
            case RSAPublicKey():
                rsa_pn: RSAPublicNumbers = public_key.public_numbers()
                return RSAPublicKeyMsg(KeyAlgo.RSA, mpint(rsa_pn.e), mpint(rsa_pn.n))
            case _:
                raise NotImplementedError(public_key)
    @functools.cached_property
    def fingerprint(self) -> str:
        """SHA-256 over the key blob (comments blanked), base64-encoded without padding."""
        digest = hashlib.sha256()
        # comments are not part of the fingerprint, so hash a copy with them cleared
        msg = copy.copy(self)
        msg.comments = unicode_string('')
        k = msg.to_blob()
        digest.update(k)
        return binascii.b2a_base64(digest.digest(), newline=False).rstrip(b'=').decode('utf-8')
@dataclasses.dataclass(order=True, slots=True)
class RSAPublicKeyMsg(PublicKeyMsg):
    """RSA public key wire message; field order matches the wire encoding."""
    type: KeyAlgo
    e: mpint
    n: mpint
    comments: unicode_string = dataclasses.field(default=unicode_string(''), compare=False)
@dataclasses.dataclass(order=True, slots=True)
class DSAPublicKeyMsg(PublicKeyMsg):
    """DSA public key wire message; field order matches the wire encoding."""
    type: KeyAlgo
    p: mpint
    q: mpint
    g: mpint
    y: mpint
    comments: unicode_string = dataclasses.field(default=unicode_string(''), compare=False)
@dataclasses.dataclass(order=True, slots=True)
class EcdsaPublicKeyMsg(PublicKeyMsg):
    """ECDSA public key wire message; field order matches the wire encoding."""
    type: KeyAlgo
    ecdsa_curve_name: unicode_string
    Q: binary_string
    comments: unicode_string = dataclasses.field(default=unicode_string(''), compare=False)
@dataclasses.dataclass(order=True, slots=True)
class Ed25519PublicKeyMsg(PublicKeyMsg):
    """Ed25519 public key wire message; field order matches the wire encoding."""
    type: KeyAlgo
    enc_a: binary_string
    comments: unicode_string = dataclasses.field(default=unicode_string(''), compare=False)
@dataclasses.dataclass(order=True, slots=True)
class KeyList(Msg):
    """Parsed SSH_AGENT_IDENTITIES_ANSWER payload: a key count followed by the keys themselves."""
    nkeys: uint32
    keys: PublicKeyMsgList
    def __post_init__(self) -> None:
        # the agent's advertised count must match what was actually parsed
        if self.nkeys != len(self.keys):
            raise SshAgentFailure("agent: invalid number of keys received for identities list")
@dataclasses.dataclass(order=True, slots=True)
class PublicKeyMsgList(Msg):
    """Sequence of public key messages as encoded within an identities answer."""
    keys: list[PublicKeyMsg]
    def __iter__(self) -> t.Iterator[PublicKeyMsg]:
        yield from self.keys
    def __len__(self) -> int:
        return len(self.keys)
    @classmethod
    def from_blob(cls, blob: memoryview | bytes) -> t.Self: ...
    # ^ no-op stub; this type is only materialized via consume_from_blob below
    @classmethod
    def consume_from_blob(cls, blob: memoryview | bytes) -> tuple[t.Self, memoryview | bytes]:
        """Parse repeated (length-prefixed key blob, length-prefixed comment) pairs until the buffer is exhausted."""
        args: list[PublicKeyMsg] = []
        while blob:
            prev_blob = blob
            key_blob, key_blob_length, comment_blob = cls._consume_field(blob)
            # peek at the algorithm name (first field inside the key blob) to pick the concrete dataclass
            peek_key_algo, _length, _blob = cls._consume_field(key_blob)
            pub_key_msg_cls = PublicKeyMsg.get_dataclass(KeyAlgo(bytes(peek_key_algo).decode('utf-8')))
            _fv, comment_blob_length, blob = cls._consume_field(comment_blob)
            # re-slice the original buffer so the key fields and comment are parsed together by from_blob
            key_plus_comment = prev_blob[4 : (4 + key_blob_length) + (4 + comment_blob_length)]
            args.append(pub_key_msg_cls.from_blob(key_plus_comment))
        return cls(args), b""
    @staticmethod
    def _consume_field(blob: memoryview | bytes) -> tuple[memoryview | bytes, uint32, memoryview | bytes]:
        """Return (data, length, rest) for one uint32-length-prefixed field."""
        length = uint32.from_blob(blob[:4])
        blob = blob[4:]
        data, rest = _split_blob(blob, length)
        return data, length, rest
class SshAgentClient:
    """Client for the SSH agent protocol over a unix domain socket."""
    def __init__(self, auth_sock: str) -> None:
        self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._sock.settimeout(_SSH_AGENT_CLIENT_SOCKET_TIMEOUT)
        self._sock.connect(auth_sock)
    def close(self) -> None:
        """Close the agent socket."""
        self._sock.close()
    def __enter__(self) -> t.Self:
        return self
    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: types.TracebackType | None,
    ) -> None:
        self.close()
    def _recv_exact(self, count: int) -> bytes:
        """Receive exactly `count` bytes; `socket.recv` may legally return fewer bytes than requested."""
        buf = bytearray()
        while len(buf) < count:
            chunk = self._sock.recv(count - len(buf))
            if not chunk:
                raise SshAgentFailure('agent: connection closed before a complete response was received')
            buf.extend(chunk)
        return bytes(buf)
    def send(self, msg: bytes) -> bytes:
        """Send a length-prefixed request and return the complete response payload, raising on agent failure."""
        length = uint32(len(msg)).to_blob()
        self._sock.sendall(length + msg)
        # read the full length-prefixed response; a single recv() could previously return a short read and corrupt framing
        bufsize = uint32.from_blob(self._recv_exact(4))
        resp = self._recv_exact(bufsize)
        if resp[0] == ProtocolMsgNumbers.SSH_AGENT_FAILURE:
            raise SshAgentFailure('agent: failure')
        return resp
    def remove_all(self) -> None:
        """Remove all identities from the agent."""
        self.send(ProtocolMsgNumbers.SSH_AGENTC_REMOVE_ALL_IDENTITIES.to_blob())
    def remove(self, public_key: CryptoPublicKey) -> None:
        """Remove the identity matching the given public key."""
        key_blob = PublicKeyMsg.from_public_key(public_key).to_blob()
        self.send(ProtocolMsgNumbers.SSH_AGENTC_REMOVE_IDENTITY.to_blob() + uint32(len(key_blob)).to_blob() + key_blob)
    def add(
        self,
        private_key: CryptoPrivateKey,
        comments: str | None = None,
        lifetime: int | None = None,
        confirm: bool | None = None,
    ) -> None:
        """Add a private key to the agent, optionally constrained by lifetime and/or per-use confirmation."""
        key_msg = PrivateKeyMsg.from_private_key(private_key)
        key_msg.comments = unicode_string(comments or '')
        if lifetime:
            key_msg.constraints += constraints([ProtocolMsgNumbers.SSH_AGENT_CONSTRAIN_LIFETIME]).to_blob() + uint32(lifetime).to_blob()
        if confirm:
            key_msg.constraints += constraints([ProtocolMsgNumbers.SSH_AGENT_CONSTRAIN_CONFIRM]).to_blob()
        # constrained adds use a distinct request number
        if key_msg.constraints:
            msg = ProtocolMsgNumbers.SSH_AGENTC_ADD_ID_CONSTRAINED.to_blob()
        else:
            msg = ProtocolMsgNumbers.SSH_AGENTC_ADD_IDENTITY.to_blob()
        msg += key_msg.to_blob()
        self.send(msg)
    def list(self) -> KeyList:
        """Request and parse all identities currently loaded in the agent."""
        req = ProtocolMsgNumbers.SSH_AGENTC_REQUEST_IDENTITIES.to_blob()
        r = memoryview(bytearray(self.send(req)))
        if r[0] != ProtocolMsgNumbers.SSH_AGENT_IDENTITIES_ANSWER:
            raise SshAgentFailure('agent: non-identities answer received for identities list')
        return KeyList.from_blob(r[1:])
    def __contains__(self, public_key: CryptoPublicKey) -> bool:
        """True if the given public key is currently loaded in the agent."""
        msg = PublicKeyMsg.from_public_key(public_key)
        return msg in self.list().keys
@functools.cache
def key_data_into_crypto_objects(key_data: bytes, passphrase: bytes | None) -> tuple[CryptoPrivateKey, CryptoPublicKey, str]:
    """
    Load an OpenSSH private key, returning the private key, its public key, and the public key fingerprint.
    NOTE: results are cached per (key_data, passphrase) with an unbounded cache, so key material stays resident for the process lifetime.
    """
    private_key = serialization.ssh.load_ssh_private_key(key_data, passphrase)
    public_key = private_key.public_key()
    fingerprint = PublicKeyMsg.from_public_key(public_key).fingerprint
    return private_key, public_key, fingerprint

@ -0,0 +1,78 @@
from __future__ import annotations
import dataclasses
import typing as t
from collections import abc as c
from ansible import constants
from ansible._internal._templating import _engine
from ansible._internal._templating._chain_templar import ChainTemplar
from ansible.errors import AnsibleError
from ansible.module_utils._internal._ambient_context import AmbientContextBase
from ansible.module_utils.datatag import native_type_name
from ansible.parsing import vault as _vault
from ansible.utils.display import Display
if t.TYPE_CHECKING:
from ansible.playbook.task import Task
@dataclasses.dataclass
class TaskContext(AmbientContextBase):
    """Ambient context that wraps task execution on workers. It provides access to the currently executing task."""
    task: Task  # the task currently being executed in this context
TaskArgsFinalizerCallback = t.Callable[[str, t.Any, _engine.TemplateEngine, t.Any], t.Any]
"""Type alias for the shape of the `ActionBase.finalize_task_arg` method."""
class TaskArgsChainTemplar(ChainTemplar):
    """
    A ChainTemplar that carries a user-provided context object, optionally provided by `ActionBase.get_finalize_task_args_context`.
    TaskArgsFinalizer provides the context to each `ActionBase.finalize_task_arg` call to allow for more complex/stateful customization.
    """
    def __init__(self, *sources: c.Mapping, templar: _engine.TemplateEngine, callback: TaskArgsFinalizerCallback, context: t.Any) -> None:
        super().__init__(*sources, templar=templar)
        self.callback = callback  # invoked per key/value in place of direct templating
        self.context = context  # opaque action-provided state passed through to the callback
    def template(self, key: t.Any, value: t.Any) -> t.Any:
        """Delegate templating of each key/value to the action-provided callback."""
        return self.callback(key, value, self.templar, self.context)
class TaskArgsFinalizer:
    """Invoked during task args finalization; allows actions to override default arg processing (e.g., templating)."""
    def __init__(self, *args: c.Mapping[str, t.Any] | str | None, templar: _engine.TemplateEngine) -> None:
        # layers are evaluated in order; None entries (absent sources) are dropped
        self._args_layers = [arg for arg in args if arg is not None]
        self._templar = templar
    def finalize(self, callback: TaskArgsFinalizerCallback, context: t.Any) -> dict[str, t.Any]:
        """Resolve all arg layers (templating string layers) and merge them via a chain templar into a single dict."""
        resolved_layers: list[c.Mapping[str, t.Any]] = []
        for layer in self._args_layers:
            if isinstance(layer, (str, _vault.EncryptedString)):  # EncryptedString can hide a template
                # warning is gated on fact injection being enabled; see the linked FAQ entry
                if constants.config.get_config_value('INJECT_FACTS_AS_VARS'):
                    Display().warning(
                        "Using a template for task args is unsafe in some situations "
                        "(see https://docs.ansible.com/ansible/devel/reference_appendices/faq.html#argsplat-unsafe).",
                        obj=layer,
                    )
                resolved_layer = self._templar.resolve_to_container(layer, options=_engine.TemplateOptions(value_for_omit={}))
            else:
                resolved_layer = layer
            if not isinstance(resolved_layer, dict):
                raise AnsibleError(f'Task args must resolve to a {native_type_name(dict)!r} not {native_type_name(resolved_layer)!r}.', obj=layer)
            resolved_layers.append(resolved_layer)
        # reversed so that earlier layers take precedence in the chain lookup
        ct = TaskArgsChainTemplar(*reversed(resolved_layers), templar=self._templar, callback=callback, context=context)
        return ct.as_dict()

@ -0,0 +1,12 @@
from __future__ import annotations
import importlib.metadata
# fail fast at import time when the installed Jinja is older than the supported minimum
jinja2_version = importlib.metadata.version('jinja2')
# DTFIX-FUTURE: sanity test to ensure this doesn't drift from requirements
_MINIMUM_JINJA_VERSION = (3, 1)
# compare only (major, minor); extra release components are ignored
_CURRENT_JINJA_VERSION = tuple(int(part) for part in jinja2_version.split('.', maxsplit=2)[:2])
if _CURRENT_JINJA_VERSION < _MINIMUM_JINJA_VERSION:
    raise RuntimeError(f'Jinja version {".".join(map(str, _MINIMUM_JINJA_VERSION))} or higher is required (current version {jinja2_version}).')

@ -0,0 +1,86 @@
from __future__ import annotations
import abc
import typing as t
from contextvars import ContextVar
from ansible.module_utils._internal._datatag import AnsibleTagHelper
class NotifiableAccessContextBase(metaclass=abc.ABCMeta):
    """Abstract context manager that, while active, is notified of managed accesses to the types/tags it declares interest in."""

    _type_interest: t.FrozenSet[type] = frozenset()
    """Types (tag types included) whose access triggers notification of this context."""

    _mask: t.ClassVar[bool] = False
    """If set, only the innermost (most recently created) context of this type receives notifications."""

    def __enter__(self):
        """Activate this context by registering it with the current callstack's access broker."""
        broker = AnsibleAccessContext.current()
        # noinspection PyProtectedMember
        broker._register_interest(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Deactivate this context by unregistering it from the current callstack's access broker."""
        broker = AnsibleAccessContext.current()
        # noinspection PyProtectedMember
        broker._unregister_interest(self)
        return None

    @abc.abstractmethod
    def _notify(self, o: t.Any) -> t.Any:
        """Subclasses implement the behavior to run when a registered type or tag is accessed."""
class AnsibleAccessContext:
    """
    Broker object for managed access registration and notification.

    Each thread or other logical callstack owns a single `AnsibleAccessContext` with which
    `NotifiableAccessContext` objects register interest. A managed access on an object notifies
    every active context in the current callstack that registered interest in the object's type
    or one of the tags present on it.
    """

    _contextvar: t.ClassVar[ContextVar[AnsibleAccessContext]] = ContextVar('AnsibleAccessContext')

    @staticmethod
    def current() -> AnsibleAccessContext:
        """Return the `AnsibleAccessContext` for the current logical callstack, creating it on first use."""
        contextvar = AnsibleAccessContext._contextvar

        try:
            return contextvar.get()
        except LookupError:
            pass

        # First access on this thread/async context; bind a fresh broker for its lifetime.
        # The reset token is intentionally discarded since the binding is never undone.
        ctx = AnsibleAccessContext()
        contextvar.set(ctx)

        return ctx

    def __init__(self) -> None:
        # Active contexts, ordered outermost-first (appended on registration).
        self._notify_contexts: list[NotifiableAccessContextBase] = []

    def _register_interest(self, context: NotifiableAccessContextBase) -> None:
        """Activate `context`; it will receive access notifications until unregistered."""
        self._notify_contexts.append(context)

    def _unregister_interest(self, context: NotifiableAccessContextBase) -> None:
        """Deactivate the most recently registered context, which must be `context`."""
        top = self._notify_contexts.pop()

        if top is not context:
            raise RuntimeError(f'Out-of-order context deactivation detected. Found {top} instead of {context}.')

    def access(self, value: t.Any) -> None:
        """Notify all contexts which have registered interest in the given value that it is being accessed."""
        if not self._notify_contexts:
            return

        value_types = AnsibleTagHelper.tag_types(value) | frozenset((type(value),))
        masked: set[type] = set()

        # Walk innermost-first; a masked context type is only notified via its innermost instance.
        for ctx in reversed(self._notify_contexts):
            if ctx._mask:
                ctx_type = type(ctx)

                if ctx_type in masked:
                    continue

                masked.add(ctx_type)

            # noinspection PyProtectedMember
            if not ctx._type_interest.isdisjoint(value_types):
                ctx._notify(value)

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save