You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ansible/test/lib/ansible_test/_internal/processes.py

81 lines
2.2 KiB
Python

"""Wrappers around `ps` for querying running processes."""
from __future__ import annotations
import collections
import dataclasses
import os
import pathlib
import shlex
from ansible_test._internal.util import raw_command
@dataclasses.dataclass(frozen=True)
class ProcessData:
"""Data about a running process."""
pid: int
ppid: int
command: str
@dataclasses.dataclass(frozen=True)
class Process:
"""A process in the process tree."""
pid: int
command: str
parent: Process | None = None
children: tuple[Process, ...] = dataclasses.field(default_factory=tuple)
@property
def args(self) -> list[str]:
"""The list of arguments that make up `command`."""
return shlex.split(self.command)
@property
def path(self) -> pathlib.Path:
"""The path to the process."""
return pathlib.Path(self.args[0])
def get_process_data(pids: list[int] | None = None) -> list[ProcessData]:
"""Return a list of running processes."""
if pids:
args = ['-p', ','.join(map(str, pids))]
else:
args = ['-A']
lines = raw_command(['ps'] + args + ['-o', 'pid,ppid,command'], capture=True)[0].splitlines()[1:]
processes = [ProcessData(pid=int(pid), ppid=int(ppid), command=command) for pid, ppid, command in (line.split(maxsplit=2) for line in lines)]
return processes
def get_process_tree() -> dict[int, Process]:
"""Return the process tree."""
processes = get_process_data()
pid_to_process: dict[int, Process] = {}
pid_to_children: dict[int, list[Process]] = collections.defaultdict(list)
for data in processes:
pid_to_process[data.pid] = process = Process(pid=data.pid, command=data.command)
if data.ppid:
pid_to_children[data.ppid].append(process)
for data in processes:
pid_to_process[data.pid] = dataclasses.replace(
pid_to_process[data.pid],
parent=pid_to_process.get(data.ppid),
children=tuple(pid_to_children[data.pid]),
)
return pid_to_process
def get_current_process() -> Process:
"""Return the current process along with its ancestors and descendants."""
return get_process_tree()[os.getpid()]