service: Allow registering path prefixes with FileService.

e.g. service.register_prefix('/') disables all security checks.
issue510
David Wilson 6 years ago
parent 0b162eba18
commit f20e0bbac1

@ -756,6 +756,8 @@ class FileService(Service):
super(FileService, self).__init__(router) super(FileService, self).__init__(router)
#: Set of registered paths. #: Set of registered paths.
self._paths = set() self._paths = set()
#: Set of registered directory prefixes.
self._prefixes = set()
#: Mapping of Stream->FileStreamState. #: Mapping of Stream->FileStreamState.
self._state_by_stream = {} self._state_by_stream = {}
@ -781,6 +783,22 @@ class FileService(Service):
LOG.debug('%r: registering %r', self, path) LOG.debug('%r: registering %r', self, path)
self._paths.add(path) self._paths.add(path)
@expose(policy=AllowParents())
@arg_spec({
'path': mitogen.core.FsPathTypes,
})
def register_prefix(self, path):
"""
Authorize a path and any subpaths for access by children. Repeat calls
with the same path has no effect.
:param str path:
File path.
"""
if path not in self._prefixes:
LOG.debug('%r: registering prefix %r', self, path)
self._prefixes.add(path)
def _generate_stat(self, path): def _generate_stat(self, path):
st = os.stat(path) st = os.stat(path)
if not stat.S_ISREG(st.st_mode): if not stat.S_ISREG(st.st_mode):
@ -844,6 +862,24 @@ class FileService(Service):
fp.close() fp.close()
state.jobs.pop(0) state.jobs.pop(0)
def _prefix_is_authorized(self, path):
"""
Return the set of all possible directory prefixes for `path`.
:func:`os.path.abspath` is used to ensure the path is absolute.
:param str path:
The path.
:returns: Set of prefixes.
"""
path = os.path.abspath(path)
while True:
if path in self._prefixes:
return True
if path == '/':
break
path = os.path.dirname(path)
return False
@expose(policy=AllowAny()) @expose(policy=AllowAny())
@no_reply() @no_reply()
@arg_spec({ @arg_spec({
@ -870,7 +906,7 @@ class FileService(Service):
:raises Error: :raises Error:
Unregistered path, or Sender did not match requestee context. Unregistered path, or Sender did not match requestee context.
""" """
if path not in self._paths: if path not in self._paths and not self._prefix_is_authorized(path):
raise Error(self.unregistered_msg) raise Error(self.unregistered_msg)
if msg.src_id != sender.context.context_id: if msg.src_id != sender.context.context_id:
raise Error(self.context_mismatch_msg) raise Error(self.context_mismatch_msg)

@ -0,0 +1,81 @@
import unittest2
import mitogen.service
import testlib
class FetchTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.service.FileService
def test_unauthorized(self):
service = self.klass(self.router)
e = self.assertRaises(mitogen.service.Error,
lambda: service.fetch(
path='/etc/shadow',
sender=None,
msg=mitogen.core.Message(),
)
)
self.assertEquals(e.args[0], service.unregistered_msg)
def test_path_authorized(self):
recv = mitogen.core.Receiver(self.router)
service = self.klass(self.router)
service.register('/etc/passwd')
self.assertEquals(None, service.fetch(
path='/etc/passwd',
sender=recv.to_sender(),
msg=mitogen.core.Message(),
))
def test_root_authorized(self):
recv = mitogen.core.Receiver(self.router)
service = self.klass(self.router)
service.register_prefix('/')
self.assertEquals(None, service.fetch(
path='/etc/passwd',
sender=recv.to_sender(),
msg=mitogen.core.Message(),
))
def test_prefix_authorized(self):
recv = mitogen.core.Receiver(self.router)
service = self.klass(self.router)
service.register_prefix('/etc')
self.assertEquals(None, service.fetch(
path='/etc/passwd',
sender=recv.to_sender(),
msg=mitogen.core.Message(),
))
def test_prefix_authorized_abspath_bad(self):
recv = mitogen.core.Receiver(self.router)
service = self.klass(self.router)
service.register_prefix('/etc')
self.assertEquals(None, service.fetch(
path='/etc/foo/bar/../../../passwd',
sender=recv.to_sender(),
msg=mitogen.core.Message(),
))
def test_prefix_authorized_abspath_bad(self):
recv = mitogen.core.Receiver(self.router)
service = self.klass(self.router)
service.register_prefix('/etc')
e = self.assertRaises(mitogen.service.Error,
lambda: service.fetch(
path='/etc/../shadow',
sender=recv.to_sender(),
msg=mitogen.core.Message(),
)
)
self.assertEquals(e.args[0], service.unregistered_msg)
if __name__ == '__main__':
unittest2.main()
Loading…
Cancel
Save