Merge remote-tracking branch 'origin/dmw'

* origin/dmw:
  issue #482: another Py3 fix
  ci: try removing exclude: to make Azure jobs work again
  compat: fix Py2.4 SyntaxError
  issue #482: remove 'ssh' from checked processes
  ci: Py3 fix
  issue #279: add one more test for max_message_size
  issue #482: ci: add stray process checks to all jobs
  tests: fix format string error
  core: MitogenProtocol.is_privileged was not set in children
  issue #482: tests: fail DockerMixin tests if stray processes exist
  docs: update Changelog.
  issue #586: update Changelog.
  docs: update Changelog.
  [security] core: undirectional routing wasn't respected in some cases
  docs: tidy up Select.all()
  issue #612: update Changelog.
pull/618/head
David Wilson 5 years ago
commit ceddc5cee2

@ -20,11 +20,17 @@ def pause_if_interactive():
signal.pause()
interesting = ci_lib.get_interesting_procs()
with ci_lib.Fold('unit_tests'):
os.environ['SKIP_MITOGEN'] = '1'
ci_lib.run('./run_tests -v')
ci_lib.check_stray_processes(interesting)
with ci_lib.Fold('docker_setup'):
containers = ci_lib.make_containers()
ci_lib.start_containers(containers)
@ -75,4 +81,7 @@ with ci_lib.Fold('ansible'):
pause_if_interactive()
raise
ci_lib.check_stray_processes(interesting, containers)
pause_if_interactive()

@ -3,11 +3,6 @@
# Add steps that analyze code, save the dist with the build record, publish to a PyPI-compatible index, and more:
# https://docs.microsoft.com/azure/devops/pipelines/languages/python
trigger:
branches:
exclude:
- docs-master
jobs:
- job: Mac

@ -215,6 +215,46 @@ def make_containers(name_prefix='', port_offset=0):
return lst
# ssh removed from here because 'linear' strategy relies on processes that hang
# around after the Ansible run completes
INTERESTING_COMMS = ('python', 'sudo', 'su', 'doas')
def proc_is_docker(pid):
try:
fp = open('/proc/%s/cgroup' % (pid,), 'r')
except IOError:
return False
try:
return 'docker' in fp.read()
finally:
fp.close()
def get_interesting_procs(container_name=None):
args = ['ps', '-a', '-x', '-oppid=', '-opid=', '-ocomm=', '-ocommand=']
if container_name is not None:
args = ['docker', 'exec', container_name] + args
out = []
for line in subprocess__check_output(args).decode().splitlines():
ppid, pid, comm, rest = line.split(None, 3)
if (
(
any(comm.startswith(s) for s in INTERESTING_COMMS) or
'mitogen:' in rest
) and
(
container_name is not None or
(not proc_is_docker(pid))
)
):
out.append((int(pid), line))
return sorted(out)
def start_containers(containers):
if os.environ.get('KEEP'):
return
@ -236,9 +276,44 @@ def start_containers(containers):
]
for container in containers
])
for container in containers:
container['interesting'] = get_interesting_procs(container['name'])
return containers
def verify_procs(hostname, old, new):
oldpids = set(pid for pid, _ in old)
if any(pid not in oldpids for pid, _ in new):
print('%r had stray processes running:' % (hostname,))
for pid, line in new:
if pid not in oldpids:
print('New process:', line)
print()
return False
return True
def check_stray_processes(old, containers=None):
ok = True
new = get_interesting_procs()
if old is not None:
ok &= verify_procs('test host machine', old, new)
for container in containers or ():
ok &= verify_procs(
container['name'],
container['interesting'],
get_interesting_procs(container['name'])
)
assert ok, 'stray processes were found'
def dump_file(path):
print()
print('--- %s ---' % (path,))

@ -68,9 +68,15 @@ with ci_lib.Fold('job_setup'):
os.environ['ANSIBLE_HOST_KEY_CHECKING'] = 'False'
interesting = ci_lib.get_interesting_procs()
with ci_lib.Fold('first_run'):
ci_lib.run('debops common %s', ' '.join(sys.argv[1:]))
ci_lib.check_stray_processes(interesting, containers)
with ci_lib.Fold('second_run'):
ci_lib.run('debops common %s', ' '.join(sys.argv[1:]))
ci_lib.check_stray_processes(interesting, containers)

@ -14,4 +14,5 @@ if ci_lib.have_docker():
'docker pull %s' % (ci_lib.image_for_distro(ci_lib.DISTRO),),
])
ci_lib.run_batches(batches)

