#!/usr/bin/env python3
"""Build a host/group inventory from ``hosts.yml`` and print it as JSON.

The emitted document has one entry per group (``hosts``, ``vars``,
``children``) plus a ``_meta.hostvars`` entry, i.e. the layout used by
Ansible dynamic inventory scripts.
"""
import json
import re
import sys

try:
    import yaml
except ImportError:  # reported as an AnsibleError once a file has to be read
    yaml = None


class AnsibleError(Exception):
    """Raised when the inventory file cannot be read or interpreted.

    The original script referenced this name without defining or importing
    it, so every error path ended in a ``NameError``.
    """


class LoopPrevention:
    """Non-reentrant context manager used to detect recursive re-entry.

    Entering an instance that is already entered raises instead of recursing.
    The guard only works when the *same* instance is reused for every entry.
    """

    def __init__(self, obj):
        self.__obj = obj  # object being guarded; kept for reference only
        self.__entered = False

    def __enter__(self):
        if self.__entered:
            raise Exception("detected and prevented infinite loop")
        self.__entered = True
        return self

    def __exit__(self, *args):
        self.__entered = False
        return False  # forward exception


class Group:
    """A named inventory group holding direct hosts and child group names."""

    def __init__(self, inv):
        self.__inv = inv
        self.__hosts = set()
        self.__children = set()
        # One guard per group: a guard created per call would always start
        # out "not entered" and could never detect a cycle.
        self.__loop_guard = LoopPrevention(self)

    def add_host(self, host):
        """Add a single host to this group."""
        self.__hosts.add(host)

    def add_hosts(self, hosts):
        """Add every host of the iterable *hosts* to this group."""
        self.__hosts.update(hosts)

    @property
    def direct_hosts(self):
        """Return a copy of the hosts added directly to this group."""
        return set(self.__hosts)

    @property
    def all_hosts(self):
        """Return direct hosts plus the hosts of all (transitive) children.

        Raises:
            Exception: if the child relation contains a cycle through this
                group, or a child name is unknown to the inventory.
        """
        with self.__loop_guard:
            hosts = self.direct_hosts
            for child in self.__children:
                hosts |= self.__inv._group(child).all_hosts
            return hosts

    def add_child(self, group_name):
        """Register *group_name* as a child group of this group."""
        self.__children.add(group_name)

    @property
    def children(self):
        """Return a copy of the child group names."""
        return set(self.__children)

    def export(self):
        """Return this group as a plain dict ready for JSON serialisation."""
        return {
            "hosts": list(self.__hosts),
            "vars": dict(),
            "children": list(self.__children),
        }


class Inventory:
    """Collection of groups; every host is also a member of group ``all``."""

    def __init__(self):
        self.__groups = dict()
        self.add_group("all")

    def __group(self, group_name):
        """Return the group called *group_name*, creating it if missing."""
        if group_name not in self.__groups:
            self.__groups[group_name] = Group(self)
        return self.__groups[group_name]

    def _group(self, group_name):
        """Return an existing group; raise for names that were never added."""
        if group_name not in self.__groups:
            raise Exception(f'Unknown group "{group_name}"')
        return self.__groups[group_name]

    def add_host(self, host):
        """Add *host* to the ``all`` group."""
        self.__group("all").add_host(host)

    def add_hosts(self, hosts):
        """Add every host in *hosts* to the ``all`` group."""
        self.__group("all").add_hosts(hosts)

    def add_group(self, group_name):
        """Make sure a group called *group_name* exists."""
        self.__group(group_name)

    def add_host_to_group(self, host, group_name):
        """Add *host* to ``all`` and to *group_name* (created on demand)."""
        self.add_host(host)
        self.__group(group_name).add_host(host)

    def add_hosts_to_group(self, hosts, group_name):
        """Add *hosts* to ``all`` and to *group_name* (created on demand)."""
        self.add_hosts(hosts)
        self.__group(group_name).add_hosts(hosts)

    def add_child_to_group(self, child_name, parent_name):
        """Make *child_name* a child of *parent_name*, creating both."""
        self.__group(child_name)
        self.__group(parent_name).add_child(child_name)

    def all_hosts_of_group(self, group_name):
        """Return the set of hosts of *group_name* including its children."""
        return self._group(group_name).all_hosts

    def export(self):
        """Return the whole inventory as a JSON-serialisable dict."""
        meta_dict = {
            "_meta": {
                "hostvars": {},
            },
        }
        group_dict = {
            group_name: group.export()
            for group_name, group in self.__groups.items()
        }
        return {**meta_dict, **group_dict}


