issue #159: initial context LRU implementation

Now Connection.close() *must* be called in the worker, to ensure the
reference count for a context drops correctly.

Remove 'discriminator' for now, I'm not using it for testing any more
and it complicated this code.

This code is a car crash, it needs rewritten again. Ideally some/most of
this behaviour could live on services.DeduplicatingService somehow, but
I couldn't come up with a sensible design.
pull/193/head
David Wilson 6 years ago
parent 6394226722
commit cc980569a3

@ -58,7 +58,13 @@ class Connection(ansible.plugins.connection.ConnectionBase):
#: presently always the master process.
parent = None
#: mitogen.master.Context used to communicate with the target user account.
#: mitogen.master.Context connected to the target machine's initial SSH
#: account.
host = None
#: mitogen.master.Context connected to the target user account on the
#: target machine (i.e. via sudo), or simply a copy of :attr:`host` if
#: become is not in use.
context = None
#: Only sudo is supported for now.
@ -79,9 +85,6 @@ class Connection(ansible.plugins.connection.ConnectionBase):
#: Set to 'ansible_ssh_timeout' by on_action_run().
ansible_ssh_timeout = None
#: Set to 'mitogen_ssh_discriminator' by on_action_run()
mitogen_ssh_discriminator = None
#: Set after connection to the target context's home directory.
_homedir = None
@ -111,10 +114,6 @@ class Connection(ansible.plugins.connection.ConnectionBase):
executing. We use the opportunity to grab relevant bits from the
task-specific data.
"""
self.mitogen_ssh_discriminator = task_vars.get(
'mitogen_ssh_discriminator',
None
)
self.ansible_ssh_timeout = task_vars.get(
'ansible_ssh_timeout',
None
@ -162,7 +161,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
dct = mitogen.service.call(
context=self.parent,
handle=ContextService.handle,
method='connect',
method='get',
kwargs=mitogen.utils.cast(kwargs),
)
@ -190,7 +189,6 @@ class Connection(ansible.plugins.connection.ConnectionBase):
'method_name': 'ssh',
'check_host_keys': False, # TODO
'hostname': self._play_context.remote_addr,
'discriminator': self.mitogen_ssh_discriminator,
'username': self._play_context.remote_user,
'password': self._play_context.password,
'port': self._play_context.port,
@ -298,6 +296,17 @@ class Connection(ansible.plugins.connection.ConnectionBase):
gracefully shut down, and wait for shutdown to complete. Safe to call
multiple times.
"""
for context in set([self.host, self.context]):
if context:
mitogen.service.call(
context=self.parent,
handle=ContextService.handle,
method='put',
kwargs={
'context': context
}
)
self.host = None
self.context = None
if self.broker and not new_task:
@ -378,10 +387,10 @@ class Connection(ansible.plugins.connection.ConnectionBase):
Implement put_file() by caling the corresponding
ansible_mitogen.target function in the target.
:param str in_path:
Local filesystem path to read.
:param str out_path:
Remote filesystem path to write.
:param byte data:
File contents to put.
"""
self.call(ansible_mitogen.target.write_path,
mitogen.utils.cast(out_path),

@ -39,7 +39,9 @@ when a child has completed a job.
from __future__ import absolute_import
import logging
import sys
import os.path
import pprint
import threading
import zlib
@ -55,7 +57,7 @@ class Error(Exception):
pass
class ContextService(mitogen.service.DeduplicatingService):
class ContextService(mitogen.service.Service):
"""
Used by workers to fetch the single Context instance corresponding to a
connection configuration, creating the matching connection if it does not
@ -71,33 +73,93 @@ class ContextService(mitogen.service.DeduplicatingService):
"""
handle = 500
max_message_size = 1000
max_contexts = 20
def __init__(self, *args, **kwargs):
super(ContextService, self).__init__(*args, **kwargs)
#: JoinPoint for context responses.
self._lock = threading.Lock()
self._response_by_key = {}
self._waiters_by_key = {}
#: Active context count.
self._refs_by_context = {}
#: List of contexts in creation order by via= parameter.
self._lru_by_via = {}
#: (method_name, kwargs) pairs by Conetxt
self._cfg_by_context = {}
@mitogen.service.expose(mitogen.service.AllowParents())
@mitogen.service.arg_spec({
'method_name': str
'context': mitogen.core.Context
})
def connect(self, method_name, discriminator=None, **kwargs):
def put(self, context):
"""
Return a Context referring to an established connection with the given
configuration, establishing a new connection as necessary.
:param str method_name:
The :class:`mitogen.parent.Router` connection method to use.
:param discriminator:
Mixed into the key used to select an existing connection, to allow
intentional duplicate connections to be created.
:param dict kwargs:
Keyword arguments passed to `mitogen.master.Router.[method_name]()`.
Return a reference, making it eligable for recycling once its reference
count reaches zero.
"""
LOG.debug('%r.put(%r)', self, context)
assert self._refs_by_context[context] > 0
self._refs_by_context[context] -= 1
:returns tuple:
Tuple of `(context, home_dir)`, where:
* `context` is the mitogen.master.Context referring to the
target context.
* `home_dir` is a cached copy of the remote directory.
def key_from_kwargs(self, **kwargs):
"""
Generate a deduplication key from the request. The default
implementation returns a string based on a stable representation of the
input dictionary generated by :py:func:`pprint.pformat`.
"""
return pprint.pformat(kwargs)
def _produce_response(self, key, response):
self._lock.acquire()
try:
waiters = self._waiters_by_key.pop(key)
count = len(waiters)
for msg in waiters:
msg.reply(response)
finally:
self._lock.release()
return count
def _lru(self, new_context, **kwargs):
via = kwargs.get('via')
if via is None:
# We don't have a limit on the number of directly connections.
return
lru = self._lru_by_via.setdefault(via, [])
if len(lru) < self.max_contexts:
lru.append(new_context)
return
for context in reversed(lru):
if self._refs_by_context[context] == 0:
break
else:
LOG.warning('via=%r reached maximum number of interpreters, '
'but they are all marked as in-use.', via)
return
LOG.info('%r._discard_one(): shutting down %r', self, context)
context.shutdown()
method_name, kwargs = self._cfg_by_context[context]
key = self.key_from_kwargs(method_name=method_name, **kwargs)
self._lock.acquire()
try:
del self._response_by_key[key]
del self._refs_by_context[context]
del self._cfg_by_context[context]
lru.remove(context)
lru.append(new_context)
finally:
self._lock.release()
def _connect(self, method_name, **kwargs):
method = getattr(self.router, method_name, None)
if method is None:
raise Error('no such Router method: %s' % (method_name,))
try:
context = method(**kwargs)
except mitogen.core.StreamError as e:
@ -107,17 +169,75 @@ class ContextService(mitogen.service.DeduplicatingService):
'msg': str(e),
}
if kwargs.get('via'):
self._lru(context, method_name=method_name, **kwargs)
home_dir = context.call(os.path.expanduser, '~')
# We don't need to wait for the result of this. Ideally we'd check its
# return value somewhere, but logs will catch a failure anyway.
context.call_async(ansible_mitogen.target.start_fork_parent)
self._cfg_by_context[context] = (method_name, kwargs)
self._refs_by_context[context] = 0
return {
'context': context,
'home_dir': home_dir,
'msg': None,
}
@mitogen.service.expose(mitogen.service.AllowParents())
@mitogen.service.arg_spec({
'method_name': str
})
def get(self, msg, **kwargs):
"""
Return a Context referring to an established connection with the given
configuration, establishing a new connection as necessary.
:param str method_name:
The :class:`mitogen.parent.Router` connection method to use.
:param dict kwargs:
Keyword arguments passed to `mitogen.master.Router.[method_name]()`.
:returns tuple:
Tuple of `(context, home_dir)`, where:
* `context` is the mitogen.master.Context referring to the
target context.
* `home_dir` is a cached copy of the remote directory.
"""
key = self.key_from_kwargs(**kwargs)
self._lock.acquire()
try:
response = self._response_by_key.get(key)
if response is not None:
self._refs_by_context[response['context']] += 1
return response
waiters = self._waiters_by_key.get(key)
if waiters is not None:
waiters.append(msg)
return self.NO_REPLY
self._waiters_by_key[key] = [msg]
finally:
self._lock.release()
# I'm the first thread to wait on a result, so I will create the
# connection.
try:
response = self._connect(**kwargs)
count = self._produce_response(key, response)
if response['msg'] is None:
self._response_by_key[key] = response
self._refs_by_context[response['context']] += count
except mitogen.core.CallError:
e = sys.exc_info()[1]
self._produce_response(key, e)
except Exception:
e = sys.exc_info()[1]
self._produce_response(key, mitogen.core.CallError(e))
return self.NO_REPLY
class FileService(mitogen.service.Service):
"""

