Isolate globals in import sanity test.

pull/59742/head
Matt Clay 5 years ago
parent 0b8354751b
commit ecddbdf0cb

@ -3,231 +3,230 @@
from __future__ import (absolute_import, division, print_function) from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type
import contextlib
import os
import re
import sys
import traceback
import warnings
try:
import importlib.util
imp = None # pylint: disable=invalid-name
except ImportError:
importlib = None # pylint: disable=invalid-name
import imp
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
import ansible.module_utils.basic
import ansible.module_utils.common.removed
try:
from ansible.utils.collection_loader import AnsibleCollectionLoader
except ImportError:
AnsibleCollectionLoader = None
class ImporterAnsibleModuleException(Exception):
"""Exception thrown during initialization of ImporterAnsibleModule."""
class ImporterAnsibleModule:
"""Replacement for AnsibleModule to support import testing."""
def __init__(self, *args, **kwargs):
raise ImporterAnsibleModuleException()
# stop Ansible module execution during AnsibleModule instantiation
ansible.module_utils.basic.AnsibleModule = ImporterAnsibleModule
# no-op for _load_params since it may be called before instantiating AnsibleModule
ansible.module_utils.basic._load_params = lambda *args, **kwargs: {} # pylint: disable=protected-access
# no-op for removed_module since it is called in place of AnsibleModule instantiation
ansible.module_utils.common.removed.removed_module = lambda *args, **kwargs: None
def main(): def main():
"""Main program function.""" """
base_dir = os.getcwd() Main program function used to isolate globals from imported code.
messages = set() Changes to globals in imported modules on Python 2.7 will overwrite our own globals.
"""
if AnsibleCollectionLoader: import contextlib
# allow importing code from collections import os
sys.meta_path.insert(0, AnsibleCollectionLoader()) import re
import sys
for path in sys.argv[1:] or sys.stdin.read().splitlines(): import traceback
test_python_module(path, base_dir, messages, False) import warnings
test_python_module(path, base_dir, messages, True)
if messages:
exit(10)
def test_python_module(path, base_dir, messages, ansible_module):
if ansible_module:
# importing modules with __main__ under Python 2.6 exits with status code 1
if sys.version_info < (2, 7):
return
# only run __main__ protected code for Ansible modules
if not path.startswith('lib/ansible/modules/'):
return
# async_wrapper is not an Ansible module try:
if path == 'lib/ansible/modules/utilities/logic/async_wrapper.py': import importlib.util
return imp = None # pylint: disable=invalid-name
except ImportError:
importlib = None # pylint: disable=invalid-name
import imp
# run code protected by __name__ conditional try:
name = '__main__' from StringIO import StringIO
# show the Ansible module responsible for the exception, even if it was thrown in module_utils except ImportError:
filter_dir = os.path.join(base_dir, 'lib/ansible/modules') from io import StringIO
else:
# do not run code protected by __name__ conditional
name = 'module_import_test'
# show the Ansible file responsible for the exception, even if it was thrown in 3rd party code
filter_dir = base_dir
capture = Capture() import ansible.module_utils.basic
import ansible.module_utils.common.removed
try: try:
if imp: from ansible.utils.collection_loader import AnsibleCollectionLoader
with open(path, 'r') as module_fd: except ImportError:
with capture_output(capture): AnsibleCollectionLoader = None
imp.load_module(name, module_fd, os.path.abspath(path), ('.py', 'r', imp.PY_SOURCE))
else: class ImporterAnsibleModuleException(Exception):
spec = importlib.util.spec_from_file_location(name, os.path.abspath(path)) """Exception thrown during initialization of ImporterAnsibleModule."""
module = importlib.util.module_from_spec(spec)
class ImporterAnsibleModule:
with capture_output(capture): """Replacement for AnsibleModule to support import testing."""
spec.loader.exec_module(module) def __init__(self, *args, **kwargs):
raise ImporterAnsibleModuleException()
capture_report(path, capture, messages)
except ImporterAnsibleModuleException: # stop Ansible module execution during AnsibleModule instantiation
# module instantiated AnsibleModule without raising an exception ansible.module_utils.basic.AnsibleModule = ImporterAnsibleModule
pass # no-op for _load_params since it may be called before instantiating AnsibleModule
# We truly want to catch anything the plugin might do here, including call sys.exit() so we ansible.module_utils.basic._load_params = lambda *args, **kwargs: {} # pylint: disable=protected-access
# catch BaseException # no-op for removed_module since it is called in place of AnsibleModule instantiation
except BaseException as ex: # pylint: disable=locally-disabled, broad-except ansible.module_utils.common.removed.removed_module = lambda *args, **kwargs: None
capture_report(path, capture, messages)
def run():
exc_type, _exc, exc_tb = sys.exc_info() """Main program function."""
message = str(ex) base_dir = os.getcwd()
results = list(reversed(traceback.extract_tb(exc_tb))) messages = set()
source = None
line = 0 if AnsibleCollectionLoader:
offset = 0 # allow importing code from collections
sys.meta_path.insert(0, AnsibleCollectionLoader())
if isinstance(ex, SyntaxError) and ex.filename.endswith(path): # pylint: disable=locally-disabled, no-member
# A SyntaxError in the source we're importing will have the correct path, line and offset. for path in sys.argv[1:] or sys.stdin.read().splitlines():
# However, the traceback will report the path to this importer.py script instead. test_python_module(path, base_dir, messages, False)
# We'll use the details from the SyntaxError in this case, as it's more accurate. test_python_module(path, base_dir, messages, True)
source = path
line = ex.lineno or 0 # pylint: disable=locally-disabled, no-member if messages:
offset = ex.offset or 0 # pylint: disable=locally-disabled, no-member exit(10)
message = str(ex)
def test_python_module(path, base_dir, messages, ansible_module):
# Hack to remove the filename and line number from the message, if present. if ansible_module:
message = message.replace(' (%s, line %d)' % (os.path.basename(path), line), '') # importing modules with __main__ under Python 2.6 exits with status code 1
if sys.version_info < (2, 7):
return
# only run __main__ protected code for Ansible modules
if not path.startswith('lib/ansible/modules/'):
return
# async_wrapper is not an Ansible module
if path == 'lib/ansible/modules/utilities/logic/async_wrapper.py':
return
# run code protected by __name__ conditional
name = '__main__'
# show the Ansible module responsible for the exception, even if it was thrown in module_utils
filter_dir = os.path.join(base_dir, 'lib/ansible/modules')
else: else:
for result in results: # do not run code protected by __name__ conditional
if result[0].startswith(filter_dir): name = 'module_import_test'
source = result[0][len(base_dir) + 1:].replace('test/sanity/import/', '') # show the Ansible file responsible for the exception, even if it was thrown in 3rd party code
line = result[1] or 0 filter_dir = base_dir
break
if not source:
# If none of our source files are found in the traceback, report the file we were testing.
# I haven't been able to come up with a test case that encounters this issue yet.
source = path
message += ' (in %s:%d)' % (results[-1][0], results[-1][1] or 0)
message = re.sub(r'\n *', ': ', message)
error = '%s:%d:%d: %s: %s' % (source, line, offset, exc_type.__name__, message)
report_message(error, messages)
class Capture:
"""Captured output and/or exception."""
def __init__(self):
self.stdout = StringIO()
self.stderr = StringIO()
self.warnings = []
capture = Capture()
def capture_report(path, capture, messages):
"""Report on captured output.
:type path: str
:type capture: Capture
:type messages: set[str]
"""
if capture.stdout.getvalue():
first = capture.stdout.getvalue().strip().splitlines()[0].strip()
message = '%s:%d:%d: %s: %s' % (path, 0, 0, 'StandardOutputUsed', first)
report_message(message, messages)
if capture.stderr.getvalue():
first = capture.stderr.getvalue().strip().splitlines()[0].strip()
message = '%s:%d:%d: %s: %s' % (path, 0, 0, 'StandardErrorUsed', first)
report_message(message, messages)
for warning in capture.warnings:
msg = re.sub(r'\s+', ' ', '%s' % warning.message).strip()
filepath = os.path.relpath(warning.filename)
lineno = warning.lineno
import_dir = 'test/runner/.tox/import/'
minimal_dir = 'test/runner/.tox/minimal-'
if filepath.startswith('../') or filepath.startswith(minimal_dir):
# The warning occurred outside our source tree.
# The best we can do is to report the file which was tested that triggered the warning.
# If the responsible import is in shared code this warning will be repeated for each file tested which imports the shared code.
msg += ' (in %s:%d)' % (warning.filename, warning.lineno)
filepath = path
lineno = 0
elif filepath.startswith(import_dir):
# Strip the import dir from warning paths in shared code.
# Needed when warnings occur in places like module_utils but are caught by the modules importing the module_utils.
filepath = os.path.relpath(filepath, import_dir)
message = '%s:%d:%d: %s: %s' % (filepath, lineno, 0, warning.category.__name__, msg)
report_message(message, messages)
def report_message(message, messages):
"""Report message if not already reported.
:type message: str
:type messages: set[str]
"""
if message not in messages:
messages.add(message)
print(message)
@contextlib.contextmanager
def capture_output(capture):
"""Capture sys.stdout and sys.stderr.
:type capture: Capture
"""
old_stdout = sys.stdout
old_stderr = sys.stderr
sys.stdout = capture.stdout
sys.stderr = capture.stderr
with warnings.catch_warnings(record=True) as captured_warnings:
try: try:
yield if imp:
finally: with open(path, 'r') as module_fd:
capture.warnings = captured_warnings with capture_output(capture):
imp.load_module(name, module_fd, os.path.abspath(path), ('.py', 'r', imp.PY_SOURCE))
else:
spec = importlib.util.spec_from_file_location(name, os.path.abspath(path))
module = importlib.util.module_from_spec(spec)
sys.stdout = old_stdout with capture_output(capture):
sys.stderr = old_stderr spec.loader.exec_module(module)
capture_report(path, capture, messages)
except ImporterAnsibleModuleException:
# module instantiated AnsibleModule without raising an exception
pass
# We truly want to catch anything the plugin might do here, including call sys.exit() so we
# catch BaseException
except BaseException as ex: # pylint: disable=locally-disabled, broad-except
capture_report(path, capture, messages)
exc_type, _exc, exc_tb = sys.exc_info()
message = str(ex)
results = list(reversed(traceback.extract_tb(exc_tb)))
source = None
line = 0
offset = 0
if isinstance(ex, SyntaxError) and ex.filename.endswith(path): # pylint: disable=locally-disabled, no-member
# A SyntaxError in the source we're importing will have the correct path, line and offset.
# However, the traceback will report the path to this importer.py script instead.
# We'll use the details from the SyntaxError in this case, as it's more accurate.
source = path
line = ex.lineno or 0 # pylint: disable=locally-disabled, no-member
offset = ex.offset or 0 # pylint: disable=locally-disabled, no-member
message = str(ex)
# Hack to remove the filename and line number from the message, if present.
message = message.replace(' (%s, line %d)' % (os.path.basename(path), line), '')
else:
for result in results:
if result[0].startswith(filter_dir):
source = result[0][len(base_dir) + 1:].replace('test/sanity/import/', '')
line = result[1] or 0
break
if not source:
# If none of our source files are found in the traceback, report the file we were testing.
# I haven't been able to come up with a test case that encounters this issue yet.
source = path
message += ' (in %s:%d)' % (results[-1][0], results[-1][1] or 0)
message = re.sub(r'\n *', ': ', message)
error = '%s:%d:%d: %s: %s' % (source, line, offset, exc_type.__name__, message)
report_message(error, messages)
class Capture:
"""Captured output and/or exception."""
def __init__(self):
self.stdout = StringIO()
self.stderr = StringIO()
self.warnings = []
def capture_report(path, capture, messages):
"""Report on captured output.
:type path: str
:type capture: Capture
:type messages: set[str]
"""
if capture.stdout.getvalue():
first = capture.stdout.getvalue().strip().splitlines()[0].strip()
message = '%s:%d:%d: %s: %s' % (path, 0, 0, 'StandardOutputUsed', first)
report_message(message, messages)
if capture.stderr.getvalue():
first = capture.stderr.getvalue().strip().splitlines()[0].strip()
message = '%s:%d:%d: %s: %s' % (path, 0, 0, 'StandardErrorUsed', first)
report_message(message, messages)
for warning in capture.warnings:
msg = re.sub(r'\s+', ' ', '%s' % warning.message).strip()
filepath = os.path.relpath(warning.filename)
lineno = warning.lineno
import_dir = 'test/runner/.tox/import/'
minimal_dir = 'test/runner/.tox/minimal-'
if filepath.startswith('../') or filepath.startswith(minimal_dir):
# The warning occurred outside our source tree.
# The best we can do is to report the file which was tested that triggered the warning.
# If the responsible import is in shared code this warning will be repeated for each file tested which imports the shared code.
msg += ' (in %s:%d)' % (warning.filename, warning.lineno)
filepath = path
lineno = 0
elif filepath.startswith(import_dir):
# Strip the import dir from warning paths in shared code.
# Needed when warnings occur in places like module_utils but are caught by the modules importing the module_utils.
filepath = os.path.relpath(filepath, import_dir)
message = '%s:%d:%d: %s: %s' % (filepath, lineno, 0, warning.category.__name__, msg)
report_message(message, messages)
def report_message(message, messages):
"""Report message if not already reported.
:type message: str
:type messages: set[str]
"""
if message not in messages:
messages.add(message)
print(message)
@contextlib.contextmanager
def capture_output(capture):
"""Capture sys.stdout and sys.stderr.
:type capture: Capture
"""
old_stdout = sys.stdout
old_stderr = sys.stderr
sys.stdout = capture.stdout
sys.stderr = capture.stderr
with warnings.catch_warnings(record=True) as captured_warnings:
try:
yield
finally:
capture.warnings = captured_warnings
sys.stdout = old_stdout
sys.stderr = old_stderr
run()
if __name__ == '__main__': if __name__ == '__main__':

Loading…
Cancel
Save