def _read_yaml(path):
    """Load and return the YAML document stored at *path*.

    Raises:
        AnsibleError: if PyYAML is not installed or the file is not valid
            YAML.  (The original *returned* the error object instead of
            raising it.)
    """
    if yaml is None:
        raise AnsibleError("PyYAML is required to read the inventory file")
    with open(path, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as e:
            raise AnsibleError(e) from e


# Set operation selected by the optional one-character prefix of an alias
# entry: no prefix = union, "&" = intersection, "!" = difference.
GROUPS_PATTERN_OPS = {
    "": lambda old, add: old | add,
    "&": lambda old, add: old & add,
    "!": lambda old, add: old - add,
}
GROUPS_PATTERN_OPS_NAMES = "".join(GROUPS_PATTERN_OPS.keys())
_GROUPS_PATTERN_OPS_CLASS = re.escape(GROUPS_PATTERN_OPS_NAMES)
# The named groups are required: the alias parser reads the match through
# .group('operation') and .group('group_name').
GROUPS_PATTERN = re.compile(
    r'^(?P<operation>[' + _GROUPS_PATTERN_OPS_CLASS + r']?)'
    r'(?P<group_name>[^' + _GROUPS_PATTERN_OPS_CLASS + r'].*)$'
)


def _parse_group_aliasses(inv, data):
    """Create alias groups from set expressions over existing groups.

    *data* maps an alias name to either a ``:``-separated string or a list
    of entries.  Each entry is a group name with an optional operator prefix
    (see ``GROUPS_PATTERN_OPS``); entries are applied left to right.

    Raises:
        Exception: for an unsupported, empty or malformed alias definition,
            or when an entry names an unknown group.
    """
    for group, syntax in data.items():
        if isinstance(syntax, str):
            group_list = syntax.split(':')
        elif isinstance(syntax, list):
            group_list = list(syntax)  # copy: never edit the caller's data
        else:
            raise Exception(f'Unknown syntax for alias "{group}": {syntax}')
        if not syntax or not group_list:
            raise Exception(f'Empty syntax for alias "{group}": {syntax}')

        matches = []
        for entry in group_list:
            matched = GROUPS_PATTERN.match(entry) if isinstance(entry, str) else None
            if matched is None:
                raise Exception(f'Invalid entry "{entry}" in alias "{group}": {syntax}')
            matches.append(matched)

        hosts = set()
        if matches[0].group('operation') == '!':
            # A leading inversion is taken relative to all hosts.
            hosts = inv.all_hosts_of_group('all')
        for matched in matches:
            add = inv.all_hosts_of_group(matched.group('group_name'))
            op = GROUPS_PATTERN_OPS[matched.group('operation')]
            hosts = op(hosts, add)
        inv.add_hosts_to_group(hosts, group)


def _parse_groups(inv, data):
    """Register groups and their children from a ``group: children`` mapping.

    When the children are given as a mapping, that mapping is parsed
    recursively as a further ``group: children`` level.
    """
    for group, children in data.items():
        inv.add_group(group)
        if children is None:
            continue  # as if no children are given
        for child in children:
            inv.add_child_to_group(child, group)
        if isinstance(children, dict):
            _parse_groups(inv, children)


def _parse_host_groups(inv, data):
    """Register hosts per group from a ``group: hosts`` mapping.

    When the hosts are given as a mapping, the special key ``_all`` lists
    parent groups of the host group, and the remaining ``host: groups``
    pairs are handled like single hosts.
    """
    GROUPS_KEY = "_all"
    for host_group, hosts in data.items():
        inv.add_group(host_group)
        if hosts is None:
            continue
        for host in hosts:
            if host != GROUPS_KEY:
                inv.add_host_to_group(host, host_group)
        if isinstance(hosts, dict):
            hosts = dict(hosts)  # copy dict for further edits
            parents = hosts.pop(GROUPS_KEY, None)
            if parents is not None:
                for parent in parents:
                    inv.add_child_to_group(host_group, parent)
            _parse_single_hosts(inv, hosts)


def _parse_single_hosts(inv, data):
    """Register hosts from a ``host: groups`` mapping (groups may be None)."""
    for host, groups in data.items():
        inv.add_host(host)
        if groups is not None:
            for group in groups:
                inv.add_host_to_group(host, group)


def _parse_version_0(inv, data):
    """Parse the legacy layout: the whole document is ``host: groups``."""
    return _parse_single_hosts(inv, data)


parser_mapping_v1 = {
    "groups": _parse_groups,
    "host_groups": _parse_host_groups,
    "single_hosts": _parse_single_hosts,
}


def _parse_version_1(inv, data):
    """Run every version-1 section parser whose section is present."""
    for key_name, parser in parser_mapping_v1.items():
        section = data.get(key_name)
        if section is not None:  # absent or empty sections are skipped
            parser(inv, section)


def _parse_version_2(inv, data):
    """Parse the version-1 sections, then the optional ``group_aliasses``."""
    _parse_version_1(inv, data)
    aliases = data.get("group_aliasses")
    if aliases is not None:
        _parse_group_aliasses(inv, aliases)


parser_version_mapping = {
    None: _parse_version_0,  # legacy version without version number, only hosts list with tags
    1: _parse_version_1,  # adds support for default, inversed group dependencies and host_groups aside single_hosts (ignores aliases supported with version 2)
    2: _parse_version_2,  # adds support for aliases (thus destroying the common graph structures where aliases were used)
}


def parse_data(data):
    """Build the exported inventory dict from an already loaded document.

    Args:
        data: mapping loaded from the inventory file; ``None`` (an empty
            document) is treated as an empty mapping.

    Raises:
        AnsibleError: if *data* is not a mapping or its numeric ``version``
            has no parser.
    """
    if data is None:
        data = {}
    if not isinstance(data, dict):
        raise AnsibleError("Inventory data must be a mapping")
    inv = Inventory()
    version = data.get("version", None)
    # detect that version was used as hostname
    if not isinstance(version, (int, float, complex)):
        version = None
    if version not in parser_version_mapping:
        raise AnsibleError("Version not supported")
    parser_version_mapping[version](inv, data)
    return inv.export()


def parse(path):
    """Read the YAML inventory at *path* and return the exported dict."""
    return parse_data(_read_yaml(path))


def main(path="hosts.yml"):
    """Print the inventory stored at *path* as a JSON document."""
    print(json.dumps(parse(path)))


if __name__ == "__main__":
    main()