|
|
|
@ -78,16 +78,23 @@ class ContextService(mitogen.service.Service):
|
|
|
|
|
|
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
|
|
|
super(ContextService, self).__init__(*args, **kwargs)
|
|
|
|
|
#: JoinPoint for context responses.
|
|
|
|
|
self._lock = threading.Lock()
|
|
|
|
|
#: Records the :meth:`get` result dict for successful calls, returned
|
|
|
|
|
#: for identical subsequent calls. Keyed by :meth:`key_from_kwargs`.
|
|
|
|
|
self._response_by_key = {}
|
|
|
|
|
#: List of :class:`mitogen.core.Message` waiting for the result dict
|
|
|
|
|
#: for a particular connection config. Keyed as sbove.
|
|
|
|
|
self._waiters_by_key = {}
|
|
|
|
|
#: Active context count.
|
|
|
|
|
#: Mapping of :class:`mitogen.core.Context` -> reference count. Each
|
|
|
|
|
#: call to :meth:`get` increases this by one. Calls to :meth:`put`
|
|
|
|
|
#: decrease it by one.
|
|
|
|
|
self._refs_by_context = {}
|
|
|
|
|
#: List of contexts in creation order by via= parameter.
|
|
|
|
|
self._lru_by_via = {}
|
|
|
|
|
#: (method_name, kwargs) pairs by Conetxt
|
|
|
|
|
self._cfg_by_context = {}
|
|
|
|
|
#: List of contexts in creation order by via= parameter. When
|
|
|
|
|
#: :attr:`max_interpreters` is reached, the most recently used context
|
|
|
|
|
#: is destroyed to make room for any additional context.
|
|
|
|
|
self._update_lru_by_via = {}
|
|
|
|
|
#: :meth:`key_from_kwargs` result by Context.
|
|
|
|
|
self._key_by_context = {}
|
|
|
|
|
|
|
|
|
|
@mitogen.service.expose(mitogen.service.AllowParents())
|
|
|
|
|
@mitogen.service.arg_spec({
|
|
|
|
@ -111,6 +118,17 @@ class ContextService(mitogen.service.Service):
|
|
|
|
|
return pprint.pformat(kwargs)
|
|
|
|
|
|
|
|
|
|
def _produce_response(self, key, response):
|
|
|
|
|
"""
|
|
|
|
|
Reply to every waiting request matching a configuration key with a
|
|
|
|
|
response dictionary, deleting the list of waiters when done.
|
|
|
|
|
|
|
|
|
|
:param str key:
|
|
|
|
|
Result of :meth:`key_from_kwargs`
|
|
|
|
|
:param dict response:
|
|
|
|
|
Response dictionary
|
|
|
|
|
:returns:
|
|
|
|
|
Number of waiters that were replied to.
|
|
|
|
|
"""
|
|
|
|
|
self._lock.acquire()
|
|
|
|
|
try:
|
|
|
|
|
waiters = self._waiters_by_key.pop(key)
|
|
|
|
@ -121,13 +139,18 @@ class ContextService(mitogen.service.Service):
|
|
|
|
|
self._lock.release()
|
|
|
|
|
return count
|
|
|
|
|
|
|
|
|
|
def _lru(self, new_context, **kwargs):
|
|
|
|
|
def _update_lru(self, new_context, **kwargs):
|
|
|
|
|
"""
|
|
|
|
|
Update the LRU ("MRU"?) list associated with the connection described
|
|
|
|
|
by `kwargs`, destroying the most recently created context if the list
|
|
|
|
|
is full. Finally add `new_context` to the list.
|
|
|
|
|
"""
|
|
|
|
|
via = kwargs.get('via')
|
|
|
|
|
if via is None:
|
|
|
|
|
# We don't have a limit on the number of directly connections.
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
lru = self._lru_by_via.setdefault(via, [])
|
|
|
|
|
lru = self._update_lru_by_via.setdefault(via, [])
|
|
|
|
|
if len(lru) < self.max_interpreters:
|
|
|
|
|
lru.append(new_context)
|
|
|
|
|
return
|
|
|
|
@ -143,20 +166,44 @@ class ContextService(mitogen.service.Service):
|
|
|
|
|
LOG.info('%r._discard_one(): shutting down %r', self, context)
|
|
|
|
|
context.shutdown()
|
|
|
|
|
|
|
|
|
|
method_name, kwargs = self._cfg_by_context[context]
|
|
|
|
|
key = self.key_from_kwargs(method_name=method_name, **kwargs)
|
|
|
|
|
key = self._key_by_context[context]
|
|
|
|
|
|
|
|
|
|
self._lock.acquire()
|
|
|
|
|
try:
|
|
|
|
|
del self._response_by_key[key]
|
|
|
|
|
del self._refs_by_context[context]
|
|
|
|
|
del self._cfg_by_context[context]
|
|
|
|
|
del self._key_by_context[context]
|
|
|
|
|
lru.remove(context)
|
|
|
|
|
lru.append(new_context)
|
|
|
|
|
finally:
|
|
|
|
|
self._lock.release()
|
|
|
|
|
|
|
|
|
|
def _connect(self, method_name, **kwargs):
|
|
|
|
|
def _connect(self, key, method_name, **kwargs):
|
|
|
|
|
"""
|
|
|
|
|
Actual connect implementation. Arranges for the Mitogen connection to
|
|
|
|
|
be created and enqueues an asynchronous call to start the forked task
|
|
|
|
|
parent in the remote context.
|
|
|
|
|
|
|
|
|
|
:param key:
|
|
|
|
|
Deduplication key representing the connection configuration.
|
|
|
|
|
:param method_name:
|
|
|
|
|
:class:`mitogen.parent.Router` method implementing the connection
|
|
|
|
|
type.
|
|
|
|
|
:param kwargs:
|
|
|
|
|
Keyword arguments passed to the router method.
|
|
|
|
|
:returns:
|
|
|
|
|
Dict like::
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
'context': mitogen.core.Context or None,
|
|
|
|
|
'home_dir': str or None,
|
|
|
|
|
'msg': str or None
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Where either `msg` is an error message and the remaining fields are
|
|
|
|
|
:data:`None`, or `msg` is :data:`None` and the remaining fields are
|
|
|
|
|
set.
|
|
|
|
|
"""
|
|
|
|
|
method = getattr(self.router, method_name, None)
|
|
|
|
|
if method is None:
|
|
|
|
|
raise Error('no such Router method: %s' % (method_name,))
|
|
|
|
@ -171,13 +218,13 @@ class ContextService(mitogen.service.Service):
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if kwargs.get('via'):
|
|
|
|
|
self._lru(context, method_name=method_name, **kwargs)
|
|
|
|
|
self._update_lru(context, method_name=method_name, **kwargs)
|
|
|
|
|
home_dir = context.call(os.path.expanduser, '~')
|
|
|
|
|
|
|
|
|
|
# We don't need to wait for the result of this. Ideally we'd check its
|
|
|
|
|
# return value somewhere, but logs will catch a failure anyway.
|
|
|
|
|
context.call_async(ansible_mitogen.target.start_fork_parent)
|
|
|
|
|
self._cfg_by_context[context] = (method_name, kwargs)
|
|
|
|
|
self._key_by_context[context] = key
|
|
|
|
|
self._refs_by_context[context] = 0
|
|
|
|
|
return {
|
|
|
|
|
'context': context,
|
|
|
|
@ -222,13 +269,14 @@ class ContextService(mitogen.service.Service):
|
|
|
|
|
finally:
|
|
|
|
|
self._lock.release()
|
|
|
|
|
|
|
|
|
|
# I'm the first thread to wait on a result, so I will create the
|
|
|
|
|
# connection.
|
|
|
|
|
# I'm the first thread to wait, so I will create the connection.
|
|
|
|
|
try:
|
|
|
|
|
response = self._connect(**kwargs)
|
|
|
|
|
response = self._connect(key, **kwargs)
|
|
|
|
|
count = self._produce_response(key, response)
|
|
|
|
|
if response['msg'] is None:
|
|
|
|
|
# Only record the response for non-error results.
|
|
|
|
|
self._response_by_key[key] = response
|
|
|
|
|
# Set the reference count to the number of waiters.
|
|
|
|
|
self._refs_by_context[response['context']] += count
|
|
|
|
|
except mitogen.core.CallError:
|
|
|
|
|
e = sys.exc_info()[1]
|
|
|
|
|