import sys

import mitogen.core
import mitogen.service

import testlib


class FetchTest(testlib.RouterMixin, testlib.TestCase):
    """Exercise path authorization of ``mitogen.service.FileService``.

    Two request styles are covered:

    * ``FileService.get`` invoked from a child context against a service
      running in a ``mitogen.service.Pool`` (used by the rejection tests).
    * ``service.fetch`` invoked directly with a hand-built reply message
      (used by the authorization tests).
    """

    klass = mitogen.service.FileService

    # Group name the fetch response for /etc/passwd is compared against;
    # 'wheel' is expected on macOS, 'root' everywhere else.
    if sys.platform == 'darwin':
        ROOT_GROUP = 'wheel'
    else:
        ROOT_GROUP = 'root'

    def replyable_msg(self, **kwargs):
        """Build a message whose replies arrive on a new receiver.

        :param kwargs:
            Extra keyword arguments forwarded to ``mitogen.core.Message``.
        :returns:
            ``(recv, msg)`` where `recv` is a non-persistent
            ``mitogen.core.Receiver`` on this test's router and `msg` is a
            message from the local context with ``reply_to`` set to that
            receiver's handle.
        """
        recv = mitogen.core.Receiver(self.router, persist=False)
        msg = mitogen.core.Message(
            src_id=mitogen.context_id,
            reply_to=recv.handle,
            **kwargs
        )
        # fetch() is handed this message directly rather than receiving it
        # through the router, so the router attribute is attached by hand.
        msg.router = self.router
        return recv, msg

    def _assert_unregistered(self, path, prefix=None):
        """Assert that a child's request for `path` is refused.

        Starts a local child context and a single-threaded service pool,
        asks the child to run ``FileService.get`` for `path`, and checks that
        the resulting ``CallError`` carries the service's
        ``unregistered_msg`` formatted with `path`.

        :param path:
            Path requested from the service, passed through verbatim.
        :param prefix:
            When not None, registered with ``register_prefix`` before the
            pool is started.
        """
        l1 = self.router.local()
        service = self.klass(self.router)
        if prefix is not None:
            service.register_prefix(prefix)

        pool = mitogen.service.Pool(
            router=self.router,
            services=[service],
            size=1,
        )
        try:
            e = self.assertRaises(
                mitogen.core.CallError,
                lambda: l1.call(
                    mitogen.service.FileService.get,
                    context=self.router.myself(),
                    path=path,
                    out_fp=None,
                )
            )
        finally:
            # Stop the pool even when the assertion above fails.
            pool.stop()

        expect = service.unregistered_msg % (path,)
        self.assertIn(expect, e.args[0])

    def _fetch_response(self, service, path):
        """Call ``service.fetch`` for `path` and return the unpickled reply.

        :param service:
            Service instance with the desired registrations already made.
        :param path:
            Path passed to ``fetch``.
        :returns:
            The unpickled first message delivered to the reply receiver.
        """
        recv, msg = self.replyable_msg()
        service.fetch(
            path=path,
            sender=recv.to_sender(),
            msg=msg,
        )
        return recv.get().unpickle()

    def _validate_response(self, resp):
        """Check the shape of a fetch response for a root-owned file.

        :param resp:
            Unpickled reply; must be a dict with ``owner``, ``group``,
            ``mode``, ``mtime``, ``atime`` and ``size`` keys.
        """
        self.assertIsInstance(resp, dict)
        self.assertEqual('root', resp['owner'])
        self.assertEqual(self.ROOT_GROUP, resp['group'])
        self.assertIsInstance(resp['mode'], int)
        self.assertIsInstance(resp['mtime'], float)
        self.assertIsInstance(resp['atime'], float)
        self.assertIsInstance(resp['size'], int)

    def test_unauthorized(self):
        # Nothing is registered, so the request must be refused.
        self._assert_unregistered('/etc/shadow')

    def test_path_authorized(self):
        # An exactly registered path may be fetched.
        service = self.klass(self.router)
        service.register('/etc/passwd')
        self._validate_response(self._fetch_response(service, '/etc/passwd'))

    def test_root_authorized(self):
        # Registering the '/' prefix authorizes a path beneath it.
        service = self.klass(self.router)
        service.register_prefix('/')
        self._validate_response(self._fetch_response(service, '/etc/passwd'))

    def test_prefix_authorized(self):
        # Registering the '/etc' prefix authorizes a file inside it.
        service = self.klass(self.router)
        service.register_prefix('/etc')
        self._validate_response(self._fetch_response(service, '/etc/passwd'))

    def test_prefix_authorized_abspath_bad(self):
        # The path starts with the registered prefix, but its '..'
        # components lead outside it; the request must be refused.
        self._assert_unregistered(
            '/etc/foo/bar/../../../passwd',
            prefix='/etc',
        )

    def test_prefix_authorized_abspath_good(self):
        # Same idea with a single '..' component: textually under '/etc',
        # but referring to '/shadow'; the request must be refused.
        self._assert_unregistered('/etc/../shadow', prefix='/etc')