ansible-test - More PEP 484 type hints.

pull/75783/head
Matt Clay 3 years ago
parent 6981237130
commit b70077364a

@ -53,13 +53,8 @@ from ..data import (
FOCUSED_TARGET = '__focused__'
def categorize_changes(args, paths, verbose_command=None):
"""
:type args: TestConfig
:type paths: list[str]
:type verbose_command: str
:rtype: ChangeDescription
"""
def categorize_changes(args, paths, verbose_command=None): # type: (TestConfig, t.List[str], t.Optional[str]) -> ChangeDescription
"""Categorize the given list of changed paths and return a description of the changes."""
mapper = PathMapper(args)
commands = {
@ -180,10 +175,7 @@ def categorize_changes(args, paths, verbose_command=None):
class PathMapper:
"""Map file paths to test commands and targets."""
def __init__(self, args):
"""
:type args: TestConfig
"""
def __init__(self, args): # type: (TestConfig) -> None
self.args = args
self.integration_all_target = get_integration_all_target(self.args)
@ -226,11 +218,8 @@ class PathMapper:
self.paths_to_dependent_targets[path].add(target)
def get_dependent_paths(self, path):
"""
:type path: str
:rtype: list[str]
"""
def get_dependent_paths(self, path): # type: (str) -> t.List[str]
"""Return a list of paths which depend on the given path, recursively expanding dependent paths as well."""
unprocessed_paths = set(self.get_dependent_paths_non_recursive(path))
paths = set()
@ -248,22 +237,16 @@ class PathMapper:
return sorted(paths)
def get_dependent_paths_non_recursive(self, path):
"""
:type path: str
:rtype: list[str]
"""
def get_dependent_paths_non_recursive(self, path): # type: (str) -> t.List[str]
"""Return a list of paths which depend on the given path, including dependent integration test target paths."""
paths = self.get_dependent_paths_internal(path)
paths += [target.path + '/' for target in self.paths_to_dependent_targets.get(path, set())]
paths = sorted(set(paths))
return paths
def get_dependent_paths_internal(self, path):
"""
:type path: str
:rtype: list[str]
"""
def get_dependent_paths_internal(self, path): # type: (str) -> t.List[str]
"""Return a list of paths which depend on the given path."""
ext = os.path.splitext(os.path.split(path)[1])[1]
if is_subdir(path, data_context().content.module_utils_path):
@ -281,11 +264,8 @@ class PathMapper:
return []
def get_python_module_utils_usage(self, path):
"""
:type path: str
:rtype: list[str]
"""
def get_python_module_utils_usage(self, path): # type: (str) -> t.List[str]
"""Return a list of paths which depend on the given path which is a Python module_utils file."""
if not self.python_module_utils_imports:
display.info('Analyzing python module_utils imports...')
before = time.time()
@ -297,11 +277,8 @@ class PathMapper:
return sorted(self.python_module_utils_imports[name])
def get_powershell_module_utils_usage(self, path):
"""
:type path: str
:rtype: list[str]
"""
def get_powershell_module_utils_usage(self, path): # type: (str) -> t.List[str]
"""Return a list of paths which depend on the given path which is a PowerShell module_utils file."""
if not self.powershell_module_utils_imports:
display.info('Analyzing powershell module_utils imports...')
before = time.time()
@ -313,11 +290,8 @@ class PathMapper:
return sorted(self.powershell_module_utils_imports[name])
def get_csharp_module_utils_usage(self, path):
"""
:type path: str
:rtype: list[str]
"""
def get_csharp_module_utils_usage(self, path): # type: (str) -> t.List[str]
"""Return a list of paths which depend on the given path which is a C# module_utils file."""
if not self.csharp_module_utils_imports:
display.info('Analyzing C# module_utils imports...')
before = time.time()
@ -329,22 +303,16 @@ class PathMapper:
return sorted(self.csharp_module_utils_imports[name])
def get_integration_target_usage(self, path):
"""
:type path: str
:rtype: list[str]
"""
def get_integration_target_usage(self, path): # type: (str) -> t.List[str]
"""Return a list of paths which depend on the given path which is an integration target file."""
target_name = path.split('/')[3]
dependents = [os.path.join(data_context().content.integration_targets_path, target) + os.path.sep
for target in sorted(self.integration_dependencies.get(target_name, set()))]
return dependents
def classify(self, path):
"""
:type path: str
:rtype: dict[str, str] | None
"""
def classify(self, path): # type: (str) -> t.Optional[t.Dict[str, str]]
"""Classify the given path and return an optional dictionary of the results."""
result = self._classify(path)
# run all tests when no result given
@ -908,12 +876,8 @@ class PathMapper:
)
def all_tests(args, force=False):
"""
:type args: TestConfig
:type force: bool
:rtype: dict[str, str]
"""
def all_tests(args, force=False): # type: (TestConfig, bool) -> t.Dict[str, str]
"""Return the targets for each test command when all tests should be run."""
if force:
integration_all_target = 'all'
else:
@ -928,11 +892,8 @@ def all_tests(args, force=False):
}
def get_integration_all_target(args):
"""
:type args: TestConfig
:rtype: str
"""
def get_integration_all_target(args): # type: (TestConfig) -> str
"""Return the target to use when all tests should be run."""
if isinstance(args, IntegrationConfig):
return args.changed_all_target

