Merge pull request #6069 from danieljaouen/homebrew_class

Update homebrew module.
pull/5986/merge
Michael DeHaan 11 years ago
commit 11dd81c724

@ -2,6 +2,8 @@
# -*- coding: utf-8 -*-
# (c) 2013, Andrew Dunham <andrew@du.nham.ca>
# (c) 2013, Daniel Jaouen <dcj24@cornell.edu>
#
# Based on macports (Jimmy Tang <jcftang@gmail.com>)
#
# This module is free software: you can redistribute it and/or modify
@ -20,11 +22,11 @@
DOCUMENTATION = '''
---
module: homebrew
author: Andrew Dunham
author: Andrew Dunham and Daniel Jaouen
short_description: Package manager for Homebrew
description:
- Manages Homebrew packages
version_added: "1.4"
version_added: "1.1"
options:
name:
description:
@ -33,7 +35,7 @@ options:
state:
description:
- state of the package
choices: [ 'present', 'absent' ]
choices: [ 'head', 'latest', 'present', 'absent', 'linked', 'unlinked' ]
required: false
default: present
update_homebrew:
@ -52,125 +54,733 @@ notes: []
EXAMPLES = '''
- homebrew: name=foo state=present
- homebrew: name=foo state=present update_homebrew=yes
- homebrew: name=foo state=latest update_homebrew=yes
- homebrew: update_homebrew=yes upgrade=yes
- homebrew: name=foo state=head
- homebrew: name=foo state=linked
- homebrew: name=foo state=absent
- homebrew: name=foo,bar state=absent
- homebrew: name=foo state=present install_options=with-baz,enable-debug
'''
import os.path
import re
def update_homebrew(module, brew_path):
""" Updates packages list. """
rc, out, err = module.run_command("%s update" % brew_path)
# exceptions -------------------------------------------------------------- {{{
class HomebrewException(Exception):
pass
# /exceptions ------------------------------------------------------------- }}}
if rc != 0:
module.fail_json(msg="could not update homebrew")
# utils ------------------------------------------------------------------- {{{
def _create_regex_group(s):
lines = (line.strip() for line in s.split('\n') if line.strip())
chars = filter(None, (line.split('#')[0].strip() for line in lines))
group = r'[^' + r''.join(chars) + r']'
return re.compile(group)
# /utils ------------------------------------------------------------------ }}}
def query_package(module, brew_path, name, state="present"):
""" Returns whether a package is installed or not. """
if state == "present":
rc, out, err = module.run_command("%s list %s" % (brew_path, name))
if rc == 0:
class Homebrew(object):
'''A class to manage Homebrew packages.'''
# class regexes ------------------------------------------------ {{{
VALID_PATH_CHARS = r'''
\w # alphanumeric characters (i.e., [a-zA-Z0-9_])
\s # spaces
: # colons
{sep} # the OS-specific path separator
- # dashes
'''.format(sep=os.path.sep)
VALID_BREW_PATH_CHARS = r'''
\w # alphanumeric characters (i.e., [a-zA-Z0-9_])
\s # spaces
{sep} # the OS-specific path separator
- # dashes
'''.format(sep=os.path.sep)
VALID_PACKAGE_CHARS = r'''
\w # alphanumeric characters (i.e., [a-zA-Z0-9_])
- # dashes
'''
INVALID_PATH_REGEX = _create_regex_group(VALID_PATH_CHARS)
INVALID_BREW_PATH_REGEX = _create_regex_group(VALID_BREW_PATH_CHARS)
INVALID_PACKAGE_REGEX = _create_regex_group(VALID_PACKAGE_CHARS)
# /class regexes ----------------------------------------------- }}}
# class validations -------------------------------------------- {{{
@classmethod
def valid_path(cls, path):
'''
`path` must be one of:
- list of paths
- a string containing only:
- alphanumeric characters
- dashes
- spaces
- colons
- os.path.sep
'''
if isinstance(path, basestring):
return not cls.INVALID_PATH_REGEX.search(path)
try:
iter(path)
except TypeError:
return False
else:
paths = path
return all(cls.valid_brew_path(path_) for path_ in paths)
@classmethod
def valid_brew_path(cls, brew_path):
'''
`brew_path` must be one of:
- None
- a string containing only:
- alphanumeric characters
- dashes
- spaces
- os.path.sep
'''
if brew_path is None:
return True
return (
isinstance(brew_path, basestring)
and not cls.INVALID_BREW_PATH_REGEX.search(brew_path)
)
@classmethod
def valid_package(cls, package):
'''A valid package is either None or alphanumeric.'''
if package is None:
return True
return (
isinstance(package, basestring)
and not cls.INVALID_PACKAGE_REGEX.search(package)
)
@classmethod
def valid_state(cls, state):
'''
A valid state is one of:
- None
- installed
- upgraded
- head
- linked
- unlinked
- absent
'''
if state is None:
return True
else:
return (
isinstance(state, basestring)
and state.lower() in (
'installed',
'upgraded',
'head',
'linked',
'unlinked',
'absent',
)
)
@classmethod
def valid_module(cls, module):
'''A valid module is an instance of AnsibleModule.'''
return isinstance(module, AnsibleModule)
# /class validations ------------------------------------------- }}}
# class properties --------------------------------------------- {{{
@property
def module(self):
return self._module
@module.setter
def module(self, module):
if not self.valid_module(module):
self._module = None
self.failed = True
self.message = 'Invalid module: {0}.'.format(module)
raise HomebrewException(self.message)
else:
self._module = module
return module
@property
def path(self):
return self._path
@path.setter
def path(self, path):
if not self.valid_path(path):
self._path = []
self.failed = True
self.message = 'Invalid path: {0}.'.format(path)
raise HomebrewException(self.message)
else:
if isinstance(path, basestring):
self._path = path.split(':')
else:
self._path = path
return path
@property
def brew_path(self):
return self._brew_path
@brew_path.setter
def brew_path(self, brew_path):
if not self.valid_brew_path(brew_path):
self._brew_path = None
self.failed = True
self.message = 'Invalid brew_path: {0}.'.format(brew_path)
raise HomebrewException(self.message)
else:
self._brew_path = brew_path
return brew_path
@property
def params(self):
return self._params
@params.setter
def params(self, params):
self._params = self.module.params
return self._params
@property
def current_package(self):
return self._current_package
@current_package.setter
def current_package(self, package):
if not self.valid_package(package):
self._current_package = None
self.failed = True
self.message = 'Invalid package: {0}.'.format(package)
raise HomebrewException(self.message)
else:
self._current_package = package
return package
# /class properties -------------------------------------------- }}}
def __init__(self, module, path=None, packages=None, state=None,
update_homebrew=False, install_options=None):
if not install_options:
install_options = list()
self._setup_status_vars()
self._setup_instance_vars(module=module, path=path, packages=packages,
state=state, update_homebrew=update_homebrew,
install_options=install_options, )
self._prep()
# prep --------------------------------------------------------- {{{
def _setup_status_vars(self):
self.failed = False
self.changed = False
self.changed_count = 0
self.unchanged_count = 0
self.message = ''
def _setup_instance_vars(self, **kwargs):
for key, val in kwargs.iteritems():
setattr(self, key, val)
def _prep(self):
self._prep_path()
self._prep_brew_path()
def _prep_path(self):
if not self.path:
self.path = ['/usr/local/bin']
def _prep_brew_path(self):
if not self.module:
self.brew_path = None
self.failed = True
self.message = 'AnsibleModule not set.'
raise HomebrewException(self.message)
self.brew_path = self.module.get_bin_path(
'brew',
required=True,
opt_dirs=self.path,
)
if not self.brew_path:
self.brew_path = None
self.failed = True
self.message = 'Unable to locate homebrew executable.'
raise HomebrewException('Unable to locate homebrew executable.')
return self.brew_path
def _status(self):
return (self.failed, self.changed, self.message)
# /prep -------------------------------------------------------- }}}
def run(self):
try:
self._run()
except HomebrewException:
pass
if not self.failed and (self.changed_count + self.unchanged_count > 1):
self.message = "Changed: %d, Unchanged: %d" % (
self.changed_count,
self.unchanged_count,
)
(failed, changed, message) = self._status()
return (failed, changed, message)
# checks ------------------------------------------------------- {{{
def _current_package_is_installed(self):
if not self.valid_package(self.current_package):
self.failed = True
self.message = 'Invalid package: {0}.'.format(self.current_package)
raise HomebrewException(self.message)
cmd = [
"{brew_path}".format(brew_path=self.brew_path),
"list",
"-m1",
]
rc, out, err = self.module.run_command(cmd)
packages = [package for package in out.split('\n') if package]
if rc == 0 and self.current_package in packages:
return True
else:
return False
def _outdated_packages(self):
rc, out, err = self.module.run_command([
self.brew_path,
'outdated',
])
return [line.split(' ')[0].strip() for line in out.split('\n') if line]
def _current_package_is_outdated(self):
if not self.valid_package(self.current_package):
return False
return self.current_package in self._outdated_packages()
def _current_package_is_installed_from_head(self):
if not Homebrew.valid_package(self.current_package):
return False
elif not self._current_package_is_installed():
return False
rc, out, err = self.module.run_command([
self.brew_path,
'info',
self.current_package,
])
try:
version_info = [line for line in out.split('\n') if line][0]
except IndexError:
return False
return version_info.split(' ')[-1] == 'HEAD'
# /checks ------------------------------------------------------ }}}
# commands ----------------------------------------------------- {{{
def _run(self):
if self.update_homebrew:
self._update_homebrew()
if self.packages:
if self.state == 'installed':
return self._install_packages()
elif self.state == 'upgraded':
return self._upgrade_packages()
elif self.state == 'head':
return self._install_packages()
elif self.state == 'linked':
return self._link_packages()
elif self.state == 'unlinked':
return self._unlink_packages()
elif self.state == 'absent':
return self._uninstall_packages()
# updated -------------------------------- {{{
def _update_homebrew(self):
rc, out, err = self.module.run_command([
self.brew_path,
'update',
])
if rc == 0:
if out and isinstance(out, basestring):
already_updated = any(
re.search(r'Already up-to-date.', s.strip(), re.IGNORECASE)
for s in out.split('\n')
if s
)
if not already_updated:
self.changed = True
self.message = 'Homebrew updated successfully.'
else:
self.message = 'Homebrew already up-to-date.'
return True
else:
self.failed = True
self.message = err.strip()
raise HomebrewException(self.message)
# /updated ------------------------------- }}}
# installed ------------------------------ {{{
def _install_current_package(self):
if not self.valid_package(self.current_package):
self.failed = True
self.message = 'Invalid package: {0}.'.format(self.current_package)
raise HomebrewException(self.message)
if self._current_package_is_installed():
self.unchanged_count += 1
self.message = 'Package already installed: {0}'.format(
self.current_package,
)
return True
def remove_packages(module, brew_path, packages):
""" Uninstalls one or more packages if installed. """
if self.module.check_mode:
self.changed = True
self.message = 'Package would be installed: {0}'.format(
self.current_package
)
raise HomebrewException(self.message)
removed_count = 0
if self.state == 'head':
head = '--HEAD'
else:
head = None
# Using a for loop incase of error, we can report the package that failed
for package in packages:
# Query the package first, to see if we even need to remove.
if not query_package(module, brew_path, package):
continue
opts = (
[self.brew_path, 'install']
+ self.install_options
+ [self.current_package, head]
)
cmd = [opt for opt in opts if opt]
rc, out, err = self.module.run_command(cmd)
if module.check_mode:
module.exit_json(changed=True)
rc, out, err = module.run_command([brew_path, 'remove', package])
if self._current_package_is_installed():
self.changed_count += 1
self.changed = True
self.message = 'Package installed: {0}'.format(self.current_package)
return True
else:
self.failed = True
self.message = err.strip()
raise HomebrewException(self.message)
if query_package(module, brew_path, package):
module.fail_json(msg="failed to remove %s: %s" % (package, out.strip()))
def _install_packages(self):
for package in self.packages:
self.current_package = package
self._install_current_package()
removed_count += 1
return True
# /installed ----------------------------- }}}
if removed_count > 0:
module.exit_json(changed=True, msg="removed %d package(s)" % removed_count)
# upgraded ------------------------------- {{{
def _upgrade_current_package(self):
command = 'upgrade'
module.exit_json(changed=False, msg="package(s) already absent")
if not self.valid_package(self.current_package):
self.failed = True
self.message = 'Invalid package: {0}.'.format(self.current_package)
raise HomebrewException(self.message)
if not self._current_package_is_installed():
command = 'install'
def install_packages(module, brew_path, packages, options):
""" Installs one or more packages if not already installed. """
if self._current_package_is_installed() and not self._current_package_is_outdated():
self.message = 'Package is already upgraded: {0}'.format(
self.current_package,
)
self.unchanged_count += 1
return True
installed_count = 0
if self.module.check_mode:
self.changed = True
self.message = 'Package would be upgraded: {0}'.format(
self.current_package
)
raise HomebrewException(self.message)
for package in packages:
if query_package(module, brew_path, package):
continue
opts = (
[self.brew_path, command]
+ self.install_options
+ [self.current_package]
)
cmd = [opt for opt in opts if opt]
rc, out, err = self.module.run_command(cmd)
if module.check_mode:
module.exit_json(changed=True)
if not self._current_package_is_outdated():
self.changed_count += 1
self.changed = True
self.message = 'Package upgraded: {0}'.format(self.current_package)
return True
else:
self.failed = True
self.message = err.strip()
raise HomebrewException(self.message)
def _upgrade_all_packages(self):
opts = (
[self.brew_path, 'upgrade']
+ self.install_options
)
cmd = [opt for opt in opts if opt]
rc, out, err = self.module.run_command(cmd)
cmd = [brew_path, 'install', package]
if options:
cmd.extend(options)
rc, out, err = module.run_command(cmd)
if rc == 0:
self.changed = True
self.message = 'All packages upgraded.'
return True
else:
self.failed = True
self.message = err.strip()
raise HomebrewException(self.message)
def _upgrade_packages(self):
if not self.packages:
self._upgrade_all_packages()
else:
for package in self.packages:
self.current_package = package
self._upgrade_current_package()
return True
# /upgraded ------------------------------ }}}
# uninstalled ---------------------------- {{{
def _uninstall_current_package(self):
if not self.valid_package(self.current_package):
self.failed = True
self.message = 'Invalid package: {0}.'.format(self.current_package)
raise HomebrewException(self.message)
if not self._current_package_is_installed():
self.unchanged_count += 1
self.message = 'Package already uninstalled: {0}'.format(
self.current_package,
)
return True
if not query_package(module, brew_path, package):
module.fail_json(msg="failed to install %s: '%s' %s" % (package, cmd, out.strip()))
if self.module.check_mode:
self.changed = True
self.message = 'Package would be uninstalled: {0}'.format(
self.current_package
)
raise HomebrewException(self.message)
installed_count += 1
opts = (
[self.brew_path, 'uninstall']
+ self.install_options
+ [self.current_package]
)
cmd = [opt for opt in opts if opt]
rc, out, err = self.module.run_command(cmd)
if installed_count > 0:
module.exit_json(changed=True, msg="installed %d package(s)" % (installed_count,))
if not self._current_package_is_installed():
self.changed_count += 1
self.changed = True
self.message = 'Package uninstalled: {0}'.format(self.current_package)
return True
else:
self.failed = True
self.message = err.strip()
raise HomebrewException(self.message)
module.exit_json(changed=False, msg="package(s) already present")
def _uninstall_packages(self):
for package in self.packages:
self.current_package = package
self._uninstall_current_package()
def generate_options_string(install_options):
if install_options is None:
return None
return True
# /uninstalled ----------------------------- }}}
# linked --------------------------------- {{{
def _link_current_package(self):
if not self.valid_package(self.current_package):
self.failed = True
self.message = 'Invalid package: {0}.'.format(self.current_package)
raise HomebrewException(self.message)
if not self._current_package_is_installed():
self.failed = True
self.message = 'Package not installed: {0}.'.format(self.current_package)
raise HomebrewException(self.message)
if self.module.check_mode:
self.changed = True
self.message = 'Package would be linked: {0}'.format(
self.current_package
)
raise HomebrewException(self.message)
options = []
opts = (
[self.brew_path, 'link']
+ self.install_options
+ [self.current_package]
)
cmd = [opt for opt in opts if opt]
rc, out, err = self.module.run_command(cmd)
for option in install_options:
options.append('--%s' % option)
if rc == 0:
self.changed_count += 1
self.changed = True
self.message = 'Package linked: {0}'.format(self.current_package)
return options
return True
else:
self.failed = True
self.message = 'Package could not be linked: {0}.'.format(self.current_package)
raise HomebrewException(self.message)
def _link_packages(self):
for package in self.packages:
self.current_package = package
self._link_current_package()
def main():
module = AnsibleModule(
argument_spec = dict(
name = dict(aliases=["pkg"], required=True),
state = dict(default="present", choices=["present", "installed", "absent", "removed"]),
update_homebrew = dict(default="no", aliases=["update-brew"], type='bool'),
install_options = dict(default=None, aliases=["options"], type='list')
),
supports_check_mode=True
return True
# /linked -------------------------------- }}}
# unlinked ------------------------------- {{{
def _unlink_current_package(self):
if not self.valid_package(self.current_package):
self.failed = True
self.message = 'Invalid package: {0}.'.format(self.current_package)
raise HomebrewException(self.message)
if not self._current_package_is_installed():
self.failed = True
self.message = 'Package not installed: {0}.'.format(self.current_package)
raise HomebrewException(self.message)
if self.module.check_mode:
self.changed = True
self.message = 'Package would be unlinked: {0}'.format(
self.current_package
)
raise HomebrewException(self.message)
brew_path = module.get_bin_path('brew', True, ['/usr/local/bin'])
opts = (
[self.brew_path, 'unlink']
+ self.install_options
+ [self.current_package]
)
cmd = [opt for opt in opts if opt]
rc, out, err = self.module.run_command(cmd)
p = module.params
if rc == 0:
self.changed_count += 1
self.changed = True
self.message = 'Package unlinked: {0}'.format(self.current_package)
if p["update_homebrew"]:
update_homebrew(module, brew_path)
return True
else:
self.failed = True
self.message = 'Package could not be unlinked: {0}.'.format(self.current_package)
raise HomebrewException(self.message)
pkgs = p["name"].split(",")
def _unlink_packages(self):
for package in self.packages:
self.current_package = package
self._unlink_current_package()
if p["state"] in ["present", "installed"]:
opt = generate_options_string(p["install_options"])
install_packages(module, brew_path, pkgs, opt)
return True
# /unlinked ------------------------------ }}}
# /commands ---------------------------------------------------- }}}
elif p["state"] in ["absent", "removed"]:
remove_packages(module, brew_path, pkgs)
# import module snippets
from ansible.module_utils.basic import *
def main():
module = AnsibleModule(
argument_spec=dict(
name=dict(aliases=["pkg"], required=False),
path=dict(required=False),
state=dict(
default="present",
choices=[
"present", "installed",
"latest", "upgraded", "head",
"linked", "unlinked",
"absent", "removed", "uninstalled",
],
),
update_homebrew=dict(
default="no",
aliases=["update-brew"],
type='bool',
),
install_options=dict(
default=None,
aliases=['options'],
type='list',
)
),
supports_check_mode=True,
)
p = module.params
if p['name']:
packages = p['name'].split(',')
else:
packages = None
path = p['path']
if path:
path = path.split(':')
else:
path = ['/usr/local/bin']
state = p['state']
if state in ('present', 'installed', 'head'):
state = 'installed'
if state in ('latest', 'upgraded'):
state = 'upgraded'
if state == 'linked':
state = 'linked'
if state == 'unlinked':
state = 'unlinked'
if state in ('absent', 'removed', 'uninstalled'):
state = 'absent'
update_homebrew = p['update_homebrew']
p['install_options'] = p['install_options'] or []
install_options = ['--{0}'.format(install_option)
for install_option in p['install_options']]
brew = Homebrew(module=module, path=path, packages=packages,
state=state, update_homebrew=update_homebrew,
install_options=install_options)
(failed, changed, message) = brew.run()
if failed:
module.fail_json(msg=message)
else:
module.exit_json(changed=changed, msg=message)
# this is magic, see lib/ansible/module_common.py
#<<INCLUDE_ANSIBLE_MODULE_COMMON>>
main()

Loading…
Cancel
Save