ansible: try to create tempdir if missing.

Closes #358.
issue72
David Wilson 6 years ago
parent 62f7963da9
commit 9d070541d9

@ -214,30 +214,28 @@ def _on_broker_shutdown():
prune_tree(temp_dir) prune_tree(temp_dir)
def find_good_temp_dir(candidate_temp_dirs): def is_good_temp_dir(path):
""" """
Given a list of candidate temp directories extracted from ``ansible.cfg``, Return :data:`True` if `path` can be used as a temporary directory, logging
combine it with the Python-builtin list of candidate directories used by any failures that may cause it to be unsuitable. If the directory doesn't
:mod:`tempfile`, then iteratively try each until one is found that is both exist, we attempt to create it using :func:`os.makedirs`.
writeable and executable.
:param list candidate_temp_dirs:
List of candidate $variable-expanded and tilde-expanded directory paths
that may be usable as a temporary directory.
""" """
paths = [os.path.expandvars(os.path.expanduser(p)) if not os.path.exists(path):
for p in candidate_temp_dirs] try:
paths.extend(tempfile._candidate_tempdir_list()) os.makedirs(path, mode=int('0700', 8))
except OSError as e:
LOG.debug('temp dir %r unusable: did not exist and attempting '
'to create it failed: %s', path, e)
return False
for path in paths:
try: try:
tmp = tempfile.NamedTemporaryFile( tmp = tempfile.NamedTemporaryFile(
prefix='ansible_mitogen_find_good_temp_dir', prefix='ansible_mitogen_is_good_temp_dir',
dir=path, dir=path,
) )
except (OSError, IOError) as e: except (OSError, IOError) as e:
LOG.debug('temp dir %r unusable: %s', path, e) LOG.debug('temp dir %r unusable: %s', path, e)
continue return False
try: try:
try: try:
@ -245,7 +243,7 @@ def find_good_temp_dir(candidate_temp_dirs):
except OSError as e: except OSError as e:
LOG.debug('temp dir %r unusable: %s: chmod failed: %s', LOG.debug('temp dir %r unusable: %s: chmod failed: %s',
path, e) path, e)
continue return False
try: try:
# access(.., X_OK) is sufficient to detect noexec. # access(.., X_OK) is sufficient to detect noexec.
@ -253,12 +251,32 @@ def find_good_temp_dir(candidate_temp_dirs):
raise OSError('filesystem appears to be mounted noexec') raise OSError('filesystem appears to be mounted noexec')
except OSError as e: except OSError as e:
LOG.debug('temp dir %r unusable: %s: %s', path, e) LOG.debug('temp dir %r unusable: %s: %s', path, e)
continue return False
finally:
tmp.close()
return True
def find_good_temp_dir(candidate_temp_dirs):
"""
Given a list of candidate temp directories extracted from ``ansible.cfg``,
combine it with the Python-builtin list of candidate directories used by
:mod:`tempfile`, then iteratively try each until one is found that is both
writeable and executable.
:param list candidate_temp_dirs:
List of candidate $variable-expanded and tilde-expanded directory paths
that may be usable as a temporary directory.
"""
paths = [os.path.expandvars(os.path.expanduser(p))
for p in candidate_temp_dirs]
paths.extend(tempfile._candidate_tempdir_list())
for path in paths:
if is_good_temp_dir(path):
LOG.debug('Selected temp directory: %r (from %r)', path, paths) LOG.debug('Selected temp directory: %r (from %r)', path, paths)
return path return path
finally:
tmp.close()
raise IOError(MAKE_TEMP_FAILED_MSG % { raise IOError(MAKE_TEMP_FAILED_MSG % {
'paths': '\n '.join(paths), 'paths': '\n '.join(paths),

@ -1,20 +0,0 @@
import unittest2
import ansible_mitogen.helpers
import testlib
class ApplyModeSpecTest(unittest2.TestCase):
func = staticmethod(ansible_mitogen.helpers.apply_mode_spec)
def test_simple(self):
spec = 'u+rwx,go=x'
self.assertEquals(0711, self.func(spec, 0))
spec = 'g-rw'
self.assertEquals(0717, self.func(spec, 0777))
if __name__ == '__main__':
unittest2.main()

@ -0,0 +1,77 @@
from __future__ import absolute_import
import os.path
import subprocess
import tempfile
import unittest2
import mock
import ansible_mitogen.target
import testlib
LOGGER_NAME = ansible_mitogen.target.LOG.name
class NamedTemporaryDirectory(object):
def __enter__(self):
self.path = tempfile.mkdtemp()
return self.path
def __exit__(self, _1, _2, _3):
subprocess.check_call(['rm', '-rf', self.path])
class ApplyModeSpecTest(unittest2.TestCase):
func = staticmethod(ansible_mitogen.target.apply_mode_spec)
def test_simple(self):
spec = 'u+rwx,go=x'
self.assertEquals(0711, self.func(spec, 0))
spec = 'g-rw'
self.assertEquals(0717, self.func(spec, 0777))
class IsGoodTempDirTest(unittest2.TestCase):
func = staticmethod(ansible_mitogen.target.is_good_temp_dir)
def test_creates(self):
with NamedTemporaryDirectory() as temp_path:
bleh = os.path.join(temp_path, 'bleh')
self.assertFalse(os.path.exists(bleh))
self.assertTrue(self.func(bleh))
self.assertTrue(os.path.exists(bleh))
def test_file_exists(self):
with NamedTemporaryDirectory() as temp_path:
bleh = os.path.join(temp_path, 'bleh')
with open(bleh, 'w') as fp:
fp.write('derp')
self.assertTrue(os.path.isfile(bleh))
self.assertFalse(self.func(bleh))
self.assertEquals(open(bleh).read(), 'derp')
def test_unwriteable(self):
with NamedTemporaryDirectory() as temp_path:
os.chmod(temp_path, 0)
self.assertFalse(self.func(temp_path))
os.chmod(temp_path, int('0700', 8))
@mock.patch('os.chmod')
def test_weird_filesystem(self, os_chmod):
os_chmod.side_effect = OSError('nope')
with NamedTemporaryDirectory() as temp_path:
self.assertFalse(self.func(temp_path))
@mock.patch('os.access')
def test_noexec(self, os_access):
os_access.return_value = False
with NamedTemporaryDirectory() as temp_path:
self.assertFalse(self.func(temp_path))
if __name__ == '__main__':
unittest2.main()

@ -158,22 +158,48 @@ def sync_with_broker(broker, timeout=10.0):
sem.get(timeout=10.0) sem.get(timeout=10.0)
class CaptureStreamHandler(logging.StreamHandler):
def __init__(self, *args, **kwargs):
super(CaptureStreamHandler, self).__init__(*args, **kwargs)
self.msgs = []
def emit(self, msg):
self.msgs.append(msg)
return super(CaptureStreamHandler, self).emit(msg)
class LogCapturer(object): class LogCapturer(object):
def __init__(self, name=None): def __init__(self, name=None):
self.sio = StringIO() self.sio = StringIO()
self.logger = logging.getLogger(name) self.logger = logging.getLogger(name)
self.handler = logging.StreamHandler(self.sio) self.handler = CaptureStreamHandler(self.sio)
self.old_propagate = self.logger.propagate self.old_propagate = self.logger.propagate
self.old_handlers = self.logger.handlers self.old_handlers = self.logger.handlers
self.old_level = self.logger.level
def start(self): def start(self):
self.logger.handlers = [self.handler] self.logger.handlers = [self.handler]
self.logger.propagate = False self.logger.propagate = False
self.logger.level = logging.DEBUG
def raw(self):
return self.sio.getvalue()
def msgs(self):
return self.handler.msgs
def __enter__(self):
self.start()
return self
def __exit__(self, _1, _2, _3):
self.stop()
def stop(self): def stop(self):
self.logger.level = self.old_level
self.logger.handlers = self.old_handlers self.logger.handlers = self.old_handlers
self.logger.propagate = self.old_propagate self.logger.propagate = self.old_propagate
return self.sio.getvalue() return self.raw()
class TestCase(unittest2.TestCase): class TestCase(unittest2.TestCase):

Loading…
Cancel
Save