[stable-2.13] Clean up release script (#81488) (#81491)

* Enable mypy for the entire packaging directory

* Return CompletedProcess only when capturing output

This allows stdout/stderr on CompletedProcess to be `str` instead of `str | None`.
The unused args on CompletedProcess have been removed.
Overload type hints have been added to reflect these changes.

* Relax return type on ensure_venv

This improves consistency with its usage, since `run` accepts `env` of `dict[str, t.Any]`.
Also removed unnecssary `str()` usage when updating `env`.

* Fix type hint on suppress_when

* Fix callable annotation

* Add type hint for command_parser

PyCharm complains about using a protected member, and also that it can't find the type in the type stubs.
However, mypy properly recognizes the type.

* Avoid unnecessary TypeVar usage.
(cherry picked from commit 47ab59753c)
pull/81529/head
Matt Clay 2 years ago committed by GitHub
parent 5c7d361b6e
commit d05ddfe8bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -42,7 +42,7 @@ from packaging.version import Version, InvalidVersion
# region CLI Framework # region CLI Framework
C = t.TypeVar("C", bound=t.Callable[[...], None]) C = t.TypeVar("C", bound=t.Callable[..., None])
def path_to_str(value: t.Any) -> str: def path_to_str(value: t.Any) -> str:
@ -50,12 +50,27 @@ def path_to_str(value: t.Any) -> str:
return f"{value}/" if isinstance(value, pathlib.Path) and value.is_dir() else str(value) return f"{value}/" if isinstance(value, pathlib.Path) and value.is_dir() else str(value)
@t.overload
def run(*args: t.Any, env: dict[str, t.Any] | None, cwd: pathlib.Path | str, capture_output: t.Literal[True]) -> CompletedProcess:
...
@t.overload
def run(*args: t.Any, env: dict[str, t.Any] | None, cwd: pathlib.Path | str, capture_output: t.Literal[False]) -> None:
...
@t.overload
def run(*args: t.Any, env: dict[str, t.Any] | None, cwd: pathlib.Path | str) -> None:
...
def run( def run(
*args: t.Any, *args: t.Any,
env: dict[str, t.Any] | None, env: dict[str, t.Any] | None,
cwd: pathlib.Path | str, cwd: pathlib.Path | str,
capture_output: bool = False, capture_output: bool = False,
) -> CompletedProcess: ) -> CompletedProcess | None:
"""Run the specified command.""" """Run the specified command."""
args = [arg.relative_to(cwd) if isinstance(arg, pathlib.Path) else arg for arg in args] args = [arg.relative_to(cwd) if isinstance(arg, pathlib.Path) else arg for arg in args]
@ -76,16 +91,18 @@ def run(
stderr=ex.stderr, stderr=ex.stderr,
) from None ) from None
if not capture_output:
return None
# improve type hinting # improve type hinting
return CompletedProcess( return CompletedProcess(
args=str_args,
stdout=p.stdout, stdout=p.stdout,
stderr=p.stderr, stderr=p.stderr,
) )
@contextlib.contextmanager @contextlib.contextmanager
def suppress_when(error_as_warning: bool) -> None: def suppress_when(error_as_warning: bool) -> t.Generator[None, None, None]:
"""Conditionally convert an ApplicationError in the provided context to a warning.""" """Conditionally convert an ApplicationError in the provided context to a warning."""
if error_as_warning: if error_as_warning:
try: try:
@ -122,9 +139,8 @@ class CalledProcessError(Exception):
class CompletedProcess: class CompletedProcess:
"""Results from a completed process.""" """Results from a completed process."""
args: tuple[str, ...] stdout: str
stdout: str | None stderr: str
stderr: str | None
class Display: class Display:
@ -167,7 +183,7 @@ class CommandFramework:
""" """
def __init__(self, **kwargs: dict[str, t.Any] | None) -> None: def __init__(self, **kwargs: dict[str, t.Any] | None) -> None:
self.commands: list[C] = [] self.commands: list[t.Callable[..., None]] = []
self.arguments = kwargs self.arguments = kwargs
self.parsed_arguments: argparse.Namespace | None = None self.parsed_arguments: argparse.Namespace | None = None
@ -176,7 +192,7 @@ class CommandFramework:
self.commands.append(func) self.commands.append(func)
return func return func
def run(self, *args: C, **kwargs) -> None: def run(self, *args: t.Callable[..., None], **kwargs) -> None:
"""Run the specified command(s), using any provided internal args.""" """Run the specified command(s), using any provided internal args."""
for arg in args: for arg in args:
self._run(arg, **kwargs) self._run(arg, **kwargs)
@ -203,6 +219,9 @@ class CommandFramework:
arguments = arguments.copy() arguments = arguments.copy()
exclusive = arguments.pop("exclusive", None) exclusive = arguments.pop("exclusive", None)
# noinspection PyProtectedMember, PyUnresolvedReferences
command_parser: argparse._ActionsContainer
if exclusive: if exclusive:
if exclusive not in exclusive_groups: if exclusive not in exclusive_groups:
exclusive_groups[exclusive] = func_parser.add_mutually_exclusive_group() exclusive_groups[exclusive] = func_parser.add_mutually_exclusive_group()
@ -234,7 +253,7 @@ class CommandFramework:
display.fatal(ex) display.fatal(ex)
sys.exit(1) sys.exit(1)
def _run(self, func: C, **kwargs) -> None: def _run(self, func: t.Callable[..., None], **kwargs) -> None:
"""Run the specified command, using any provided internal args.""" """Run the specified command, using any provided internal args."""
signature = inspect.signature(func) signature = inspect.signature(func)
func_args = {name: getattr(self.parsed_arguments, name) for name in signature.parameters if hasattr(self.parsed_arguments, name)} func_args = {name: getattr(self.parsed_arguments, name) for name in signature.parameters if hasattr(self.parsed_arguments, name)}
@ -253,7 +272,7 @@ class CommandFramework:
display.show(f"<== {label}", color=Display.BLUE) display.show(f"<== {label}", color=Display.BLUE)
@staticmethod @staticmethod
def _format_command_name(func: C) -> str: def _format_command_name(func: t.Callable[..., None]) -> str:
"""Return the friendly name of the given command.""" """Return the friendly name of the given command."""
return func.__name__.replace("_", "-") return func.__name__.replace("_", "-")
@ -441,7 +460,22 @@ class VersionMode(enum.Enum):
raise NotImplementedError(self) raise NotImplementedError(self)
def git(*args: t.Any, capture_output: bool = False) -> CompletedProcess: @t.overload
def git(*args: t.Any, capture_output: t.Literal[True]) -> CompletedProcess:
...
@t.overload
def git(*args: t.Any, capture_output: t.Literal[False]) -> None:
...
@t.overload
def git(*args: t.Any) -> None:
...
def git(*args: t.Any, capture_output: t.Literal[True] | t.Literal[False] = False) -> CompletedProcess | None:
"""Run the specified git command.""" """Run the specified git command."""
return run("git", *args, env=None, cwd=CHECKOUT_DIR, capture_output=capture_output) return run("git", *args, env=None, cwd=CHECKOUT_DIR, capture_output=capture_output)
@ -629,7 +663,7 @@ def get_git_state(version: Version, allow_stale: bool) -> GitState:
@functools.cache @functools.cache
def ensure_venv() -> dict[str, str]: def ensure_venv() -> dict[str, t.Any]:
"""Ensure the release venv is ready and return the env vars needed to use it.""" """Ensure the release venv is ready and return the env vars needed to use it."""
# TODO: consider freezing the ansible and release requirements along with their dependencies # TODO: consider freezing the ansible and release requirements along with their dependencies
@ -1274,7 +1308,7 @@ def build(allow_dirty: bool = False) -> None:
commit_time = int(git("show", "-s", "--format=%ct", capture_output=True).stdout) commit_time = int(git("show", "-s", "--format=%ct", capture_output=True).stdout)
env.update( env.update(
SOURCE_DATE_EPOCH=str(commit_time), SOURCE_DATE_EPOCH=commit_time,
) )
git("worktree", "add", "-d", temp_dir) git("worktree", "add", "-d", temp_dir)

Loading…
Cancel
Save