Extract to module podman_compose_tools with enhanced Command classes

main
Felix Stupp 2 years ago
parent e486a3c506
commit 505248a495
Signed by: zocker
GPG Key ID: 93E1BD26F6B02FB7

@ -23,21 +23,14 @@
from __future__ import annotations
import argparse
from functools import cache, cached_property, wraps
import json
import argparse
from functools import cached_property, wraps
import os
from pathlib import Path
import shlex
from subprocess import CompletedProcess, PIPE, run
from pathlib import Path, PurePath
import sys
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Mapping,
NewType,
Optional,
@ -50,13 +43,23 @@ from attrs import define, field
from podman_compose import normalize, rec_merge, rec_subs
import yaml
from podman_compose_tools.executor import (
ArgCommand,
BinaryExecutor,
CompletedExec,
ExecutorTarget,
HostExecutor,
)
from podman_compose_tools.executor.base import (
combine_cmds,
CommandArgs,
)
# === custom types
ProjectName = NewType("ProjectName", str)
ShellCommand = NewType("ShellCommand", str)
CommandArgs = NewType("CommandArgs", List[str])
ComposeVersion = NewType("ComposeVersion", str)
ContainerName = NewType("ContainerName", str)
@ -102,21 +105,13 @@ DEFAULT_BACKUP_CMD = "tar -cf - ."
DEFAULT_RESTORE_CMD = "tar -xf -"
# List of POSIX shells which shall be used
DETECTED_SHELLS = [
"/usr/bin/bash",
"/bin/bash",
"/usr/bin/sh",
"/bin/sh",
]
PODMAN_EXEC = CommandArgs(
PODMAN_EXEC = ArgCommand(
[
"/usr/bin/env",
"podman",
]
)
PODMAN_COMPOSE_EXEC = CommandArgs(
PODMAN_COMPOSE_EXEC = ArgCommand(
[
"/usr/bin/env",
"podman-compose",
@ -127,81 +122,7 @@ PODMAN_COMPOSE_EXEC = CommandArgs(
# === helpers
@define()
class CompletedExec:
completed_process: CompletedProcess
@property
def returncode(self) -> int:
return self.completed_process.returncode
def check_returncode(self) -> None:
return self.completed_process.check_returncode()
def to_json(self) -> Mapping[str, Any]:
return json.loads(self.completed_process.stdout)
def filter_cmds(command: Iterable[Optional[str]]) -> CommandArgs:
return CommandArgs([arg for arg in command if arg is not None])
def combine_cmds(*commands: CommandArgs | List[Optional[str]]) -> CommandArgs:
return CommandArgs([arg for cmd in commands for arg in filter_cmds(cmd)])
def exec_cmd(
command: CommandArgs, check: bool = True, capture_stdout: bool = True
) -> CompletedExec:
return CompletedExec(
run(
args=command,
check=check,
shell=False,
stdout=PIPE if capture_stdout else None,
)
)
def process_tester(
exec: Callable[[CommandArgs], CompletedExec]
) -> Callable[[CommandArgs], bool]:
return lambda command: exec(command).returncode == 0
def search_shell_with(tester: Callable[[CommandArgs], bool]) -> str:
for shell in DETECTED_SHELLS:
command = CommandArgs([shell, "-c", "true"])
res = exec_cmd(command, check=False)
if tester(command):
return command[0]
# TODO specialize
raise Exception(
f"Could not find an acceptable shell on this host, searched for {DETECTED_SHELLS}"
)
@cache
def search_shell() -> str:
return search_shell_with(
process_tester(
lambda command: exec_cmd(
command=command,
check=False,
capture_stdout=False,
)
)
)
def exec_shell(
shell_cmd: ShellCommand, check: bool = True, capture_stdout: bool = True
) -> CompletedExec:
return exec_cmd(
CommandArgs([search_shell(), "-c", shell_cmd]),
check=check,
capture_stdout=capture_stdout,
)
host = HostExecutor()
@wraps(print)
@ -253,33 +174,11 @@ class VolumeBackupConfig:
@define(kw_only=True)
class PodmanClient:
podman_cmd: CommandArgs
podman_compose_cmd: CommandArgs
exec: BinaryExecutor = field(converter=lambda a: BinaryExecutor(a))
compose_exec: BinaryExecutor = field(converter=lambda a: BinaryExecutor(a))
def exec_podman(
self,
command: CommandArgs,
check: bool = True,
) -> CompletedExec:
return exec_cmd(
command=CommandArgs(self.podman_cmd + command),
check=check,
capture_stdout=True,
)
def exec_compose(
self,
command: CommandArgs,
check: bool = True,
) -> CompletedExec:
return exec_cmd(
command=CommandArgs(self.podman_compose_cmd + command),
check=check,
capture_stdout=True,
)
class ComposeFile:
class ComposeFile(ExecutorTarget):
podman: PodmanClient
project_name: ProjectName
@ -318,6 +217,10 @@ class ComposeFile:
)
sys.exit(1)
@property
def ref_dir(self) -> Path:
return self.compose["_dirname"]
@property
def version(self) -> ComposeVersion:
return self.compose.get("version", ComposeVersion(""))
@ -344,23 +247,28 @@ class ComposeFile:
for name, opts in self.__volumes_defs.items()
}
def exec_compose(
def exec_cmd(
self,
*,
command: CommandArgs,
check: bool = True,
capture_stdout: bool = False,
work_dir: Optional[PurePath] = None,
) -> CompletedExec:
return self.podman.exec_compose(
return super().exec_cmd(
command=combine_cmds(
[f"--project-name=self.project_name"],
[f"--file={file}" for file in self.compose_files],
command,
),
check=check,
capture_stdout=capture_stdout,
work_dir=work_dir or self.ref_dir,
)
@define(kw_only=True)
class ComposeContainer:
class ComposeContainer(ExecutorTarget):
compose: ComposeFile
name: ServiceName
@ -377,24 +285,26 @@ class ComposeContainer:
def depends_on(self) -> Sequence[ComposeContainer]:
return [self.compose.services[name] for name in self.base.get("depends_on", [])]
def exec(
def exec_cmd(
self,
command: CommandArgs,
check: bool = True,
workdir: Optional[str] = None,
capture_stdout: bool = False,
work_dir: Optional[PurePath] = None,
) -> CompletedExec:
return self.compose.podman.exec_podman(
return self.compose.podman.exec.exec_cmd(
command=combine_cmds(
[
"container",
"exec",
"--interactive=false",
None if workdir is None else f"--workdir={workdir}",
None if work_dir is None else f"--workdir={work_dir}",
self.container_name,
],
command,
),
check=check,
capture_stdout=capture_stdout,
)

@ -0,0 +1,17 @@
from .command import (
Command,
ArgCommand,
ShellCommand,
)
from .completed import (
CompletedExec,
)
from .execution import (
ExecutorTarget,
)
from expander import (
BinaryExecutor,
)
from .host import (
HostExecutor,
)

@ -0,0 +1,13 @@
from typing import Iterable, List, NewType, Optional
CommandArgs = NewType("CommandArgs", List[str])
ShellCommandStr = NewType("ShellCommandStr", str)
def filter_cmds(command: Iterable[Optional[str]]) -> CommandArgs:
return CommandArgs([arg for arg in command if arg is not None])
def combine_cmds(*commands: CommandArgs | List[Optional[str]]) -> CommandArgs:
return CommandArgs([arg for cmd in commands for arg in filter_cmds(cmd)])

@ -0,0 +1,159 @@
from __future__ import annotations
import abc
from pathlib import PurePath
import shlex
from typing import Callable, Iterable, List, Optional
from attrs import define
from .base import CommandArgs, ShellCommandStr
from .completed import CompletedExec
from .execution import ExecutorTarget
class Command(metaclass=abc.ABCMeta):
def __call__(
self,
executor: ExecutorTarget,
check: bool = True,
capture_stdout: bool = True,
) -> CompletedExec:
return self.run(
executor=executor,
check=check,
capture_stdout=capture_stdout,
)
@abc.abstractmethod
def run(
self,
*,
executor: ExecutorTarget,
check: bool = True,
capture_stdout: bool = True,
) -> CompletedExec:
...
@define
class ArgCommand(Command):
args: List[str]
@classmethod
def from_single(cls, arg: str) -> ArgCommand:
return cls(args=[arg])
def __add__(
self,
other: ArgCommand | CommandArgs | Iterable[Optional[str]],
) -> ArgCommand:
if isinstance(other, ArgCommand):
return ArgCommand(args=self.args + other.args)
return ArgCommand(args=self.args + [arg for arg in other if arg is not None])
def __radd__(
self,
other: ArgCommand | CommandArgs | Iterable[Optional[str]],
) -> ArgCommand:
if isinstance(other, ArgCommand):
return ArgCommand(args=other.args + self.args)
return ArgCommand(args=[arg for arg in other if arg is not None] + self.args)
def __and__(
self,
other: Optional[str],
) -> ArgCommand:
if other is None:
return self
return self + [other]
def __str__(self) -> str:
return " ".join(shlex.quote(arg) for arg in self.args)
def to_shell_cmd(self) -> ShellCommand:
return ShellCommand(command=str(self))
def run(
self,
*,
executor: ExecutorTarget,
check: bool = True,
capture_stdout: bool = True,
work_dir: Optional[PurePath] = None,
) -> CompletedExec:
return executor.exec_cmd(
command=CommandArgs(self.args),
check=check,
capture_stdout=capture_stdout,
work_dir=work_dir,
)
@define(order=False)
class ShellCommand(Command):
command: str
@staticmethod
def _extract_other(
method: Callable[[ShellCommand, str], str]
) -> Callable[[ShellCommand, Command], ShellCommand]:
def decorated(self, other: Command) -> ShellCommand:
other_cmd: str
if isinstance(other, Command):
other_cmd = str(other)
else:
return NotImplemented
return ShellCommand(method(self, other_cmd))
return decorated
@staticmethod
def _parse_path(
method: Callable[[ShellCommand, str], str]
) -> Callable[[ShellCommand, PurePath | str], ShellCommand]:
def decorated(self, other: PurePath | str) -> ShellCommand:
other_cmd: str
if isinstance(other, str):
other_cmd = other
elif isinstance(other, PurePath):
other_cmd = str(other)
else:
return NotImplemented
return ShellCommand(method(self, other_cmd))
return decorated
@_extract_other
def __or__(self, other_cmd: str) -> str: # |
return f"({self.command}) | ({other_cmd})"
@_extract_other
def __ror__(self, other_cmd: str) -> str: # |
return f"({other_cmd}) | ({self.command})"
@_parse_path
def __lt__(self, other: str) -> str: # <
return f"{self.command} < {shlex.quote(other)}"
@_parse_path
def __gt__(self, other: str) -> str: # >
return f"{self.command} > {shlex.quote(other)}"
def __str__(self) -> str:
return self.command
def run(
self,
*,
executor: ExecutorTarget,
check: bool = True,
capture_stdout: bool = True,
work_dir: Optional[PurePath] = None,
) -> CompletedExec:
return executor.exec_shell(
shell_cmd=ShellCommandStr(self.command),
check=check,
capture_stdout=capture_stdout,
work_dir=work_dir,
)

@ -0,0 +1,20 @@
import json
from subprocess import CompletedProcess
from typing import Any, Mapping
from attrs import define
@define()
class CompletedExec:
completed_process: CompletedProcess
@property
def returncode(self) -> int:
return self.completed_process.returncode
def check_returncode(self) -> None:
return self.completed_process.check_returncode()
def to_json(self) -> Mapping[str, Any]:
return json.loads(self.completed_process.stdout)

@ -0,0 +1,76 @@
import abc
from functools import cached_property
from pathlib import PurePath
from typing import Callable, Optional
from attrs import define
from .base import CommandArgs, ShellCommandStr
from .completed import CompletedExec
# List of POSIX shells which shall be used
DETECTED_SHELLS = [
"/usr/bin/bash",
"/bin/bash",
"/usr/bin/sh",
"/bin/sh",
]
class ExecutorTarget(metaclass=abc.ABCMeta):
@abc.abstractmethod
def exec_cmd(
self,
*,
command: CommandArgs,
check: bool,
capture_stdout: bool,
work_dir: Optional[PurePath],
) -> CompletedExec:
...
@staticmethod
def process_tester(
exec: Callable[[CommandArgs], CompletedExec]
) -> Callable[[CommandArgs], bool]:
return lambda command: exec(command).returncode == 0
@staticmethod
def _search_shell_with(tester: Callable[[CommandArgs], bool]) -> str:
for shell in DETECTED_SHELLS:
command = CommandArgs([shell, "-c", "true"])
if tester(command):
return command[0]
# TODO specialize
raise Exception(
f"Could not find an acceptable shell on this host, searched for {DETECTED_SHELLS}"
)
@cached_property
def found_shell(self) -> str:
return self._search_shell_with(
self.process_tester(
lambda command: self.exec_cmd(
command=command,
check=False,
capture_stdout=False,
work_dir=None,
)
)
)
def exec_shell(
self,
*,
shell_cmd: ShellCommandStr,
check: bool,
capture_stdout: bool,
work_dir: Optional[PurePath],
) -> CompletedExec:
return self.exec_cmd(
command=CommandArgs([self.found_shell, "-c", shell_cmd]),
check=check,
capture_stdout=capture_stdout,
work_dir=work_dir,
)

@ -0,0 +1,31 @@
from __future__ import annotations
from pathlib import PurePath
from typing import Optional
from attrs import define
from .base import CommandArgs
from .command import ArgCommand
from .completed import CompletedExec
from .execution import ExecutorTarget
@define
class BinaryExecutor(ExecutorTarget):
binary_args: CommandArgs
def exec_cmd(
self,
*,
command: CommandArgs,
check: bool,
capture_stdout: bool,
work_dir: Optional[PurePath],
) -> CompletedExec:
return super().exec_cmd(
command=CommandArgs(self.binary_args + command),
check=check,
capture_stdout=capture_stdout,
work_dir=work_dir,
)

@ -0,0 +1,30 @@
from __future__ import annotations
from pathlib import PurePath
import subprocess
from typing import Optional
from .base import CommandArgs
from .completed import CompletedExec
from .execution import ExecutorTarget
from ..misc.singleton import Singleton
class HostExecutor(ExecutorTarget, metaclass=Singleton):
def exec_cmd(
self,
*,
command: CommandArgs,
check: bool,
capture_stdout: bool,
work_dir: Optional[PurePath] = None,
) -> CompletedExec:
return CompletedExec(
subprocess.run(
args=command,
check=check,
cwd=work_dir,
shell=False,
stdout=subprocess.PIPE if capture_stdout else None,
)
)

@ -0,0 +1 @@
from .singleton import Singleton

@ -0,0 +1,18 @@
from __future__ import annotations
from typing import Any, Type, TypeVar
T = TypeVar("T", bound="Singleton")
class Singleton(type):
_instances = dict[Type, Any]()
def __call__(cls: T, *args, **kwargs) -> T:
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
__all__ = ["Singleton"]
Loading…
Cancel
Save