You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mitogen/tests/file_service_test.py

149 lines
4.3 KiB
Python

import sys
import mitogen.service
import testlib
class FetchTest(testlib.RouterMixin, testlib.TestCase):
    """
    Tests for path authorization in :class:`mitogen.service.FileService`.

    Two entry points are exercised:

    * ``fetch()`` is invoked directly on a service instance for paths that
      were registered (exactly, or via a prefix); the reply is validated as
      a file metadata dict.
    * ``FileService.get()`` is invoked from a context created with
      ``self.router.local()`` for paths that must be refused; the resulting
      :class:`mitogen.core.CallError` must mention the service's
      ``unregistered_msg`` for that path.
    """

    klass = mitogen.service.FileService

    # Group name the service is expected to report for /etc/passwd:
    # 'wheel' when sys.platform is 'darwin', 'root' everywhere else.
    if sys.platform == 'darwin':
        ROOT_GROUP = 'wheel'
    else:
        ROOT_GROUP = 'root'

    def replyable_msg(self, **kwargs):
        """
        Build a message that can be replied to, plus the receiver for the
        reply.

        :param kwargs:
            Extra keyword arguments forwarded to
            :class:`mitogen.core.Message`.
        :returns:
            ``(recv, msg)`` tuple: a non-persistent receiver on the test
            router, and a message whose ``reply_to`` is that receiver's
            handle and whose ``router`` attribute is the test router.
        """
        recv = mitogen.core.Receiver(self.router, persist=False)
        msg = mitogen.core.Message(
            src_id=mitogen.context_id,
            reply_to=recv.handle,
            **kwargs
        )
        msg.router = self.router
        return recv, msg

    def _validate_response(self, resp):
        """
        Assert that `resp` is a metadata dict with the expected ownership
        ('root' / :attr:`ROOT_GROUP`) and correctly typed ``mode``,
        ``mtime``, ``atime`` and ``size`` fields.
        """
        self.assertTrue(isinstance(resp, dict))
        # assertEqual rather than assertEquals: the latter is a deprecated
        # alias that no longer exists in unittest on Python 3.12+.
        self.assertEqual('root', resp['owner'])
        self.assertEqual(self.ROOT_GROUP, resp['group'])
        self.assertTrue(isinstance(resp['mode'], int))
        self.assertTrue(isinstance(resp['mtime'], float))
        self.assertTrue(isinstance(resp['atime'], float))
        self.assertTrue(isinstance(resp['size'], int))

    def _fetch_and_validate(self, service, path):
        """
        Call ``service.fetch()`` for `path` using a fresh replyable message,
        then validate the unpickled reply with :meth:`_validate_response`.
        """
        recv, msg = self.replyable_msg()
        service.fetch(
            path=path,
            sender=recv.to_sender(),
            msg=msg,
        )
        self._validate_response(recv.get().unpickle())

    def _assert_get_unregistered(self, path, prefix=None):
        """
        Assert that ``FileService.get()`` for `path`, called from a context
        created with ``self.router.local()``, fails with a
        :class:`mitogen.core.CallError` containing the service's
        ``unregistered_msg`` for `path`.

        :param path:
            Path requested from the service.
        :param prefix:
            If not :data:`None`, a prefix passed to ``register_prefix()``
            on the service before the request is made.
        """
        l1 = self.router.local()
        service = self.klass(self.router)
        if prefix is not None:
            service.register_prefix(prefix)

        pool = mitogen.service.Pool(
            router=self.router,
            services=[service],
            size=1,
        )
        try:
            # NOTE(review): this relies on assertRaises() returning the
            # caught exception. Stock unittest returns None for the callable
            # form, so testlib.TestCase presumably overrides it -- confirm.
            e = self.assertRaises(mitogen.core.CallError,
                lambda: l1.call(
                    mitogen.service.FileService.get,
                    context=self.router.myself(),
                    path=path,
                    out_fp=None,
                )
            )
        finally:
            # Always stop the pool, even when the assertion above fails.
            pool.stop()

        expect = service.unregistered_msg % (path,)
        self.assertTrue(expect in e.args[0])

    def test_unauthorized(self):
        """A path is refused when nothing has been registered."""
        self._assert_get_unregistered('/etc/shadow')

    def test_path_authorized(self):
        """An exactly registered path can be fetched."""
        service = self.klass(self.router)
        service.register('/etc/passwd')
        self._fetch_and_validate(service, '/etc/passwd')

    def test_root_authorized(self):
        """Registering the '/' prefix authorizes /etc/passwd."""
        service = self.klass(self.router)
        service.register_prefix('/')
        self._fetch_and_validate(service, '/etc/passwd')

    def test_prefix_authorized(self):
        """Registering the '/etc' prefix authorizes /etc/passwd."""
        service = self.klass(self.router)
        service.register_prefix('/etc')
        self._fetch_and_validate(service, '/etc/passwd')

    def test_prefix_authorized_abspath_bad(self):
        """
        A path that starts with the registered '/etc' prefix but whose '..'
        components climb back out of it (to '/passwd') is refused.
        """
        self._assert_get_unregistered(
            '/etc/foo/bar/../../../passwd',
            prefix='/etc',
        )

    def test_prefix_authorized_abspath_good(self):
        """
        A path that starts with the registered '/etc' prefix but steps out
        of it through a single '..' (to '/shadow') is refused.
        """
        self._assert_get_unregistered(
            '/etc/../shadow',
            prefix='/etc',
        )