@ -129,11 +129,8 @@ from .coverage import (
THostProfile = t.TypeVar('THostProfile', bound=HostProfile)
def generate_dependency_map(integration_targets):
"""
:type integration_targets: list[IntegrationTarget]
:rtype: dict[str, set[IntegrationTarget]]
"""
def generate_dependency_map(integration_targets): # type: (t.List[IntegrationTarget]) -> t.Dict[str, t.Set[IntegrationTarget]]
"""Analyze the given list of integration test targets and return a dictionary expressing target names and the targets on which they depend."""
targets_dict = dict((target.name, target) for target in integration_targets)
target_dependencies = analyze_integration_target_dependencies(integration_targets)
dependency_map = {}
@ -159,11 +156,8 @@ def generate_dependency_map(integration_targets):
return dependency_map
def get_files_needed(target_dependencies):
"""
:type target_dependencies: list[IntegrationTarget]
:rtype: list[str]
"""
def get_files_needed(target_dependencies): # type: (t.List[IntegrationTarget]) -> t.List[str]
"""Return a list of files needed by the given list of target dependencies."""
files_needed = []
for target_dependency in target_dependencies:
@ -230,12 +224,12 @@ def delegate_inventory(args, inventory_path_src): # type: (IntegrationConfig, s
@contextlib.contextmanager
def integration_test_environment(args, target, inventory_path_src):
"""
:type args: IntegrationConfig
:type target: IntegrationTarget
:type inventory_path_src: str
"""
def integration_test_environment(
args, # type: IntegrationConfig
target, # type: IntegrationTarget
inventory_path_src, # type: str
): # type: (...) -> t.ContextManager[IntegrationEnvironment]
"""Context manager that prepares the integration test environment and cleans it up."""
ansible_config_src = args.get_ansible_config()
ansible_config_relative = os.path.join(data_context().content.integration_path, '%s.cfg' % args.command)
@ -333,12 +327,12 @@ def integration_test_environment(args, target, inventory_path_src):
@contextlib.contextmanager
def integration_test_config_file(args, env_config, integration_dir):
"""
:type args: IntegrationConfig
:type env_config: CloudEnvironmentConfig
:type integration_dir: str
"""
def integration_test_config_file(
args, # type: IntegrationConfig
env_config, # type: CloudEnvironmentConfig
integration_dir, # type: str
): # type: (...) -> t.ContextManager[t.Optional[str]]
"""Context manager that provides a config file for integration tests, if needed."""
if not env_config:
yield None
return
@ -764,16 +758,15 @@ def run_setup_targets(
targets_executed.add(target_name)
def integration_environment(args, target, test_dir, inventory_path, ansible_config, env_config):
"""
:type args: IntegrationConfig
:type target: IntegrationTarget
:type test_dir: str
:type inventory_path: str
:type ansible_config: str | None
:type env_config: CloudEnvironmentConfig | None
:rtype: dict[str, str]
"""
def integration_environment(
args, # type: IntegrationConfig
target, # type: IntegrationTarget
test_dir, # type: str
inventory_path, # type: str
ansible_config, # type: t.Optional[str]
env_config, # type: t.Optional[CloudEnvironmentConfig]
): # type: (...) -> t.Dict[str, str]
"""Return a dictionary of environment variables to use when running the given integration test target."""
env = ansible_environment(args, ansible_config=ansible_config)
callback_plugins = ['junit'] + (env_config.callback_plugins or [] if env_config else [])

@ -52,16 +52,16 @@ class AnsibleCoreCI:
"""Client for Ansible Core CI services."""
DEFAULT_ENDPOINT = 'https://ansible-core-ci.testing.ansible.com'
def __init__(self, args, platform, version, provider, persist=True, load=True, suffix=None):
"""
:type args: EnvironmentConfig
:type platform: str
:type version: str
:type provider: str
:type persist: bool
:type load: bool
:type suffix: str | None
"""
def __init__(
self,
args, # type: EnvironmentConfig
platform, # type: str
version, # type: str
provider, # type: str
persist=True, # type: bool
load=True, # type: bool
suffix=None, # type: t.Optional[str]
): # type: (...) -> None
self.args = args
self.platform = platform
self.version = version
@ -155,14 +155,8 @@ class AnsibleCoreCI:
raise self._create_http_error(response)
def get(self, tries=3, sleep=15, always_raise_on=None):
"""
Get instance connection information.
:type tries: int
:type sleep: int
:type always_raise_on: list[int] | None
:rtype: InstanceConnection
"""
def get(self, tries=3, sleep=15, always_raise_on=None): # type: (int, int, t.Optional[t.List[int]]) -> t.Optional[InstanceConnection]
"""Get instance connection information."""
if not self.started:
display.info('Skipping invalid %s/%s instance %s.' % (self.platform, self.version, self.instance_id),
verbosity=1)
@ -329,11 +323,8 @@ class AnsibleCoreCI:
return self.load(config)
def load(self, config):
"""
:type config: dict[str, str]
:rtype: bool
"""
def load(self, config): # type: (t.Dict[str, str]) -> bool
"""Load the instance from the provided dictionary."""
self.instance_id = str(config['instance_id'])
self.endpoint = config['endpoint']
self.started = True
@ -342,7 +333,7 @@ class AnsibleCoreCI:
return True
def _save(self):
def _save(self): # type: () -> None
"""Save instance information."""
if self.args.explain:
return
@ -351,10 +342,8 @@ class AnsibleCoreCI:
write_json_file(self.path, config, create_directories=True)
def save(self):
"""
:rtype: dict[str, str]
"""
def save(self): # type: () -> t.Dict[str, str]
"""Save instance details and return as a dictionary."""
return dict(
platform_version='%s/%s' % (self.platform, self.version),
instance_id=self.instance_id,
@ -362,11 +351,8 @@ class AnsibleCoreCI:
)
@staticmethod
def _create_http_error(response):
"""
:type response: HttpResponse
:rtype: ApplicationError
"""
def _create_http_error(response): # type: (HttpResponse) -> ApplicationError
"""Return an exception created from the given HTTP resposne."""
response_json = response.json()
stack_trace = ''
@ -392,12 +378,7 @@ class AnsibleCoreCI:
class CoreHttpError(HttpError):
"""HTTP response as an error."""
def __init__(self, status, remote_message, remote_stack_trace):
"""
:type status: int
:type remote_message: str
:type remote_stack_trace: str
"""
def __init__(self, status, remote_message, remote_stack_trace): # type: (int, str, str) -> None
super().__init__(status, '%s%s' % (remote_message, remote_stack_trace))
self.remote_message = remote_message
@ -411,10 +392,7 @@ class SshKey:
PUB_NAME = '%s.pub' % KEY_NAME
@mutex
def __init__(self, args):
"""
:type args: EnvironmentConfig
"""
def __init__(self, args): # type: (EnvironmentConfig) -> None
key_pair = self.get_key_pair()
if not key_pair:

@ -11,47 +11,32 @@ from .util import (
)
def parse_diff(lines):
"""
:type lines: list[str]
:rtype: list[FileDiff]
"""
def parse_diff(lines): # type: (t.List[str]) -> t.List[FileDiff]
"""Parse the given diff lines and return a list of FileDiff objects representing the changes of each file."""
return DiffParser(lines).files
class FileDiff:
"""Parsed diff for a single file."""
def __init__(self, old_path, new_path):
"""
:type old_path: str
:type new_path: str
"""
def __init__(self, old_path, new_path): # type: (str, str) -> None
self.old = DiffSide(old_path, new=False)
self.new = DiffSide(new_path, new=True)
self.headers = [] # type: t.List[str]
self.binary = False
def append_header(self, line):
"""
:type line: str
"""
def append_header(self, line): # type: (str) -> None
"""Append the given line to the list of headers for this file."""
self.headers.append(line)
@property
def is_complete(self):
"""
:rtype: bool
"""
def is_complete(self): # type: () -> bool
"""True if the diff is complete, otherwise False."""
return self.old.is_complete and self.new.is_complete
class DiffSide:
"""Parsed diff for a single 'side' of a single file."""
def __init__(self, path, new):
"""
:type path: str
:type new: bool
"""
def __init__(self, path, new): # type: (str, bool) -> None
self.path = path
self.new = new
self.prefix = '+' if self.new else '-'
@ -66,19 +51,14 @@ class DiffSide:
self._lines_remaining = 0
self._range_start = 0
def set_start(self, line_start, line_count):
"""
:type line_start: int
:type line_count: int
"""
def set_start(self, line_start, line_count): # type: (int, int) -> None
"""Set the starting line and line count."""
self._next_line_number = line_start
self._lines_remaining = line_count
self._range_start = 0
def append(self, line):
"""
:type line: str
"""
def append(self, line): # type: (str) -> None
"""Append the given line."""
if self._lines_remaining <= 0:
raise Exception('Diff range overflow.')
@ -113,17 +93,12 @@ class DiffSide:
self._next_line_number += 1
@property
def is_complete(self):
"""
:rtype: bool
"""
def is_complete(self): # type: () -> bool
"""True if the diff is complete, otherwise False."""
return self._lines_remaining == 0
def format_lines(self, context=True):
"""
:type context: bool
:rtype: list[str]
"""
def format_lines(self, context=True): # type: (bool) -> t.List[str]
"""Format the diff and return a list of lines, optionally including context."""
if context:
lines = self.lines_and_context
else:
@ -134,10 +109,7 @@ class DiffSide:
class DiffParser:
"""Parse diff lines."""
def __init__(self, lines):
"""
:type lines: list[str]
"""
def __init__(self, lines): # type: (t.List[str]) -> None
self.lines = lines
self.files = [] # type: t.List[FileDiff]
@ -174,7 +146,7 @@ class DiffParser:
self.complete_file()
def process_start(self):
def process_start(self): # type: () -> None
"""Process a diff start line."""
self.complete_file()
@ -186,7 +158,7 @@ class DiffParser:
self.file = FileDiff(match.group('old_path'), match.group('new_path'))
self.action = self.process_continue
def process_range(self):
def process_range(self): # type: () -> None
"""Process a diff range line."""
match = re.search(r'^@@ -((?P<old_start>[0-9]+),)?(?P<old_count>[0-9]+) \+((?P<new_start>[0-9]+),)?(?P<new_count>[0-9]+) @@', self.line)
@ -197,7 +169,7 @@ class DiffParser:
self.file.new.set_start(int(match.group('new_start') or 1), int(match.group('new_count')))
self.action = self.process_content
def process_continue(self):
def process_continue(self): # type: () -> None
"""Process a diff start, range or header line."""
if self.line.startswith('diff '):
self.process_start()
@ -206,7 +178,7 @@ class DiffParser:
else:
self.process_header()
def process_header(self):
def process_header(self): # type: () -> None
"""Process a diff header line."""
if self.line.startswith('Binary files '):
self.file.binary = True
@ -217,7 +189,7 @@ class DiffParser:
else:
self.file.append_header(self.line)
def process_content(self):
def process_content(self): # type: () -> None
"""Process a diff content line."""
if self.line == r'\ No newline at end of file':
if self.previous_line.startswith(' '):
@ -246,7 +218,7 @@ class DiffParser:
else:
raise Exception('Unexpected diff content line.')
def complete_file(self):
def complete_file(self): # type: () -> None
"""Complete processing of the current file, if any."""
if not self.file:
return

@ -28,6 +28,7 @@ from .util_common import (
)
from .config import (
CommonConfig,
EnvironmentConfig,
)
@ -206,15 +207,14 @@ def docker_cp_to(args, container_id, src, dst): # type: (EnvironmentConfig, str
docker_command(args, ['cp', src, '%s:%s' % (container_id, dst)])
def docker_run(args, image, options, cmd=None, create_only=False):
"""
:type args: EnvironmentConfig
:type image: str
:type options: list[str] | None
:type cmd: list[str] | None
:type create_only[bool] | False
:rtype: str
"""
def docker_run(
args, # type: EnvironmentConfig
image, # type: str
options, # type: t.Optional[t.List[str]]
cmd=None, # type: t.Optional[t.List[str]]
create_only=False, # type: bool
): # type: (...) -> str
"""Run a container using the given docker image."""
if not options:
options = []
@ -414,12 +414,8 @@ def docker_inspect(args, identifier, always=False): # type: (EnvironmentConfig,
raise ContainerNotFoundError(identifier)
def docker_network_disconnect(args, container_id, network):
"""
:param args: EnvironmentConfig
:param container_id: str
:param network: str
"""
def docker_network_disconnect(args, container_id, network): # type: (EnvironmentConfig, str, str) -> None
"""Disconnect the specified docker container from the given network."""
docker_command(args, ['network', 'disconnect', network, container_id], capture=True)
@ -433,18 +429,17 @@ def docker_image_exists(args, image): # type: (EnvironmentConfig, str) -> bool
return True
def docker_exec(args, container_id, cmd, options=None, capture=False, stdin=None, stdout=None, data=None):
"""
:type args: EnvironmentConfig
:type container_id: str
:type cmd: list[str]
:type options: list[str] | None
:type capture: bool
:type stdin: BinaryIO | None
:type stdout: BinaryIO | None
:type data: str | None
:rtype: str | None, str | None
"""
def docker_exec(
args, # type: EnvironmentConfig
container_id, # type: str
cmd, # type: t.List[str]
options=None, # type: t.Optional[t.List[str]]
capture=False, # type: bool
stdin=None, # type: t.Optional[t.BinaryIO]
stdout=None, # type: t.Optional[t.BinaryIO]
data=None, # type: t.Optional[str]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
"""Execute the given command in the specified container."""
if not options:
options = []
@ -454,44 +449,35 @@ def docker_exec(args, container_id, cmd, options=None, capture=False, stdin=None
return docker_command(args, ['exec'] + options + [container_id] + cmd, capture=capture, stdin=stdin, stdout=stdout, data=data)
def docker_info(args):
"""
:param args: CommonConfig
:rtype: dict[str, any]
"""
def docker_info(args): # type: (CommonConfig) -> t.Dict[str, t.Any]
"""Return a dictionary containing details from the `docker info` command."""
stdout, _dummy = docker_command(args, ['info', '--format', '{{json .}}'], capture=True, always=True)
return json.loads(stdout)
def docker_version(args):
"""
:param args: CommonConfig
:rtype: dict[str, any]
"""
def docker_version(args): # type: (CommonConfig) -> t.Dict[str, t.Any]
"""Return a dictionary containing details from the `docker version` command."""
stdout, _dummy = docker_command(args, ['version', '--format', '{{json .}}'], capture=True, always=True)
return json.loads(stdout)
def docker_command(args, cmd, capture=False, stdin=None, stdout=None, always=False, data=None):
"""
:type args: CommonConfig
:type cmd: list[str]
:type capture: bool
:type stdin: file | None
:type stdout: file | None
:type always: bool
:type data: str | None
:rtype: str | None, str | None
"""
def docker_command(
args, # type: CommonConfig
cmd, # type: t.List[str]
capture=False, # type: bool
stdin=None, # type: t.Optional[t.BinaryIO]
stdout=None, # type: t.Optional[t.BinaryIO]
always=False, # type: bool
data=None, # type: t.Optional[str]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
"""Run the specified docker command."""
env = docker_environment()
command = require_docker().command
return run_command(args, [command] + cmd, env=env, capture=capture, stdin=stdin, stdout=stdout, always=always, data=data)
def docker_environment():
"""
:rtype: dict[str, str]
"""
def docker_environment(): # type: () -> t.Dict[str, str]
"""Return a dictionary of docker related environment variables found in the current environment."""
env = common_environment()
env.update(dict((key, os.environ[key]) for key in os.environ if key.startswith('DOCKER_')))
return env

@ -16,22 +16,15 @@ class Git:
self.git = 'git'
self.root = root
def get_diff(self, args, git_options=None):
"""
:type args: list[str]
:type git_options: list[str] | None
:rtype: list[str]
"""
def get_diff(self, args, git_options=None): # type: (t.List[str], t.Optional[t.List[str]]) -> t.List[str]
"""Run `git diff` and return the result as a list."""
cmd = ['diff'] + args
if git_options is None:
git_options = ['-c', 'core.quotePath=']
return self.run_git_split(git_options + cmd, '\n', str_errors='replace')
def get_diff_names(self, args):
"""
:type args: list[str]
:rtype: list[str]
"""
def get_diff_names(self, args): # type: (t.List[str]) -> t.List[str]
"""Return a list of file names from the `git diff` command."""
cmd = ['diff', '--name-only', '--no-renames', '-z'] + args
return self.run_git_split(cmd, '\0')
@ -52,34 +45,23 @@ class Git:
return submodule_paths
def get_file_names(self, args):
"""
:type args: list[str]
:rtype: list[str]
"""
def get_file_names(self, args): # type: (t.List[str]) -> t.List[str]
"""Return a list of file names from the `git ls-files` command."""
cmd = ['ls-files', '-z'] + args
return self.run_git_split(cmd, '\0')
def get_branches(self):
"""
:rtype: list[str]
"""
def get_branches(self): # type: () -> t.List[str]
"""Return the list of branches."""
cmd = ['for-each-ref', 'refs/heads/', '--format', '%(refname:strip=2)']
return self.run_git_split(cmd)
def get_branch(self):
"""
:rtype: str
"""
def get_branch(self): # type: () -> str
"""Return the current branch name."""
cmd = ['symbolic-ref', '--short', 'HEAD']
return self.run_git(cmd).strip()
def get_rev_list(self, commits=None, max_count=None):
"""
:type commits: list[str] | None
:type max_count: int | None
:rtype: list[str]
"""
def get_rev_list(self, commits=None, max_count=None): # type: (t.Optional[t.List[str]], t.Optional[int]) -> t.List[str]
"""Return the list of results from the `git rev-list` command."""
cmd = ['rev-list']
if commits:
@ -92,19 +74,13 @@ class Git:
return self.run_git_split(cmd)
def get_branch_fork_point(self, branch):
"""
:type branch: str
:rtype: str
"""
def get_branch_fork_point(self, branch): # type: (str) -> str
"""Return a reference to the point at which the given branch was forked."""
cmd = ['merge-base', '--fork-point', branch]
return self.run_git(cmd).strip()
def is_valid_ref(self, ref):
"""
:type ref: str
:rtype: bool
"""
def is_valid_ref(self, ref): # type: (str) -> bool
"""Return True if the given reference is valid, otherwise return False."""
cmd = ['show', ref]
try:
self.run_git(cmd, str_errors='replace')
@ -112,13 +88,8 @@ class Git:
except SubprocessError:
return False
def run_git_split(self, cmd, separator=None, str_errors='strict'):
"""
:type cmd: list[str]
:type separator: str | None
:type str_errors: str
:rtype: list[str]
"""
def run_git_split(self, cmd, separator=None, str_errors='strict'): # type: (t.List[str], t.Optional[str], str) -> t.List[str]
"""Run the given `git` command and return the results as a list."""
output = self.run_git(cmd, str_errors=str_errors).strip(separator)
if not output:
@ -126,10 +97,6 @@ class Git:
return output.split(separator)
def run_git(self, cmd, str_errors='strict'):
"""
:type cmd: list[str]
:type str_errors: str
:rtype: str
"""
def run_git(self, cmd, str_errors='strict'): # type: (t.List[str], str) -> str
"""Run the given `git` command and return the results as a string."""
return raw_command([self.git] + cmd, cwd=self.root, capture=True, str_errors=str_errors)[0]

@ -31,37 +31,20 @@ class HttpClient:
self.username = None
self.password = None
def get(self, url):
"""
:type url: str
:rtype: HttpResponse
"""
def get(self, url): # type: (str) -> HttpResponse
"""Perform an HTTP GET and return the response."""
return self.request('GET', url)
def delete(self, url):
"""
:type url: str
:rtype: HttpResponse
"""
def delete(self, url): # type: (str) -> HttpResponse
"""Perform an HTTP DELETE and return the response."""
return self.request('DELETE', url)
def put(self, url, data=None, headers=None):
"""
:type url: str
:type data: str | None
:type headers: dict[str, str] | None
:rtype: HttpResponse
"""
def put(self, url, data=None, headers=None): # type: (str, t.Optional[str], t.Optional[t.Dict[str, str]]) -> HttpResponse
"""Perform an HTTP PUT and return the response."""
return self.request('PUT', url, data, headers)
def request(self, method, url, data=None, headers=None):
"""
:type method: str
:type url: str
:type data: str | None
:type headers: dict[str, str] | None
:rtype: HttpResponse
"""
def request(self, method, url, data=None, headers=None): # type: (str, str, t.Optional[str], t.Optional[t.Dict[str, str]]) -> HttpResponse
"""Perform an HTTP request and return the response."""
cmd = ['curl', '-s', '-S', '-i', '-X', method]
if self.insecure:
@ -130,22 +113,14 @@ class HttpClient:
class HttpResponse:
"""HTTP response from curl."""
def __init__(self, method, url, status_code, response):
"""
:type method: str
:type url: str
:type status_code: int
:type response: str
"""
def __init__(self, method, url, status_code, response): # type: (str, str, int, str) -> None
self.method = method
self.url = url
self.status_code = status_code
self.response = response
def json(self):
"""
:rtype: any
"""
def json(self): # type: () -> t.Any
"""Return the response parsed as JSON, raising an exception if parsing fails."""
try:
return json.loads(self.response)
except ValueError:
@ -154,10 +129,6 @@ class HttpResponse:
class HttpError(ApplicationError):
"""HTTP response as an error."""
def __init__(self, status, message):
"""
:type status: int
:type message: str
"""
def __init__(self, status, message): # type: (int, str) -> None
super().__init__('%s: %s' % (status, message))
self.status = status

@ -353,11 +353,8 @@ def walk_test_targets(path=None, module_path=None, extensions=None, prefix=None,
yield TestTarget(file_path, module_path, prefix, path, symlink)
def analyze_integration_target_dependencies(integration_targets):
"""
:type integration_targets: list[IntegrationTarget]
:rtype: dict[str,set[str]]
"""
def analyze_integration_target_dependencies(integration_targets): # type: (t.List[IntegrationTarget]) -> t.Dict[str, t.Set[str]]
"""Analyze the given list of integration test targets and return a dictionary expressing target names and the target names which depend on them."""
real_target_root = os.path.realpath(data_context().content.integration_targets_path) + '/'
role_targets = [target for target in integration_targets if target.type == 'role']

Loading…
Cancel
Save