|
|
|
import mitogen.core
|
|
|
|
import mitogen.master
|
|
|
|
import mitogen.utils
|
|
|
|
from mitogen.core import b
|
|
|
|
|
|
|
|
import testlib
|
|
|
|
|
|
|
|
|
|
|
|
def func0(router):
|
|
|
|
return router
|
|
|
|
|
|
|
|
|
|
|
|
@mitogen.utils.with_router
|
|
|
|
def func(router):
|
|
|
|
"Docstring of func"
|
|
|
|
return router
|
|
|
|
|
|
|
|
|
|
|
|
class RunWithRouterTest(testlib.TestCase):
|
|
|
|
# test_shutdown_on_exception
|
|
|
|
# test_shutdown_on_success
|
|
|
|
|
|
|
|
def test_run_with_broker(self):
|
|
|
|
router = mitogen.utils.run_with_router(func0)
|
|
|
|
self.assertIsInstance(router, mitogen.master.Router)
|
|
|
|
self.assertFalse(testlib.threading__thread_is_alive(router.broker._thread))
|
|
|
|
|
|
|
|
|
|
|
|
class WithRouterTest(testlib.TestCase):
|
|
|
|
def test_with_broker(self):
|
|
|
|
router = func()
|
|
|
|
self.assertIsInstance(router, mitogen.master.Router)
|
|
|
|
self.assertFalse(testlib.threading__thread_is_alive(router.broker._thread))
|
|
|
|
|
|
|
|
def test_with_broker_preserves_attributes(self):
|
|
|
|
self.assertEqual(func.__doc__, 'Docstring of func')
|
|
|
|
self.assertEqual(func.__name__, 'func')
|
|
|
|
|
|
|
|
|
|
|
|
class Dict(dict): pass
|
|
|
|
class List(list): pass
|
|
|
|
class Tuple(tuple): pass
|
|
|
|
class Unicode(mitogen.core.UnicodeType): pass
|
|
|
|
class Bytes(mitogen.core.BytesType): pass
|
|
|
|
|
|
|
|
|
|
|
|
class StubbornBytes(mitogen.core.BytesType):
|
|
|
|
"""
|
|
|
|
A binary string type that persists through `bytes(...)`.
|
|
|
|
|
|
|
|
Stand-in for `AnsibleUnsafeBytes()` in Ansible 7-9 (core 2.14-2.16), after
|
|
|
|
fixes/mitigations for CVE-2023-5764.
|
|
|
|
"""
|
|
|
|
if mitogen.core.PY3:
|
|
|
|
def __bytes__(self): return self
|
|
|
|
def __str__(self): return self.decode()
|
|
|
|
else:
|
|
|
|
def __str__(self): return self
|
|
|
|
def __unicode__(self): return self.decode()
|
|
|
|
|
|
|
|
def decode(self, encoding='utf-8', errors='strict'):
|
|
|
|
s = super(StubbornBytes).encode(encoding=encoding, errors=errors)
|
|
|
|
return StubbornText(s)
|
|
|
|
|
|
|
|
|
|
|
|
class StubbornText(mitogen.core.UnicodeType):
|
|
|
|
"""
|
|
|
|
A text string type that persists through `unicode(...)` or `str(...)`.
|
|
|
|
|
|
|
|
Stand-in for `AnsibleUnsafeText()` in Ansible 7-9 (core 2.14-2.16), after
|
|
|
|
following fixes/mitigations for CVE-2023-5764.
|
|
|
|
"""
|
|
|
|
if mitogen.core.PY3:
|
|
|
|
def __bytes__(self): return self.encode()
|
|
|
|
def __str__(self): return self
|
|
|
|
else:
|
|
|
|
def __str__(self): return self.encode()
|
|
|
|
def __unicode__(self): return self
|
|
|
|
|
|
|
|
def encode(self, encoding='utf-8', errors='strict'):
|
|
|
|
s = super(StubbornText).encode(encoding=encoding, errors=errors)
|
|
|
|
return StubbornBytes(s)
|
|
|
|
|
|
|
|
|
|
|
|
class CastTest(testlib.TestCase):
|
|
|
|
def test_dict(self):
|
|
|
|
self.assertEqual(type(mitogen.utils.cast({})), dict)
|
|
|
|
self.assertEqual(type(mitogen.utils.cast(Dict())), dict)
|
|
|
|
|
|
|
|
def test_nested_dict(self):
|
|
|
|
specimen = mitogen.utils.cast(Dict({'k': Dict({'k2': 'v2'})}))
|
|
|
|
self.assertEqual(type(specimen), dict)
|
|
|
|
self.assertEqual(type(specimen['k']), dict)
|
|
|
|
|
|
|
|
def test_list(self):
|
|
|
|
self.assertEqual(type(mitogen.utils.cast([])), list)
|
|
|
|
self.assertEqual(type(mitogen.utils.cast(List())), list)
|
|
|
|
|
|
|
|
def test_nested_list(self):
|
|
|
|
specimen = mitogen.utils.cast(List((0, 1, List((None,)))))
|
|
|
|
self.assertEqual(type(specimen), list)
|
|
|
|
self.assertEqual(type(specimen[2]), list)
|
|
|
|
|
|
|
|
def test_tuple(self):
|
|
|
|
self.assertEqual(type(mitogen.utils.cast(())), list)
|
|
|
|
self.assertEqual(type(mitogen.utils.cast(Tuple())), list)
|
|
|
|
|
|
|
|
def test_nested_tuple(self):
|
|
|
|
specimen = mitogen.utils.cast(Tuple((0, 1, Tuple((None,)))))
|
|
|
|
self.assertEqual(type(specimen), list)
|
|
|
|
self.assertEqual(type(specimen[2]), list)
|
|
|
|
|
|
|
|
def assertUnchanged(self, v):
|
|
|
|
self.assertIs(mitogen.utils.cast(v), v)
|
|
|
|
|
|
|
|
def test_passthrough(self):
|
|
|
|
self.assertUnchanged(0)
|
|
|
|
self.assertUnchanged(0.0)
|
|
|
|
self.assertUnchanged(float('inf'))
|
|
|
|
self.assertUnchanged(True)
|
|
|
|
self.assertUnchanged(False)
|
|
|
|
self.assertUnchanged(None)
|
|
|
|
|
|
|
|
def test_unicode(self):
|
|
|
|
self.assertEqual(type(mitogen.utils.cast(u'')), mitogen.core.UnicodeType)
|
|
|
|
self.assertEqual(type(mitogen.utils.cast(Unicode())), mitogen.core.UnicodeType)
|
|
|
|
|
|
|
|
def test_bytes(self):
|
|
|
|
self.assertEqual(type(mitogen.utils.cast(b(''))), mitogen.core.BytesType)
|
|
|
|
self.assertEqual(type(mitogen.utils.cast(Bytes())), mitogen.core.BytesType)
|
|
|
|
|
|
|
|
def test_stubborn_types_raise(self):
|
|
|
|
stubborn_bytes = StubbornBytes(b('abc'))
|
|
|
|
self.assertIs(stubborn_bytes, mitogen.core.BytesType(stubborn_bytes))
|
|
|
|
self.assertRaises(TypeError, mitogen.utils.cast, stubborn_bytes)
|
|
|
|
|
|
|
|
stubborn_text = StubbornText(u'abc')
|
|
|
|
self.assertIs(stubborn_text, mitogen.core.UnicodeType(stubborn_text))
|
|
|
|
self.assertRaises(TypeError, mitogen.utils.cast, stubborn_text)
|
|
|
|
|
|
|
|
def test_unknown(self):
|
|
|
|
self.assertRaises(TypeError, mitogen.utils.cast, set())
|
|
|
|
self.assertRaises(TypeError, mitogen.utils.cast, 4j)
|