ansible/utils/: PEP8 compliancy (#24686)

- Make PEP8 compliant
pull/25219/head
Dag Wieers 7 years ago committed by John R Barker
parent d5ad3093d6
commit 51b595992b

@ -19,10 +19,10 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import sys
import select
import shlex
import subprocess
import select
import sys
from ansible.module_utils.six import PY2, PY3
from ansible.module_utils._text import to_bytes
@ -30,7 +30,7 @@ from ansible.module_utils._text import to_bytes
def run_cmd(cmd, live=False, readsize=10):
#readsize = 10
# readsize = 10
# On python2, shlex needs byte strings
if PY2:

@ -22,26 +22,26 @@ import sys
from ansible import constants as C
ANSIBLE_COLOR=True
ANSIBLE_COLOR = True
if C.ANSIBLE_NOCOLOR:
ANSIBLE_COLOR=False
ANSIBLE_COLOR = False
elif not hasattr(sys.stdout, 'isatty') or not sys.stdout.isatty():
ANSIBLE_COLOR=False
ANSIBLE_COLOR = False
else:
try:
import curses
curses.setupterm()
if curses.tigetnum('colors') < 0:
ANSIBLE_COLOR=False
ANSIBLE_COLOR = False
except ImportError:
# curses library was not found
pass
except curses.error:
# curses returns an error (e.g. could not find terminal)
ANSIBLE_COLOR=False
ANSIBLE_COLOR = False
if C.ANSIBLE_FORCE_COLOR:
ANSIBLE_COLOR=True
ANSIBLE_COLOR = True
# --- begin "pretty"
#
@ -55,18 +55,19 @@ if C.ANSIBLE_FORCE_COLOR:
# http://nezzen.net/2008/06/23/colored-text-in-python-using-ansi-escape-sequences/
codeCodes = {
'black': u'0;30', 'bright gray': u'0;37',
'blue': u'0;34', 'white': u'1;37',
'green': u'0;32', 'bright blue': u'1;34',
'cyan': u'0;36', 'bright green': u'1;32',
'red': u'0;31', 'bright cyan': u'1;36',
'purple': u'0;35', 'bright red': u'1;31',
'yellow': u'0;33', 'bright purple': u'1;35',
'dark gray': u'1;30', 'bright yellow': u'1;33',
'magenta': u'0;35', 'bright magenta': u'1;35',
'normal': u'0' ,
'black': u'0;30', 'bright gray': u'0;37',
'blue': u'0;34', 'white': u'1;37',
'green': u'0;32', 'bright blue': u'1;34',
'cyan': u'0;36', 'bright green': u'1;32',
'red': u'0;31', 'bright cyan': u'1;36',
'purple': u'0;35', 'bright red': u'1;31',
'yellow': u'0;33', 'bright purple': u'1;35',
'dark gray': u'1;30', 'bright yellow': u'1;33',
'magenta': u'0;35', 'bright magenta': u'1;35',
'normal': u'0',
}
def parsecolor(color):
"""SGR parameter string for the specified color name."""
matches = re.match(r"color(?P<color>[0-9]+)"
@ -77,12 +78,13 @@ def parsecolor(color):
if matches.group('color'):
return u'38;5;%d' % int(matches.group('color'))
if matches.group('rgb'):
return u'38;5;%d' % (16 + 36 * int(matches.group('red'))
+ 6 * int(matches.group('green'))
+ int(matches.group('blue')))
return u'38;5;%d' % (16 + 36 * int(matches.group('red')) +
6 * int(matches.group('green')) +
int(matches.group('blue')))
if matches.group('gray'):
return u'38;5;%d' % (232 + int(matches.group('gray')))
def stringc(text, color):
"""String in color."""
@ -92,7 +94,6 @@ def stringc(text, color):
else:
return text
# --- end "pretty"
def colorize(lead, num, color):
""" Print 'lead' = 'num' in 'color' """
@ -101,6 +102,7 @@ def colorize(lead, num, color):
s = stringc(s, color)
return s
def hostcolor(host, stats, color=True):
if ANSIBLE_COLOR and color:
if stats['failures'] != 0 or stats['unreachable'] != 0:
@ -110,4 +112,3 @@ def hostcolor(host, stats, color=True):
else:
return u"%-37s" % stringc(host, C.COLOR_OK)
return u"%-26s" % host

@ -18,24 +18,25 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import errno
import fcntl
import textwrap
import getpass
import locale
import logging
import os
import random
import subprocess
import sys
import textwrap
import time
import locale
import logging
import getpass
import errno
from struct import unpack, pack
from termios import TIOCGWINSZ
from ansible import constants as C
from ansible.errors import AnsibleError
from ansible.utils.color import stringc
from ansible.module_utils._text import to_bytes, to_text
from ansible.utils.color import stringc
try:
@ -47,7 +48,7 @@ except NameError:
logger = None
#TODO: make this a logging callback instead
# TODO: make this a logging callback instead
if C.DEFAULT_LOG_PATH:
path = C.DEFAULT_LOG_PATH
if (os.path.exists(path) and os.access(path, os.W_OK)) or os.access(os.path.dirname(path), os.W_OK):
@ -58,11 +59,14 @@ if C.DEFAULT_LOG_PATH:
else:
print("[WARNING]: log file at %s is not writeable and we cannot create it, aborting\n" % path, file=sys.stderr)
b_COW_PATHS = (b"/usr/bin/cowsay",
b"/usr/games/cowsay",
b"/usr/local/bin/cowsay", # BSD path for cowsay
b"/opt/local/bin/cowsay", # MacPorts path for cowsay
)
b_COW_PATHS = (
b"/usr/bin/cowsay",
b"/usr/games/cowsay",
b"/usr/local/bin/cowsay", # BSD path for cowsay
b"/opt/local/bin/cowsay", # MacPorts path for cowsay
)
class Display:
def __init__(self, verbosity=0):
@ -72,8 +76,8 @@ class Display:
# list of all deprecation messages to prevent duplicate display
self._deprecations = {}
self._warns = {}
self._errors = {}
self._warns = {}
self._errors = {}
self.b_cowsay = None
self.noncow = C.ANSIBLE_COW_SELECTION
@ -84,7 +88,7 @@ class Display:
try:
cmd = subprocess.Popen([self.b_cowsay, "-l"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(out, err) = cmd.communicate()
self.cows_available = set([ to_text(c) for c in out.split() ])
self.cows_available = set([to_text(c) for c in out.split()])
if C.ANSIBLE_COW_WHITELIST:
self.cows_available = set(C.ANSIBLE_COW_WHITELIST).intersection(self.cows_available)
except:

@ -17,14 +17,17 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import multiprocessing
import os
import stat
import tempfile
import multiprocessing
import time
import warnings
from ansible import constants as C
from ansible.errors import AnsibleError
from ansible.module_utils._text import to_text, to_bytes
PASSLIB_AVAILABLE = False
try:
import passlib.hash
@ -38,7 +41,7 @@ except ImportError:
from ansible.utils.display import Display
display = Display()
KEYCZAR_AVAILABLE=False
KEYCZAR_AVAILABLE = False
try:
try:
# some versions of pycrypto may not have this?
@ -53,22 +56,18 @@ try:
from keyczar.keys import AesKey
except PowmInsecureWarning:
display.system_warning(
"The version of gmp you have installed has a known issue regarding " + \
"timing vulnerabilities when used with pycrypto. " + \
"The version of gmp you have installed has a known issue regarding "
"timing vulnerabilities when used with pycrypto. "
"If possible, you should update it (i.e. yum update gmp)."
)
warnings.resetwarnings()
warnings.simplefilter("ignore")
import keyczar.errors as key_errors
from keyczar.keys import AesKey
KEYCZAR_AVAILABLE=True
KEYCZAR_AVAILABLE = True
except ImportError:
pass
from ansible import constants as C
from ansible.errors import AnsibleError
from ansible.module_utils._text import to_text, to_bytes
__all__ = ['do_encrypt']
_LOCK = multiprocessing.Lock()
@ -100,6 +99,7 @@ def do_encrypt(result, encrypt, salt_size=None, salt=None):
# impact calling code.
return to_text(result, errors='strict')
def key_for_hostname(hostname):
# fireball mode is an implementation of ansible firing up zeromq via SSH
# to use no persistent daemons or key management
@ -129,11 +129,11 @@ def key_for_hostname(hostname):
key_path = os.path.join(key_path, hostname)
# use new AES keys every 2 hours, which means fireball must not allow running for longer either
if not os.path.exists(key_path) or (time.time() - os.path.getmtime(key_path) > 60*60*2):
if not os.path.exists(key_path) or (time.time() - os.path.getmtime(key_path) > 60 * 60 * 2):
# avoid race with multiple forks trying to create key
# but limit when locking is needed to creation only
with(_LOCK):
if not os.path.exists(key_path) or (time.time() - os.path.getmtime(key_path) > 60*60*2):
if not os.path.exists(key_path) or (time.time() - os.path.getmtime(key_path) > 60 * 60 * 2):
key = AesKey.Generate()
# use temp file to ensure file only appears once it has
# desired contents and permissions
@ -152,12 +152,13 @@ def key_for_hostname(hostname):
fh.close()
return key
def keyczar_encrypt(key, msg):
return key.Encrypt(msg.encode('utf-8'))
def keyczar_decrypt(key, msg):
try:
return key.Decrypt(msg)
except key_errors.InvalidSignatureError:
raise AnsibleError("decryption failed")

@ -28,8 +28,7 @@ def pct_to_int(value, num_items, min_value=1):
otherwise converts the given value to an integer.
'''
if isinstance(value, string_types) and value.endswith('%'):
value_pct = int(value.replace("%",""))
return int((value_pct/100.0) * num_items) or min_value
value_pct = int(value.replace("%", ""))
return int((value_pct / 100.0) * num_items) or min_value
else:
return int(value)

@ -36,6 +36,6 @@ def listify_lookup_plugin_terms(terms, templar, loader, fail_on_undefined=True,
terms = templar.template(terms, fail_on_undefined=fail_on_undefined)
if isinstance(terms, string_types) or not isinstance(terms, Iterable):
terms = [ terms ]
terms = [terms]
return terms

@ -55,7 +55,7 @@ notes:
"""
# Documentation fragment for SolidFire
# Documentation fragment for SolidFire
SOLIDFIRE = """
options:
hostname:
@ -79,8 +79,7 @@ notes:
"""
# Documentation fragment for E-Series
# Documentation fragment for E-Series
ESERIES = """
options:
api_username:

@ -18,6 +18,7 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
from errno import EEXIST
from ansible.errors import AnsibleError
from ansible.module_utils._text import to_bytes, to_native, to_text
@ -49,6 +50,7 @@ def unfrackpath(path, follow=True):
return to_text(final_path, errors='surrogate_or_strict')
def makedirs_safe(path, mode=None):
'''Safe way to create dirs in muliprocess/thread environments.
@ -70,13 +72,14 @@ def makedirs_safe(path, mode=None):
if e.errno != EEXIST:
raise AnsibleError("Unable to create local directories(%s): %s" % (to_native(rpath), to_native(e)))
def basedir(source):
""" returns directory for inventory or playbook """
dname = None
if os.path.isdir(source):
dname = source
elif source in [None, '', '.']:
elif source in [None, '', '.']:
dname = os.getcwd()
elif os.path.isfile(source):
dname = os.path.dirname(source)
@ -86,4 +89,3 @@ def basedir(source):
dname = os.path.abspath(dname)
return dname

@ -35,7 +35,7 @@ def check_for_controlpersist(ssh_executable):
has_cp = True
try:
cmd = subprocess.Popen([ssh_executable,'-o','ControlPersist'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmd = subprocess.Popen([ssh_executable, '-o', 'ControlPersist'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(out, err) = cmd.communicate()
if b"Bad configuration option" in err or b"Usage:" in err:
has_cp = False
@ -44,4 +44,3 @@ def check_for_controlpersist(ssh_executable):
_HAS_CONTROLPERSIST[ssh_executable] = has_cp
return has_cp

@ -23,20 +23,20 @@ import ast
import random
import uuid
from json import dumps
from collections import MutableMapping
from json import dumps
from ansible.module_utils.six import iteritems, string_types
from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.parsing.splitter import parse_kv
from ansible.module_utils.six import iteritems, string_types
from ansible.module_utils._text import to_native, to_text
from ansible.parsing.splitter import parse_kv
_MAXSIZE = 2**32
cur_id = 0
node_mac = ("%012x" % uuid.getnode())[:12]
_MAXSIZE = 2 ** 32
cur_id = 0
node_mac = ("%012x" % uuid.getnode())[:12]
random_int = ("%08x" % random.randint(0, _MAXSIZE))[:8]
@ -51,6 +51,7 @@ def get_unique_id():
("%012x" % cur_id)[:12],
])
def _validate_mutable_mappings(a, b):
"""
Internal convenience function to ensure arguments are MutableMappings

@ -806,13 +806,4 @@ lib/ansible/template/__init__.py
lib/ansible/template/safe_eval.py
lib/ansible/template/template.py
lib/ansible/template/vars.py
lib/ansible/utils/cmd_functions.py
lib/ansible/utils/color.py
lib/ansible/utils/display.py
lib/ansible/utils/encrypt.py
lib/ansible/utils/helpers.py
lib/ansible/utils/listify.py
lib/ansible/utils/path.py
lib/ansible/utils/ssh_functions.py
lib/ansible/utils/vars.py
lib/ansible/vars/manager.py

Loading…
Cancel
Save