@ -135,10 +135,6 @@ High Risk
exhaust available RAM. This will be fixed soon as it's likely to be tickled
by common playbooks.
* No mechanism exists to bound the number of interpreters created during a run.
For some playbooks that parameterize ``become_user`` over many accounts,
resource exhaustion may be triggered on the target machine.
Low Risk
~~~~~~~~
@ -291,9 +287,6 @@ This list will grow as more missing pieces are discovered.
* ``ansible_ssh_private_key_file``
* ``ansible_ssh_pass``, ``ansible_password`` (default: assume passwordless)
* ``ssh_args``, ``ssh_common_args``, ``ssh_extra_args``
* ``mitogen_ssh_discriminator``: if present, a string mixed into the key used
to deduplicate connections. This permits intentional duplicate Mitogen
connections to a single host, which is probably only useful for testing.
Sudo Variables
@ -373,6 +366,43 @@ internal list can be updated to prevent users bumping into the same problem in
future.
Interpreter Recycling
~~~~~~~~~~~~~~~~~~~~~
To avoid accidental DoS of targets, the extension stops creating persistent
interpreters after the 20th interpreter has been created. Instead the most
recently created interpreter is shut down to make room for any new interpreter.
This is to avoid situations like below from triggering memory exhaustion by
spawning a huge number of interpreters.
.. code-block:: yaml
- hosts: corp_boxes
vars:
user_directory: [
# 10,000 corporate user accounts
]
tasks:
- name: Create user bashrc
become: true
vars:
ansible_become_user: "{{item}}"
copy:
src: bashrc
dest: "~{{item}}/.bashrc"
with_items: "{{user_directory}}"
The recycling behaviour does not occur for direct connections from the Ansible
controller, and it is keyed on a per-host basis, i.e. up to 20 interpreters may
exist for each directly connected target host.
The newest interpreter is chosen to avoid recycling useful accounts, like
"root" or "postgresql" that tend to appear early in a run, however it is simple
to construct a playbook that defeats this strategy. A future version will key
interpreters on the identity of the task, file and/or playbook that created
them, avoiding the recycling of useful accounts in every scenario.
Runtime Patches
~~~~~~~~~~~~~~~

@ -619,6 +619,14 @@ class ChildIdAllocator(object):
class Context(mitogen.core.Context):
via = None
def __eq__(self, other):
return (isinstance(other, mitogen.core.Context) and
(other.context_id == self.context_id) and
(other.router == self.router))
def __hash__(self):
return hash((self.router, self.context_id))
def call_async(self, fn, *args, **kwargs):
LOG.debug('%r.call_async(%r, *%r, **%r)',
self, fn, args, kwargs)

@ -174,7 +174,10 @@ class Service(object):
def _on_receive_message(self, msg):
method_name, kwargs = self._validate_message(msg)
return getattr(self, method_name)(**kwargs)
method = getattr(self, method_name)
if 'msg' in method.func_code.co_varnames:
kwargs['msg'] = msg # TODO: hack
return method(**kwargs)
def on_receive_message(self, msg):
try:

Loading…
Cancel
Save