@ -14,4 +14,6 @@ os.environ.update({
if not ci_lib.have_docker():
os.environ['SKIP_DOCKER_TESTS'] = '1'
interesting = ci_lib.get_interesting_procs()
ci_lib.run('./run_tests -v')
ci_lib.check_stray_processes(interesting)

@ -164,9 +164,16 @@ Core Library
`closed` flag, preventing historical bugs where a double close could destroy
descriptors belonging to unrelated streams.
* `#586 <https://github.com/dw/mitogen/issues/586>`_: fix import of
:mod:`__main__` on later versions of Python 3 when running from the
interactive console.
* `#606 <https://github.com/dw/mitogen/issues/606>`_: fix example code on the
documentation front page.
* `#612 <https://github.com/dw/mitogen/issues/612>`_: fix various errors
introduced by stream refactoring.
* `a5536c35 <https://github.com/dw/mitogen/commit/a5536c35>`_: avoid quadratic
buffer management when logging lines received from a child's redirected
standard IO.
@ -183,6 +190,14 @@ Core Library
buffered items, causing future :meth:`get` calls to block or fail even though
data existed that could be returned.
* `5924af15 <https://github.com/dw/mitogen/commit/5924af15>`_: *[security]* the
unidirectional routing mode, in which contexts may only communicate with
parents and never siblings (so a program cannot accidentally bridge
air-gapped networks) was not inherited when a child context was initiated
directly from an existing child. This did not effect the Ansible extension,
since the controller initiates any new context used for routing, only forked
tasks are initiated by children.
Thanks!
~~~~~~~
@ -193,8 +208,10 @@ bug reports, testing, features and fixes in this release contributed by
`Anton Markelov <https://github.com/strangeman>`_,
`Dan <https://github.com/dsgnr>`_,
`Dave Cottlehuber <https://github.com/dch>`_,
`Denis Krienbühl <https://github.com/href>`_,
`El Mehdi CHAOUKI <https://github.com/elmchaouki>`_,
`James Hogarth <https://github.com/hogarthj>`_,
`Marc Hartmayer <https://github.com/marc1006>`_,
`Nigel Metheringham <https://github.com/nigelm>`_,
`Orion Poplawski <https://github.com/opoplawski>`_,
`Pieter Voet <https://github.com/pietervoet/>`_,

@ -542,7 +542,8 @@ def extend_path(path, name):
if os.path.isfile(pkgfile):
try:
f = open(pkgfile)
except IOError as msg:
except IOError:
msg = sys.exc_info()[1]
sys.stderr.write("Can't open %s: %s\n" %
(pkgfile, msg))
else:

@ -1937,18 +1937,28 @@ class MitogenProtocol(Protocol):
:class:`Protocol` implementing mitogen's :ref:`stream protocol
<stream-protocol>`.
"""
#: If not :data:`None`, :class:`Router` stamps this into
#: :attr:`Message.auth_id` of every message received on this stream.
auth_id = None
#: If not :data:`False`, indicates the stream has :attr:`auth_id` set and
#: its value is the same as :data:`mitogen.context_id` or appears in
#: :data:`mitogen.parent_ids`.
is_privileged = False
def __init__(self, router, remote_id):
def __init__(self, router, remote_id, auth_id=None,
local_id=None, parent_ids=None):
self._router = router
self.remote_id = remote_id
#: If not :data:`None`, :class:`Router` stamps this into
#: :attr:`Message.auth_id` of every message received on this stream.
self.auth_id = auth_id
if parent_ids is None:
parent_ids = mitogen.parent_ids
if local_id is None:
local_id = mitogen.context_id
self.is_privileged = (
(remote_id in parent_ids) or
auth_id in ([local_id] + parent_ids)
)
self.sent_modules = set(['mitogen', 'mitogen.core'])
self._input_buf = collections.deque()
self._input_buf_len = 0
@ -2800,8 +2810,8 @@ class Router(object):
broker_exit_msg = 'Broker has exitted'
no_route_msg = 'no route to %r, my ID is %r'
unidirectional_msg = (
'routing mode prevents forward of message from context %d via '
'context %d'
'routing mode prevents forward of message from context %d to '
'context %d via context %d'
)
def __init__(self, broker):
@ -3152,7 +3162,9 @@ class Router(object):
(in_stream.protocol.is_privileged or
out_stream.protocol.is_privileged):
self._maybe_send_dead(msg, self.unidirectional_msg,
in_stream.protocol.remote_id, out_stream.protocol.remote_id)
in_stream.protocol.remote_id,
out_stream.protocol.remote_id,
mitogen.context_id)
return
out_stream.protocol._send(msg)
@ -3623,7 +3635,7 @@ class ExternalContext(object):
self.broker = Broker(activate_compat=False)
self.router = Router(self.broker)
self.router.debug = self.config.get('debug', False)
self.router.undirectional = self.config['unidirectional']
self.router.unidirectional = self.config['unidirectional']
self.router.add_handler(
fn=self._on_shutdown_msg,
handle=SHUTDOWN,
@ -3641,7 +3653,12 @@ class ExternalContext(object):
os.close(in_fd)
out_fp = os.fdopen(os.dup(self.config.get('out_fd', 1)), 'wb', 0)
self.stream = MitogenProtocol.build_stream(self.router, parent_id)
self.stream = MitogenProtocol.build_stream(
self.router,
parent_id,
local_id=self.config['context_id'],
parent_ids=self.config['parent_ids']
)
self.stream.accept(in_fp, out_fp)
self.stream.name = 'parent'
self.stream.receive_side.keep_alive = False

@ -122,9 +122,10 @@ class Select(object):
@classmethod
def all(cls, receivers):
"""
Take an iterable of receivers and retrieve a :class:`Message` from
each, returning the result of calling `msg.unpickle()` on each in turn.
Results are returned in the order they arrived.
Take an iterable of receivers and retrieve a :class:`Message
<mitogen.core.Message>` from each, returning the result of calling
:meth:`Message.unpickle() <mitogen.core.Message.unpickle>` on each in
turn. Results are returned in the order they arrived.
This is sugar for handling batch :meth:`Context.call_async
<mitogen.parent.Context.call_async>` invocations:

@ -162,10 +162,9 @@ class Listener(mitogen.core.Protocol):
stream = mitogen.core.MitogenProtocol.build_stream(
router=self._router,
remote_id=context_id,
auth_id=mitogen.context_id,
)
stream.name = u'unix_client.%d' % (pid,)
stream.protocol.auth_id = mitogen.context_id
stream.protocol.is_privileged = True
stream.accept(sock, sock)
LOG.debug('listener: accepted connection from PID %d: %s',
pid, stream.name)

@ -1,9 +1,11 @@
import sys
import time
import zlib
import unittest2
import testlib
import mitogen.core
import mitogen.master
import mitogen.parent
import mitogen.utils
@ -260,6 +262,13 @@ class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase):
size = remote.call(return_router_max_message_size)
self.assertEquals(size, 64*1024)
def test_remote_of_remote_configured(self):
router = self.klass(broker=self.broker, max_message_size=64*1024)
remote = router.local()
remote2 = router.local(via=remote)
size = remote2.call(return_router_max_message_size)
self.assertEquals(size, 64*1024)
def test_remote_exceeded(self):
# Ensure new contexts receive a router with the same value.
router = self.klass(broker=self.broker, max_message_size=64*1024)
@ -341,22 +350,43 @@ class NoRouteTest(testlib.RouterMixin, testlib.TestCase):
))
class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase):
def test_siblings_cant_talk(self):
self.router.unidirectional = True
l1 = self.router.local()
l2 = self.router.local()
def test_siblings_cant_talk(router):
l1 = router.local()
l2 = router.local()
logs = testlib.LogCapturer()
logs.start()
e = self.assertRaises(mitogen.core.CallError,
lambda: l2.call(ping_context, l1))
msg = self.router.unidirectional_msg % (
try:
l2.call(ping_context, l1)
except mitogen.core.CallError:
e = sys.exc_info()[1]
msg = mitogen.core.Router.unidirectional_msg % (
l2.context_id,
l1.context_id,
mitogen.context_id,
)
self.assertTrue(msg in str(e))
self.assertTrue('routing mode prevents forward of ' in logs.stop())
assert msg in str(e)
assert 'routing mode prevents forward of ' in logs.stop()
@mitogen.core.takes_econtext
def test_siblings_cant_talk_remote(econtext):
mitogen.parent.upgrade_router(econtext)
test_siblings_cant_talk(econtext.router)
class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase):
def test_siblings_cant_talk_master(self):
self.router.unidirectional = True
test_siblings_cant_talk(self.router)
def test_siblings_cant_talk_parent(self):
# ensure 'unidirectional' attribute is respected for contexts started
# by children.
self.router.unidirectional = True
parent = self.router.local()
parent.call(test_siblings_cant_talk_remote)
def test_auth_id_can_talk(self):
self.router.unidirectional = True

@ -454,6 +454,22 @@ class DockerizedSshDaemon(object):
def wait_for_sshd(self):
wait_for_port(self.get_host(), self.port, pattern='OpenSSH')
def check_processes(self):
args = ['docker', 'exec', self.container_name, 'ps', '-o', 'comm=']
counts = {}
for comm in subprocess__check_output(args).decode().splitlines():
comm = comm.strip()
counts[comm] = counts.get(comm, 0) + 1
if counts != {'ps': 1, 'sshd': 1}:
assert 0, (
'Docker container %r contained extra running processes '
'after test completed: %r' % (
self.container_name,
counts
)
)
def close(self):
args = ['docker', 'rm', '-f', self.container_name]
subprocess__check_output(args)
@ -501,6 +517,7 @@ class DockerMixin(RouterMixin):
@classmethod
def tearDownClass(cls):
cls.dockerized_ssh.check_processes()
cls.dockerized_ssh.close()
super(DockerMixin, cls).tearDownClass()

Loading…
Cancel
Save