master: log error an refuse __main__ import if no guard detected.

Closes #366.
issue72
David Wilson 6 years ago
parent f6b201bdfc
commit 4146648759

@ -553,6 +553,14 @@ class ModuleResponder(object):
return 'ModuleResponder(%r)' % (self._router,) return 'ModuleResponder(%r)' % (self._router,)
MAIN_RE = re.compile(b(r'^if\s+__name__\s*==\s*.__main__.\s*:'), re.M) MAIN_RE = re.compile(b(r'^if\s+__name__\s*==\s*.__main__.\s*:'), re.M)
main_guard_msg = (
"A child context attempted to import __main__, however the main "
"module present in the master process lacks an execution guard. "
"Update %r to prevent unintended execution, using a guard like:\n"
"\n"
" if __name__ == '__main__':\n"
" # your code here.\n"
)
def whitelist_prefix(self, fullname): def whitelist_prefix(self, fullname):
if self.whitelist == ['']: if self.whitelist == ['']:
@ -562,14 +570,19 @@ class ModuleResponder(object):
def blacklist_prefix(self, fullname): def blacklist_prefix(self, fullname):
self.blacklist.append(fullname) self.blacklist.append(fullname)
def neutralize_main(self, src): def neutralize_main(self, path, src):
"""Given the source for the __main__ module, try to find where it """Given the source for the __main__ module, try to find where it
begins conditional execution based on a "if __name__ == '__main__'" begins conditional execution based on a "if __name__ == '__main__'"
guard, and remove any code after that point.""" guard, and remove any code after that point."""
match = self.MAIN_RE.search(src) match = self.MAIN_RE.search(src)
if match: if match:
return src[:match.start()] return src[:match.start()]
return src
if b('mitogen.main(') in src:
return src
LOG.error(self.main_guard_msg, path)
raise ImportError('refused')
def _make_negative_response(self, fullname): def _make_negative_response(self, fullname):
return (fullname, None, None, None, ()) return (fullname, None, None, None, ())
@ -596,7 +609,7 @@ class ModuleResponder(object):
pkg_present = None pkg_present = None
if fullname == '__main__': if fullname == '__main__':
source = self.neutralize_main(source) source = self.neutralize_main(path, source)
compressed = mitogen.core.Blob(zlib.compress(source, 9)) compressed = mitogen.core.Blob(zlib.compress(source, 9))
related = [ related = [
to_text(name) to_text(name)

@ -1,5 +1,6 @@
import mock import mock
import textwrap
import subprocess import subprocess
import sys import sys
@ -12,6 +13,60 @@ import plain_old_module
import simple_pkg.a import simple_pkg.a
class NeutralizeMainTest(testlib.RouterMixin, unittest2.TestCase):
klass = mitogen.master.ModuleResponder
def call(self, *args, **kwargs):
return self.klass(self.router).neutralize_main(*args, **kwargs)
def test_missing_exec_guard(self):
path = testlib.data_path('main_with_no_exec_guard.py')
args = [sys.executable, path]
proc = subprocess.Popen(args, stderr=subprocess.PIPE)
_, stderr = proc.communicate()
self.assertEquals(1, proc.returncode)
expect = self.klass.main_guard_msg % (path,)
self.assertTrue(expect in stderr.decode())
HAS_MITOGEN_MAIN = mitogen.core.b(
textwrap.dedent("""
herp derp
def myprog():
pass
@mitogen.main(maybe_some_option=True)
def main(router):
pass
""")
)
def test_mitogen_main(self):
untouched = self.call("derp.py", self.HAS_MITOGEN_MAIN)
self.assertEquals(untouched, self.HAS_MITOGEN_MAIN)
HAS_EXEC_GUARD = mitogen.core.b(
textwrap.dedent("""
herp derp
def myprog():
pass
def main():
pass
if __name__ == '__main__':
main()
""")
)
def test_exec_guard(self):
touched = self.call("derp.py", self.HAS_EXEC_GUARD)
bits = touched.decode().split()
self.assertEquals(bits[-3:], ['def', 'main():', 'pass'])
class GoodModulesTest(testlib.RouterMixin, unittest2.TestCase): class GoodModulesTest(testlib.RouterMixin, unittest2.TestCase):
def test_plain_old_module(self): def test_plain_old_module(self):
# The simplest case: a top-level module with no interesting imports or # The simplest case: a top-level module with no interesting imports or

Loading…
Cancel
Save