You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ansible/packaging/release/changelogs/changelog.py

819 lines
27 KiB
Python

#!/usr/bin/env python
# PYTHON_ARGCOMPLETE_OK
"""Changelog generator and linter."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import argparse
import collections
import datetime
import docutils.utils
import json
import logging
import os
import packaging.version
import re
import rstcheck
import subprocess
import sys
import yaml
try:
import argcomplete
except ImportError:
argcomplete = None
from ansible import constants as C
# Directory three levels above the one containing this script
# (NOTE(review): presumably the repository root -- confirm against the checkout layout).
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
# Directory holding the changelog config, the change metadata, the plugin cache and the generated changelog.
CHANGELOG_DIR = os.path.join(BASE_DIR, 'changelogs')
# Changelog configuration file, read by ChangelogConfig.
CONFIG_PATH = os.path.join(CHANGELOG_DIR, 'config.yaml')
# Change metadata file, read and written by ChangesMetadata.
CHANGES_PATH = os.path.join(CHANGELOG_DIR, '.changes.yaml')
# Logger used throughout this script; main() attaches a stdout handler to it.
LOGGER = logging.getLogger('changelog')
def main():
    """Main program entry point.

    Builds the argument parser with the ``lint``, ``release`` and ``generate``
    sub-commands, sends log output to stdout, sets the log level from the
    ``-v`` count and calls the handler registered for the chosen sub-command.

    When no sub-command is given a usage error is reported through
    ``parser.error`` (which exits with status 2).
    """
    parser = argparse.ArgumentParser(description='Changelog generator and linter.')

    # Options shared by all sub-commands; pulled in through ``parents=[common]``.
    common = argparse.ArgumentParser(add_help=False)

    common.add_argument('-v', '--verbose',
                        action='count',
                        default=0,
                        help='increase verbosity of output')

    subparsers = parser.add_subparsers(metavar='COMMAND')

    lint_parser = subparsers.add_parser('lint',
                                        parents=[common],
                                        help='check changelog fragments for syntax errors')
    lint_parser.set_defaults(func=command_lint)
    lint_parser.add_argument('fragments',
                             metavar='FRAGMENT',
                             nargs='*',
                             help='path to fragment to test')

    release_parser = subparsers.add_parser('release',
                                           parents=[common],
                                           help='add a new release to the change metadata')
    release_parser.set_defaults(func=command_release)
    release_parser.add_argument('--version',
                                help='override release version')
    release_parser.add_argument('--codename',
                                help='override release codename')
    release_parser.add_argument('--date',
                                default=str(datetime.date.today()),
                                help='override release date')
    release_parser.add_argument('--reload-plugins',
                                action='store_true',
                                help='force reload of plugin cache')

    generate_parser = subparsers.add_parser('generate',
                                            parents=[common],
                                            help='generate the changelog')
    generate_parser.set_defaults(func=command_generate)
    generate_parser.add_argument('--reload-plugins',
                                 action='store_true',
                                 help='force reload of plugin cache')

    # argcomplete is optional; the module-level import leaves it as None when unavailable.
    if argcomplete:
        argcomplete.autocomplete(parser)

    formatter = logging.Formatter('%(levelname)s %(message)s')

    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)

    LOGGER.addHandler(handler)
    LOGGER.setLevel(logging.WARN)

    args = parser.parse_args()

    # Python 3 does not require a sub-command. Without one, neither ``func`` nor
    # ``verbose`` (both defined only on the sub-parsers) exists on the namespace,
    # so report a usage error instead of failing with an AttributeError below.
    if not hasattr(args, 'func'):
        parser.error('too few arguments')

    # A single -v keeps the WARN level configured above.
    if args.verbose > 2:
        LOGGER.setLevel(logging.DEBUG)
    elif args.verbose > 1:
        LOGGER.setLevel(logging.INFO)

    args.func(args)
def command_lint(args):
    """Handle the ``lint`` sub-command.

    Loads the fragments named on the command line, collecting load failures
    instead of raising, then lints the loaded fragments and reports all
    problems.

    :type args: any
    """
    load_failures = []
    loaded = load_fragments(args.fragments, load_failures)

    lint_fragments(loaded, load_failures)
def command_release(args):
    """Handle the ``release`` sub-command.

    Records a new release in the change metadata and regenerates the
    changelog. A version or codename missing from the command line is taken
    from ``ansible.release``.

    :type args: any
    """
    release_version = args.version  # type: str
    release_codename = args.codename  # type: str
    release_date = datetime.datetime.strptime(args.date, "%Y-%m-%d").date()
    force_reload = args.reload_plugins  # type: bool

    if not (release_version and release_codename):
        # imported lazily: only needed when an override is missing
        import ansible.release

        if not release_version:
            release_version = ansible.release.__version__

        if not release_codename:
            release_codename = ansible.release.__codename__

    metadata = load_changes()
    plugin_list = load_plugins(version=release_version, force_reload=force_reload)
    fragment_list = load_fragments()

    add_release(metadata, plugin_list, fragment_list, release_version, release_codename, release_date)
    generate_changelog(metadata, plugin_list, fragment_list)
def command_generate(args):
    """Handle the ``generate`` sub-command.

    Regenerates the changelog from the stored change metadata, using the
    latest recorded version to validate the plugin cache.

    :type args: any
    """
    force_reload = args.reload_plugins  # type: bool

    metadata = load_changes()
    plugin_list = load_plugins(version=metadata.latest_version, force_reload=force_reload)
    fragment_list = load_fragments()

    generate_changelog(metadata, plugin_list, fragment_list)
def load_changes():
    """Return the change metadata stored at CHANGES_PATH.

    :rtype: ChangesMetadata
    """
    return ChangesMetadata(CHANGES_PATH)
def load_plugins(version, force_reload):
    """Load plugins from ansible-doc.

    Plugin data is cached in ``.plugin-cache.yaml`` inside CHANGELOG_DIR. The
    cache is used when ``force_reload`` is false, the file exists, and the
    version stored in it equals ``version``. Otherwise ``ansible-doc --json``
    is run once per documentable plugin type and the cache file is rewritten.

    :type version: str
    :type force_reload: bool
    :rtype: list[PluginDescription]
    """
    plugin_cache_path = os.path.join(CHANGELOG_DIR, '.plugin-cache.yaml')
    plugins_data = {}

    if not force_reload and os.path.exists(plugin_cache_path):
        with open(plugin_cache_path, 'r') as plugin_cache_fd:
            plugins_data = yaml.safe_load(plugin_cache_fd)

        # An empty or malformed cache file must trigger a refresh, not a crash.
        if not isinstance(plugins_data, dict):
            plugins_data = {}

        cached_version = plugins_data.get('version')

        if version != cached_version:
            LOGGER.info('version %s does not match plugin cache version %s', version, cached_version)
            plugins_data = {}
        elif 'plugins' not in plugins_data:
            # a cache without plugin data is unusable even when the version matches
            plugins_data = {}

    if not plugins_data:
        LOGGER.info('refreshing plugin cache')

        plugins_data['version'] = version
        # must exist before the per-type results are stored below
        plugins_data['plugins'] = {}

        ansible_doc = os.path.join(BASE_DIR, 'bin', 'ansible-doc')

        for plugin_type in C.DOCUMENTABLE_PLUGINS:
            output = subprocess.check_output([ansible_doc, '--json', '-t', plugin_type])
            # check_output returns bytes on Python 3; decode before parsing
            plugins_data['plugins'][plugin_type] = json.loads(output.decode('utf-8'))

        # remove empty namespaces from plugins
        for section in plugins_data['plugins'].values():
            for plugin in section.values():
                if 'namespace' in plugin and plugin['namespace'] is None:
                    del plugin['namespace']

        with open(plugin_cache_path, 'w') as plugin_cache_fd:
            yaml.safe_dump(plugins_data, plugin_cache_fd, default_flow_style=False)

    plugins = PluginDescription.from_dict(plugins_data['plugins'])

    return plugins
def load_fragments(paths=None, exceptions=None):
    """Load changelog fragments.

    Without ``paths`` every entry of the configured notes directory is
    loaded. When ``exceptions`` is a list, load failures are appended to it
    as ``(path, exception)`` and loading continues; otherwise the first
    failure is re-raised.

    :type paths: list[str] | None
    :type exceptions: list[tuple[str, Exception]] | None
    """
    if not paths:
        notes_dir = os.path.join(CHANGELOG_DIR, ChangelogConfig(CONFIG_PATH).notes_dir)
        paths = [os.path.join(notes_dir, entry) for entry in os.listdir(notes_dir)]

    loaded = []

    for fragment_path in paths:
        try:
            fragment = ChangelogFragment.load(fragment_path)
        except Exception as ex:
            if exceptions is None:
                raise

            exceptions.append((fragment_path, ex))
        else:
            loaded.append(fragment)

    return loaded
def lint_fragments(fragments, exceptions):
    """Lint fragments and print every distinct problem, sorted.

    Each load failure in ``exceptions`` is reported as a yaml parsing error
    for its path. Messages are printed as ``path:line:column: text``.

    :type fragments: list[ChangelogFragment]
    :type exceptions: list[tuple[str, Exception]]
    """
    linter = ChangelogFragmentLinter(ChangelogConfig(CONFIG_PATH))

    problems = [(failure[0], 0, 0, 'yaml parsing error') for failure in exceptions]

    for fragment in fragments:
        problems.extend(linter.lint(fragment))

    # a set drops duplicate messages before sorting
    unique_messages = set()

    for path, line, column, text in problems:
        unique_messages.add('%s:%d:%d: %s' % (path, line, column, text))

    for message in sorted(unique_messages):
        print(message)
def add_release(changes, plugins, fragments, version, codename, date):
    """Record a release, its new plugins and its fragments in the change metadata.

    Only plugins whose ``version_added`` followed by a dot is a prefix of
    ``version`` are recorded. The metadata is saved afterwards.

    :type changes: ChangesMetadata
    :type plugins: list[PluginDescription]
    :type fragments: list[ChangelogFragment]
    :type version: str
    :type codename: str
    :type date: datetime.date
    """
    # raises if the version does not parse
    packaging.version.Version(version)

    if is_release_version(version):
        version_kind = 'release'
    else:
        version_kind = 'pre-release'

    LOGGER.info('release version %s is a %s version', version, version_kind)

    # keep only the plugins which were added in this release
    new_plugins = [candidate for candidate in plugins if version.startswith('%s.' % candidate.version_added)]

    changes.add_release(version, codename, date)

    for new_plugin in new_plugins:
        changes.add_plugin(new_plugin.type, new_plugin.name, version)

    for new_fragment in fragments:
        changes.add_fragment(new_fragment.name, version)

    changes.save()
def generate_changelog(changes, plugins, fragments):
    """Write the changelog file for the latest recorded version.

    Plugins and fragments which no longer exist are pruned from the change
    metadata, which is then saved. The output file is
    ``CHANGELOG-v<major>.<minor>.rst`` inside CHANGELOG_DIR.

    :type changes: ChangesMetadata
    :type plugins: list[PluginDescription]
    :type fragments: list[ChangelogFragment]
    """
    config = ChangelogConfig(CONFIG_PATH)

    changes.prune_plugins(plugins)
    changes.prune_fragments(fragments)
    changes.save()

    version_parts = changes.latest_version.split('.')
    file_name = 'CHANGELOG-v%s.rst' % '.'.join(version_parts[:2])
    output_path = os.path.join(CHANGELOG_DIR, file_name)

    rst = ChangelogGenerator(config, changes, plugins, fragments).generate()

    with open(output_path, 'w') as output_fd:
        output_fd.write(rst)
class ChangelogFragmentLinter(object):
    """Checks changelog fragments for unknown sections and RST problems."""

    def __init__(self, config):
        """
        :type config: ChangelogConfig
        """
        self.config = config

    def lint(self, fragment):
        """Return the problems found in a fragment.

        Every section name must be one of the configured sections. Section
        content is checked with rstcheck: item by item for a list, as a whole
        otherwise. Line and column of every reported problem are 0.

        :type fragment: ChangelogFragment
        :rtype: list[(str, int, int, str)]
        """
        problems = []

        for section, lines in fragment.content.items():
            if section not in self.config.sections:
                problems.append((fragment.path, 0, 0, 'invalid section: %s' % section))

            # treat non-list content as a single snippet
            snippets = lines if isinstance(lines, list) else [lines]

            for snippet in snippets:
                results = rstcheck.check(snippet, filename=fragment.path, report_level=docutils.utils.Reporter.WARNING_LEVEL)
                problems.extend((fragment.path, 0, 0, result[1]) for result in results)

        return problems
def is_release_version(version):
    """Determine whether the given version is a release or a pre-release.

    The version, prefixed with ``v``, is matched against the configured
    pre-release pattern first and the release pattern second.

    :type version: str
    :rtype: bool
    :raises Exception: if the version matches neither pattern
    """
    settings = ChangelogConfig(CONFIG_PATH)
    tag = 'v%s' % version

    is_pre_release = re.search(settings.pre_release_tag_re, tag) is not None

    if not is_pre_release and re.search(settings.release_tag_re, tag) is None:
        raise Exception('unsupported version format: %s' % version)

    return not is_pre_release
class PluginDescription(object):
    """Type, name, namespace, description and version_added of one plugin."""

    def __init__(self, plugin_type, name, namespace, description, version_added):
        self.type = plugin_type
        self.name = name
        self.namespace = namespace
        self.description = description
        self.version_added = version_added

    @staticmethod
    def from_dict(data):
        """Build PluginDescription objects from data keyed by plugin type, then plugin name.

        ``namespace`` is optional in the plugin details (None when absent);
        ``description`` and ``version_added`` are required.

        :type data: dict[str, dict[str, dict[str, any]]]
        :rtype: list[PluginDescription]
        """
        return [
            PluginDescription(
                plugin_type=plugin_type,
                name=plugin_name,
                namespace=details.get('namespace'),
                description=details['description'],
                version_added=details['version_added'],
            )
            for plugin_type, plugins_of_type in data.items()
            for plugin_name, details in plugins_of_type.items()
        ]
class ChangelogGenerator(object):
    """Changelog generator.

    Renders the recorded releases, their changelog fragments and their new
    plugins and modules as one reStructuredText document.
    """

    def __init__(self, config, changes, plugins, fragments):
        """
        :type config: ChangelogConfig
        :type changes: ChangesMetadata
        :type plugins: list[PluginDescription]
        :type fragments: list[ChangelogFragment]
        """
        self.config = config
        self.changes = changes

        # Modules are kept apart because they are rendered by namespace;
        # every other plugin is grouped by its plugin type.
        self.plugins = {}
        self.modules = []

        for plugin in plugins:
            if plugin.type == 'module':
                self.modules.append(plugin)
            else:
                self.plugins.setdefault(plugin.type, []).append(plugin)

        # releases refer to fragments by name, so index them that way
        self.fragments = dict((fragment.name, fragment) for fragment in fragments)

    def generate(self):
        """Generate the changelog.

        Releases are visited from newest to oldest. A pre-release which is
        older than a release is merged into the entry of that release; a
        pre-release with no newer release gets an entry of its own.

        :rtype: str
        """
        latest_version = self.changes.latest_version
        codename = self.changes.releases[latest_version]['codename']
        major_minor_version = '.'.join(latest_version.split('.')[:2])

        # entry version -> names of the fragments, modules and plugins shown under it
        release_entries = collections.OrderedDict()
        entry_version = latest_version
        # name of the prelude fragment already accepted for the current entry, if any
        entry_fragment = None

        for version in sorted(self.changes.releases, reverse=True, key=packaging.version.Version):
            release = self.changes.releases[version]

            if is_release_version(version):
                entry_version = version  # next version is a release, it needs its own entry
                entry_fragment = None
            elif not is_release_version(entry_version):
                entry_version = version  # current version is a pre-release, next version needs its own entry
                entry_fragment = None

            if entry_version not in release_entries:
                release_entries[entry_version] = dict(
                    fragments=[],
                    modules=[],
                    plugins={},
                )

            entry_config = release_entries[entry_version]

            fragment_names = []

            # only keep the latest prelude fragment for an entry
            for fragment_name in release.get('fragments', []):
                fragment = self.fragments[fragment_name]

                if self.config.prelude_name in fragment.content:
                    if entry_fragment:
                        LOGGER.info('skipping fragment %s in version %s due to newer fragment %s in version %s',
                                    fragment_name, version, entry_fragment, entry_version)
                        continue

                    entry_fragment = fragment_name

                fragment_names.append(fragment_name)

            entry_config['fragments'] += fragment_names
            entry_config['modules'] += release.get('modules', [])

            for plugin_type, plugin_names in release.get('plugins', {}).items():
                if plugin_type not in entry_config['plugins']:
                    entry_config['plugins'][plugin_type] = []

                entry_config['plugins'][plugin_type] += plugin_names

        builder = RstBuilder()
        builder.set_title('Ansible %s "%s" Release Notes' % (major_minor_version, codename))
        builder.add_raw_rst('.. contents:: Topics\n\n')

        for version, release in release_entries.items():
            builder.add_section('v%s' % version)

            combined_fragments = ChangelogFragment.combine([self.fragments[fragment] for fragment in release['fragments']])

            # sections are emitted in the order given by the configuration
            for section_name in self.config.sections:
                self._add_section(builder, combined_fragments, section_name)

            self._add_plugins(builder, release['plugins'])
            self._add_modules(builder, release['modules'])

        return builder.generate()

    def _add_section(self, builder, combined_fragments, section_name):
        """Add one changelog section, if the combined fragments contain it.

        :type builder: RstBuilder
        :type combined_fragments: dict[str, list[str] | str]
        :type section_name: str
        """
        if section_name not in combined_fragments:
            return

        section_title = self.config.sections[section_name]

        builder.add_section(section_title, 1)

        content = combined_fragments[section_name]

        if isinstance(content, list):
            # list content becomes a sorted bullet list
            for rst in sorted(content):
                builder.add_raw_rst('- %s' % rst)
        else:
            builder.add_raw_rst(content)

        builder.add_raw_rst('')

    def _add_plugins(self, builder, plugin_types_and_names):
        """Add the "New Plugins" section, grouped by plugin type.

        Only plugins known to this generator are listed. The section heading
        is added once, and only if at least one plugin is listed.

        :type builder: RstBuilder
        :type plugin_types_and_names: dict[str, list[str]]
        """
        if not plugin_types_and_names:
            return

        have_section = False

        for plugin_type in sorted(self.plugins):
            wanted_names = plugin_types_and_names.get(plugin_type, [])
            plugins = dict((plugin.name, plugin) for plugin in self.plugins[plugin_type] if plugin.name in wanted_names)

            if not plugins:
                continue

            if not have_section:
                have_section = True
                builder.add_section('New Plugins', 1)

            builder.add_section(plugin_type.title(), 2)

            for plugin_name in sorted(plugins):
                plugin = plugins[plugin_name]

                builder.add_raw_rst('- %s - %s' % (plugin.name, plugin.description))

            builder.add_raw_rst('')

    def _add_modules(self, builder, module_names):
        """Add the "New Modules" section, grouped by module namespace.

        The first namespace component becomes a section heading and the
        remaining components, if any, a subsection heading. Only modules
        known to this generator are listed.

        :type builder: RstBuilder
        :type module_names: list[str]
        """
        if not module_names:
            return

        modules = dict((module.name, module) for module in self.modules if module.name in module_names)

        modules_by_namespace = collections.defaultdict(list)

        for module_name in sorted(modules):
            module = modules[module_name]

            # A missing namespace is grouped under the empty namespace, so the
            # keys stay sortable and can always be split below.
            modules_by_namespace[module.namespace or ''].append(module.name)

        previous_section = None
        # Tracked separately from previous_section: an empty section name is
        # falsy and must not cause the heading to be added a second time.
        have_section = False

        for namespace in sorted(modules_by_namespace):
            parts = namespace.split('.')

            section = parts.pop(0).replace('_', ' ').title()

            if not have_section:
                have_section = True
                builder.add_section('New Modules', 1)

            if section != previous_section:
                builder.add_section(section, 2)

            previous_section = section

            subsection = '.'.join(parts)

            if subsection:
                builder.add_section(subsection, 3)

            for module_name in modules_by_namespace[namespace]:
                module = modules[module_name]

                builder.add_raw_rst('- %s - %s' % (module.name, module.description))

            builder.add_raw_rst('')
class ChangelogFragment(object):
    """A changelog fragment: parsed content plus the path it came from."""

    def __init__(self, content, path):
        """
        :type content: dict[str, list[str]]
        :type path: str
        """
        self.content = content
        self.path = path
        # fragments are identified by their file name
        self.name = os.path.basename(path)

    @staticmethod
    def load(path):
        """Read a YAML file and wrap its content in a ChangelogFragment.

        :type path: str
        """
        with open(path, 'r') as stream:
            parsed = yaml.safe_load(stream)

        return ChangelogFragment(parsed, path)

    @staticmethod
    def combine(fragments):
        """Merge the sections of several fragments into one dictionary.

        List content of the same section is concatenated in fragment order.
        Any other content replaces what was stored for the section before.

        :type fragments: list[ChangelogFragment]
        :rtype: dict[str, list[str] | str]
        """
        merged = {}

        for fragment in fragments:
            for section, content in fragment.content.items():
                if not isinstance(content, list):
                    merged[section] = content
                    continue

                merged[section] = merged.get(section, []) + content

        return merged
class ChangelogConfig(object):
    """Changelog settings read from a YAML configuration file."""

    def __init__(self, path):
        """Load the configuration, applying defaults for optional settings.

        The ``sections`` setting is required and is iterated as
        ``(name, title)`` pairs; the prelude section always comes first.

        :type path: str
        """
        with open(path, 'r') as stream:
            raw = yaml.safe_load(stream)

        self.config = raw

        setting = raw.get

        self.notes_dir = setting('notesdir', 'fragments')
        self.prelude_name = setting('prelude_section_name', 'release_summary')
        self.prelude_title = setting('prelude_section_title', 'Release Summary')
        self.new_plugins_after_name = setting('new_plugins_after_name', '')
        self.release_tag_re = setting('release_tag_re', r'((?:[\d.ab]|rc)+)')
        self.pre_release_tag_re = setting('pre_release_tag_re', r'(?P<pre_release>\.\d+(?:[ab]|rc)+\d*)$')

        self.sections = collections.OrderedDict()
        self.sections[self.prelude_name] = self.prelude_title

        for section_name, section_title in raw['sections']:
            self.sections[section_name] = section_title
class RstBuilder(object):
    """Collects lines of reStructuredText and joins them into a document."""

    def __init__(self):
        self.lines = []
        # one underline character per section depth, indexed by depth
        self.section_underlines = '''=-~^.*+:`'"_#'''

    def set_title(self, title):
        """Append the title with a rule above and below it, then a blank line.

        :type title: str
        """
        rule = self.section_underlines[0] * len(title)

        self.lines.extend([rule, title, rule, ''])

    def add_section(self, name, depth=0):
        """Append a section heading underlined for the given depth, then a blank line.

        :type name: str
        :type depth: int
        """
        output = self.lines

        output.append(name)
        output.extend([self.section_underlines[depth] * len(name), ''])

    def add_raw_rst(self, content):
        """Append content unchanged.

        :type content: str
        """
        self.lines.append(content)

    def generate(self):
        """Return all collected lines joined by newlines.

        :rtype: str
        """
        return '\n'.join(self.lines)
class ChangesMetadata(object):
    """Read, write and manage change metadata.

    The metadata maps each release version to its codename, release date and
    the names of the fragments, modules and plugins assigned to it.
    """

    def __init__(self, path):
        """
        :type path: str
        """
        self.path = path
        self.data = self.empty()
        # names already assigned to some release; used to avoid assigning them twice
        self.known_fragments = set()
        self.known_plugins = set()
        self.load()

    @staticmethod
    def empty():
        """Empty change metadata."""
        return dict(
            releases=dict(
            ),
        )

    @property
    def latest_version(self):
        """Latest version in the changes.

        :rtype: str
        """
        return sorted(self.releases, reverse=True, key=packaging.version.Version)[0]

    @property
    def releases(self):
        """Dictionary of releases.

        :rtype: dict[str, dict[str, any]]
        """
        return self.data['releases']

    def load(self):
        """Load the change metadata from disk.

        A missing or empty metadata file yields empty metadata. The names of
        all fragments, plugins and modules found in the releases are recorded
        as known.
        """
        self.data = self.empty()

        if os.path.exists(self.path):
            with open(self.path, 'r') as meta_fd:
                # safe_load returns None for an empty file; keep the empty metadata then
                self.data = yaml.safe_load(meta_fd) or self.empty()

        for config in self.releases.values():
            self.known_fragments |= set(config.get('fragments', []))

            for plugin_type, plugin_names in config.get('plugins', {}).items():
                self.known_plugins |= set('%s/%s' % (plugin_type, plugin_name) for plugin_name in plugin_names)

            module_names = config.get('modules', [])

            self.known_plugins |= set('module/%s' % module_name for module_name in module_names)

    def prune_plugins(self, plugins):
        """Remove plugins which are not in the provided list of plugins.

        :type plugins: list[PluginDescription]
        """
        valid_plugins = collections.defaultdict(set)

        for plugin in plugins:
            valid_plugins[plugin.type].add(plugin.name)

        for config in self.releases.values():
            if 'modules' in config:
                invalid_modules = set(module for module in config['modules'] if module not in valid_plugins['module'])
                config['modules'] = [module for module in config['modules'] if module not in invalid_modules]
                self.known_plugins -= set('module/%s' % module for module in invalid_modules)

            if 'plugins' in config:
                for plugin_type in config['plugins']:
                    invalid_plugins = set(plugin for plugin in config['plugins'][plugin_type] if plugin not in valid_plugins[plugin_type])
                    config['plugins'][plugin_type] = [plugin for plugin in config['plugins'][plugin_type] if plugin not in invalid_plugins]
                    self.known_plugins -= set('%s/%s' % (plugin_type, plugin) for plugin in invalid_plugins)

    def prune_fragments(self, fragments):
        """Remove fragments which are not in the provided list of fragments.

        :type fragments: list[ChangelogFragment]
        """
        valid_fragments = set(fragment.name for fragment in fragments)

        for config in self.releases.values():
            if 'fragments' not in config:
                continue

            invalid_fragments = set(fragment for fragment in config['fragments'] if fragment not in valid_fragments)
            config['fragments'] = [fragment for fragment in config['fragments'] if fragment not in invalid_fragments]

            # Forget only the removed fragments. The fragments still listed in
            # a release must stay known, otherwise add_fragment() would assign
            # them to a release a second time.
            self.known_fragments -= invalid_fragments

    def sort(self):
        """Sort change metadata in place."""
        for config in self.data['releases'].values():
            if 'fragments' in config:
                config['fragments'] = sorted(config['fragments'])

            if 'modules' in config:
                config['modules'] = sorted(config['modules'])

            if 'plugins' in config:
                for plugin_type in config['plugins']:
                    config['plugins'][plugin_type] = sorted(config['plugins'][plugin_type])

    def save(self):
        """Save the change metadata to disk."""
        self.sort()

        with open(self.path, 'w') as config_fd:
            yaml.safe_dump(self.data, config_fd, default_flow_style=False)

    def add_release(self, version, codename, release_date):
        """Add a new release to the changes metadata.

        An existing release is left untouched and a warning is logged.

        :type version: str
        :type codename: str
        :type release_date: datetime.date
        """
        if version not in self.releases:
            self.releases[version] = dict(
                codename=codename,
                release_date=str(release_date),
            )
        else:
            LOGGER.warning('release %s already exists', version)

    def add_fragment(self, fragment_name, version):
        """Add a changelog fragment to the change metadata.

        :type fragment_name: str
        :type version: str
        :rtype: bool
        :returns: False if the fragment is already known, True if it was added
        """
        if fragment_name in self.known_fragments:
            return False

        self.known_fragments.add(fragment_name)

        if 'fragments' not in self.releases[version]:
            self.releases[version]['fragments'] = []

        fragments = self.releases[version]['fragments']
        fragments.append(fragment_name)

        return True

    def add_plugin(self, plugin_type, plugin_name, version):
        """Add a plugin to the change metadata.

        Modules are stored in the release's ``modules`` list, all other
        plugins in its ``plugins`` dictionary keyed by plugin type.

        :type plugin_type: str
        :type plugin_name: str
        :type version: str
        :rtype: bool
        :returns: False if the plugin is already known, True if it was added
        """
        composite_name = '%s/%s' % (plugin_type, plugin_name)

        if composite_name in self.known_plugins:
            return False

        self.known_plugins.add(composite_name)

        if plugin_type == 'module':
            if 'modules' not in self.releases[version]:
                self.releases[version]['modules'] = []

            modules = self.releases[version]['modules']
            modules.append(plugin_name)
        else:
            if 'plugins' not in self.releases[version]:
                self.releases[version]['plugins'] = {}

            plugins = self.releases[version]['plugins']

            if plugin_type not in plugins:
                plugins[plugin_type] = []

            plugins[plugin_type].append(plugin_name)

        return True
# Run the command line interface only when this file is executed as a script.
if __name__ == '__main__':
    main()