ansible_mitogen: Handle AnsibleUnsafeText et al in Ansible >= 7
Follwing fixes in Ansible 7-9 for CVE-2023-5764 cating `AnsibleUnsafeBytes` & `AnsibleUnsafeText` to `bytes()` or `str()` requires special handling. The handling is Ansible specific, so it shouldn't go in the mitogen package but rather the ansible_mitogen package. `ansible_mitogen.utils.unsafe.cast()` is most like `mitogen.utils.cast()`. During development it began as `ansible_mitogen.utils.unsafe.unwrap_var()`, closer to an inverse of `ansible.utils.unsafe_procy.wrap_var()`. Future enhancements may move in this direction. refs #977, refs #1046 See also - https://github.com/advisories/GHSA-7j69-qfc3-2fq9 - https://github.com/ansible/ansible/pull/82293 - https://github.com/mitogen-hq/mitogen/wiki/AnsibleUnsafe-notespull/1057/head
parent
8059be7160
commit
d70ec4e540
@ -0,0 +1,79 @@
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
import ansible
|
||||
import ansible.utils.unsafe_proxy
|
||||
|
||||
import ansible_mitogen.utils
|
||||
|
||||
import mitogen
|
||||
import mitogen.core
|
||||
import mitogen.utils
|
||||
|
||||
__all__ = [
|
||||
'cast',
|
||||
]
|
||||
|
||||
def _cast_to_dict(obj): return {cast(k): cast(v) for k, v in obj.items()}
|
||||
def _cast_to_list(obj): return [cast(v) for v in obj]
|
||||
def _cast_unsafe(obj): return obj._strip_unsafe()
|
||||
def _passthrough(obj): return obj
|
||||
|
||||
|
||||
# A dispatch table to cast objects based on their exact type.
|
||||
# This is an optimisation, reliable fallbacks are required (e.g. isinstance())
|
||||
_CAST_DISPATCH = {
|
||||
bytes: bytes,
|
||||
dict: _cast_to_dict,
|
||||
list: _cast_to_list,
|
||||
tuple: _cast_to_list,
|
||||
mitogen.core.UnicodeType: mitogen.core.UnicodeType,
|
||||
}
|
||||
_CAST_DISPATCH.update({t: _passthrough for t in mitogen.utils.PASSTHROUGH})
|
||||
|
||||
if hasattr(ansible.utils.unsafe_proxy.AnsibleUnsafeText, '_strip_unsafe'):
|
||||
_CAST_DISPATCH.update({
|
||||
ansible.utils.unsafe_proxy.AnsibleUnsafeBytes: _cast_unsafe,
|
||||
ansible.utils.unsafe_proxy.AnsibleUnsafeText: _cast_unsafe,
|
||||
ansible.utils.unsafe_proxy.NativeJinjaUnsafeText: _cast_unsafe,
|
||||
})
|
||||
elif ansible_mitogen.utils.ansible_version[:2] <= (2, 16):
|
||||
_CAST_DISPATCH.update({
|
||||
ansible.utils.unsafe_proxy.AnsibleUnsafeBytes: bytes,
|
||||
ansible.utils.unsafe_proxy.AnsibleUnsafeText: mitogen.core.UnicodeType,
|
||||
})
|
||||
else:
|
||||
mitogen_ver = '.'.join(str(v) for v in mitogen.__version__)
|
||||
raise ImportError("Mitogen %s can't unwrap Ansible %s AnsibleUnsafe objects"
|
||||
% (mitogen_ver, ansible.__version__))
|
||||
|
||||
|
||||
def cast(obj):
|
||||
"""
|
||||
Return obj (or a copy) with subtypes of builtins cast to their supertype.
|
||||
|
||||
This is an enhanced version of :func:`mitogen.utils.cast`. In addition it
|
||||
handles ``ansible.utils.unsafe_proxy.AnsibleUnsafeText`` and variants.
|
||||
|
||||
There are types handled by :func:`ansible.utils.unsafe_proxy.wrap_var()`
|
||||
that this function currently does not handle (e.g. `set()`), or preserve
|
||||
preserve (e.g. `tuple()`). Future enhancements may change this.
|
||||
|
||||
:param obj:
|
||||
Object to undecorate.
|
||||
:returns:
|
||||
Undecorated object.
|
||||
"""
|
||||
# Fast path: obj is a known type, dispatch directly
|
||||
try:
|
||||
unwrapper = _CAST_DISPATCH[type(obj)]
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
return unwrapper(obj)
|
||||
|
||||
# Slow path: obj is some unknown subclass
|
||||
if isinstance(obj, dict): return _cast_to_dict(obj)
|
||||
if isinstance(obj, (list, tuple)): return _cast_to_list(obj)
|
||||
|
||||
return mitogen.utils.cast(obj)
|
@ -0,0 +1,92 @@
|
||||
import unittest
|
||||
|
||||
from ansible.utils.unsafe_proxy import AnsibleUnsafeBytes
|
||||
from ansible.utils.unsafe_proxy import AnsibleUnsafeText
|
||||
from ansible.utils.unsafe_proxy import wrap_var
|
||||
|
||||
import ansible_mitogen.utils.unsafe
|
||||
|
||||
import mitogen.core
|
||||
|
||||
|
||||
class Bytes(bytes): pass
|
||||
class Dict(dict): pass
|
||||
class List(list): pass
|
||||
class Set(set): pass
|
||||
class Text(mitogen.core.UnicodeType): pass
|
||||
class Tuple(tuple): pass
|
||||
|
||||
|
||||
class CastTest(unittest.TestCase):
|
||||
def assertIsType(self, obj, cls, msg=None):
|
||||
self.assertIs(type(obj), cls, msg)
|
||||
|
||||
def assertUnchanged(self, obj):
|
||||
self.assertIs(ansible_mitogen.utils.unsafe.cast(obj), obj)
|
||||
|
||||
def assertCasts(self, obj, expected):
|
||||
cast = ansible_mitogen.utils.unsafe.cast
|
||||
self.assertEqual(cast(obj), expected)
|
||||
self.assertIsType(cast(obj), type(expected))
|
||||
|
||||
def test_ansible_unsafe(self):
|
||||
self.assertCasts(AnsibleUnsafeBytes(b'abc'), b'abc')
|
||||
self.assertCasts(AnsibleUnsafeText(u'abc'), u'abc')
|
||||
|
||||
def test_passthrough(self):
|
||||
self.assertUnchanged(0)
|
||||
self.assertUnchanged(0.0)
|
||||
self.assertUnchanged(False)
|
||||
self.assertUnchanged(True)
|
||||
self.assertUnchanged(None)
|
||||
self.assertUnchanged(b'')
|
||||
self.assertUnchanged(u'')
|
||||
|
||||
def test_builtins_roundtrip(self):
|
||||
self.assertCasts(wrap_var(b''), b'')
|
||||
self.assertCasts(wrap_var({}), {})
|
||||
self.assertCasts(wrap_var([]), [])
|
||||
self.assertCasts(wrap_var(u''), u'')
|
||||
self.assertCasts(wrap_var(()), [])
|
||||
|
||||
def test_subtypes_roundtrip(self):
|
||||
self.assertCasts(wrap_var(Bytes()), b'')
|
||||
self.assertCasts(wrap_var(Dict()), {})
|
||||
self.assertCasts(wrap_var(List()), [])
|
||||
self.assertCasts(wrap_var(Text()), u'')
|
||||
self.assertCasts(wrap_var(Tuple()), [])
|
||||
|
||||
def test_subtype_nested_dict(self):
|
||||
obj = Dict(foo=Dict(bar=u'abc'))
|
||||
wrapped = wrap_var(obj)
|
||||
unwrapped = ansible_mitogen.utils.unsafe.cast(wrapped)
|
||||
self.assertEqual(unwrapped, {'foo': {'bar': u'abc'}})
|
||||
self.assertIsType(unwrapped, dict)
|
||||
self.assertIsType(unwrapped['foo'], dict)
|
||||
self.assertIsType(unwrapped['foo']['bar'], mitogen.core.UnicodeType)
|
||||
|
||||
def test_subtype_roundtrip_list(self):
|
||||
# wrap_var() preserves sequence types, cast() does not (for now)
|
||||
obj = List([List([u'abc'])])
|
||||
wrapped = wrap_var(obj)
|
||||
unwrapped = ansible_mitogen.utils.unsafe.cast(wrapped)
|
||||
self.assertEqual(unwrapped, [[u'abc']])
|
||||
self.assertIsType(unwrapped, list)
|
||||
self.assertIsType(unwrapped[0], list)
|
||||
self.assertIsType(unwrapped[0][0], mitogen.core.UnicodeType)
|
||||
|
||||
def test_subtype_roundtrip_tuple(self):
|
||||
# wrap_var() preserves sequence types, cast() does not (for now)
|
||||
obj = Tuple([Tuple([u'abc'])])
|
||||
wrapped = wrap_var(obj)
|
||||
unwrapped = ansible_mitogen.utils.unsafe.cast(wrapped)
|
||||
self.assertEqual(unwrapped, [[u'abc']])
|
||||
self.assertIsType(unwrapped, list)
|
||||
self.assertIsType(unwrapped[0], list)
|
||||
self.assertIsType(unwrapped[0][0], mitogen.core.UnicodeType)
|
||||
|
||||
def test_unknown_types_raise(self):
|
||||
cast = ansible_mitogen.utils.unsafe.cast
|
||||
self.assertRaises(TypeError, cast, set())
|
||||
self.assertRaises(TypeError, cast, Set())
|
||||
self.assertRaises(TypeError, cast, 4j)
|
Loading…
Reference in New Issue