mirror of https://github.com/ansible/ansible.git
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
81 lines
2.2 KiB
Python
81 lines
2.2 KiB
Python
"""Wrappers around `ps` for querying running processes."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import collections
|
|
import dataclasses
|
|
import os
|
|
import pathlib
|
|
import shlex
|
|
|
|
from ansible_test._internal.util import raw_command
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class ProcessData:
|
|
"""Data about a running process."""
|
|
|
|
pid: int
|
|
ppid: int
|
|
command: str
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class Process:
|
|
"""A process in the process tree."""
|
|
|
|
pid: int
|
|
command: str
|
|
parent: Process | None = None
|
|
children: tuple[Process, ...] = dataclasses.field(default_factory=tuple)
|
|
|
|
@property
|
|
def args(self) -> list[str]:
|
|
"""The list of arguments that make up `command`."""
|
|
return shlex.split(self.command)
|
|
|
|
@property
|
|
def path(self) -> pathlib.Path:
|
|
"""The path to the process."""
|
|
return pathlib.Path(self.args[0])
|
|
|
|
|
|
def get_process_data(pids: list[int] | None = None) -> list[ProcessData]:
|
|
"""Return a list of running processes."""
|
|
if pids:
|
|
args = ['-p', ','.join(map(str, pids))]
|
|
else:
|
|
args = ['-A']
|
|
|
|
lines = raw_command(['ps'] + args + ['-o', 'pid,ppid,command'], capture=True)[0].splitlines()[1:]
|
|
processes = [ProcessData(pid=int(pid), ppid=int(ppid), command=command) for pid, ppid, command in (line.split(maxsplit=2) for line in lines)]
|
|
|
|
return processes
|
|
|
|
|
|
def get_process_tree() -> dict[int, Process]:
|
|
"""Return the process tree."""
|
|
processes = get_process_data()
|
|
pid_to_process: dict[int, Process] = {}
|
|
pid_to_children: dict[int, list[Process]] = collections.defaultdict(list)
|
|
|
|
for data in processes:
|
|
pid_to_process[data.pid] = process = Process(pid=data.pid, command=data.command)
|
|
|
|
if data.ppid:
|
|
pid_to_children[data.ppid].append(process)
|
|
|
|
for data in processes:
|
|
pid_to_process[data.pid] = dataclasses.replace(
|
|
pid_to_process[data.pid],
|
|
parent=pid_to_process.get(data.ppid),
|
|
children=tuple(pid_to_children[data.pid]),
|
|
)
|
|
|
|
return pid_to_process
|
|
|
|
|
|
def get_current_process() -> Process:
|
|
"""Return the current process along with its ancestors and descendants."""
|
|
return get_process_tree()[os.getpid()]
|