#!/usr/bin/env python3 # imports from pathlib import Path from hashlib import sha256 import shlex import subprocess import sys import requests # config MONITOR_DESC = """{{ description }}""" MONITOR_COMMAND = """{{ command_str }}""" USE_SHELL = {{ use_shell | ternary('True', 'False') }} DATA_PATH = Path("""{{ data_path }}""") TG_ENDPOINT = "https://api.telegram.org" TG_KEY = """{{ bot_key }}""" TG_RECIPIENT = """{{ recipient_id }}""" # helpers def print_error(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) def tg_msg(msg: str) -> None: print(f"Sending message using telegram:\n{msg}") ret = requests.post(f"{TG_ENDPOINT}/bot{TG_KEY}/sendMessage", data={ "chat_id": TG_RECIPIENT, "disable_web_page_preview": 1, "parse_mode": "Markdown", "text": msg, }) if 400 <= ret.status_code: raise Exception(f"Sending telegram message failed: {ret.status_code} {ret.text}") def run_cmd(cmd: list, **kwargs) -> str: return subprocess.run(cmd, capture_output=True, check=True, text=True, **kwargs).stdout def hash_data(data: str) -> bool: return sha256(data.encode("utf-8")).hexdigest() def check_cmd(cmd_str: str, use_shell: bool, data_file: Path) -> str: cmd = shlex.split(cmd_str) if not use_shell else cmd_str old_hash = data_file.read_text() if data_file.exists() else None new_data = run_cmd(cmd, shell=use_shell) new_hash = hash_data(new_data) if old_hash == new_hash: return None data_file.write_text(new_hash) return new_data if __name__ == "__main__": try: data = check_cmd(MONITOR_COMMAND, USE_SHELL, DATA_PATH) if data: tg_msg(f"{MONITOR_DESC} changed to:\n```\n{data}\n```") except Exception as e: tg_msg(f"Got exception while running command of {MONITOR_DESC}: {str(e)}") raise e