# mitogen/ansible_mitogen/services.py

# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Classes in this file define Mitogen 'services' that run (initially) within the
connection multiplexer process that is forked off the top-level controller
process.
Once a worker process connects to a multiplexer process
(Connection._connect()), it communicates with these services to establish new
connections, grant access to files by children, and register for notification
when a child has completed a job.
"""
from __future__ import absolute_import
import logging
import os
import os.path
import pprint
import sys
import threading
import zlib
import mitogen
import mitogen.service
import ansible_mitogen.target
# Module-level logger, named after this module; the services defined in this
# file log through it.
LOG = logging.getLogger(__name__)
class Error(Exception):
    """Base class for errors raised by the services defined in this module."""
if sys.version_info[0] >= 3:
    def _reraise(tp, value, tb=None):
        """
        Re-raise an exception described by a :func:`sys.exc_info` triple,
        preserving its original traceback (Python 3 implementation).
        """
        if value is None:
            value = tp()
        if value.__traceback__ is not tb:
            raise value.with_traceback(tb)
        raise value
else:
    # The three-expression form of `raise` is a SyntaxError on Python 3, so
    # the Python 2 implementation must be hidden from the compiler in a string.
    exec(
        "def _reraise(tp, value, tb=None):\n"
        "    raise tp, value, tb\n"
    )


class ContextService(mitogen.service.Service):
    """
    Used by workers to fetch the single Context instance corresponding to a
    connection configuration, creating the matching connection if it does not
    exist.

    For connection methods and their parameters, see:
        https://mitogen.readthedocs.io/en/latest/api.html#context-factories

    This concentrates connections in the top-level process, which may become a
    bottleneck. The bottleneck can be removed using per-CPU connection
    processes and arranging for the worker to select one according to a hash of
    the connection parameters (sharding).
    """
    handle = 500
    max_message_size = 1000
    max_interpreters = int(os.getenv('MITOGEN_MAX_INTERPRETERS', '20'))

    def __init__(self, *args, **kwargs):
        super(ContextService, self).__init__(*args, **kwargs)
        self._lock = threading.Lock()
        #: Records the :meth:`get` result dict for successful calls, returned
        #: for identical subsequent calls. Keyed by :meth:`key_from_kwargs`.
        self._response_by_key = {}
        #: Mapping of :meth:`key_from_kwargs` result -> list of
        #: :class:`mitogen.core.Latch` awaiting the result for that key.
        self._latches_by_key = {}
        #: Mapping of :class:`mitogen.core.Context` -> reference count. Each
        #: call to :meth:`get` increases this by one. Calls to :meth:`put`
        #: decrease it by one.
        self._refs_by_context = {}
        #: Mapping of via= context -> list of contexts created through it, in
        #: creation order. When :attr:`max_interpreters` is reached, the most
        #: recently created context with a zero reference count is destroyed
        #: to make room for any additional context.
        self._lru_by_via = {}
        #: :meth:`key_from_kwargs` result by Context.
        self._key_by_context = {}

    @mitogen.service.expose(mitogen.service.AllowParents())
    @mitogen.service.arg_spec({
        'context': mitogen.core.Context
    })
    def put(self, context):
        """
        Return a reference, making it eligible for recycling once its reference
        count reaches zero.

        :param mitogen.core.Context context:
            Context previously returned by :meth:`get`.
        """
        LOG.debug('%r.put(%r)', self, context)
        # The decrement is a read-modify-write on state that _wait_or_start()
        # increments while holding the lock, so it must hold the lock too.
        with self._lock:
            if self._refs_by_context.get(context, 0) == 0:
                LOG.warning('%r.put(%r): refcount was 0. shutdown_all called?',
                            self, context)
                return
            self._refs_by_context[context] -= 1

    def key_from_kwargs(self, **kwargs):
        """
        Generate a deduplication key from the request. The default
        implementation returns a string based on a stable representation of the
        input dictionary generated by :py:func:`pprint.pformat`.
        """
        return pprint.pformat(kwargs)

    def _produce_response(self, key, response):
        """
        Reply to every waiting request matching a configuration key with a
        response dictionary, deleting the list of waiters when done.

        :param str key:
            Result of :meth:`key_from_kwargs`
        :param dict response:
            Response dictionary
        :returns:
            Number of waiters that were replied to.
        :raises KeyError:
            No waiter list is recorded for `key`.
        """
        with self._lock:
            latches = self._latches_by_key.pop(key)
            count = len(latches)
            for latch in latches:
                latch.put(response)
        return count

    def _shutdown(self, context, lru=None, new_context=None):
        """
        Arrange for `context` to be shut down, and optionally add `new_context`
        to the LRU list while holding the lock.

        :param context:
            Context to shut down and forget.
        :param list lru:
            Optional LRU list from which `context` is removed.
        :param new_context:
            Optional context appended to `lru` in place of `context`.
        """
        LOG.info('%r._shutdown(): shutting down %r', self, context)
        context.shutdown()
        with self._lock:
            # Tolerate missing entries: _on_stream_disconnect() runs on the
            # Broker thread and may already have dropped this context.
            key = self._key_by_context.pop(context, None)
            self._response_by_key.pop(key, None)
            self._refs_by_context.pop(context, None)
            if lru:
                lru.remove(context)
            if new_context:
                lru.append(new_context)

    def _update_lru(self, new_context, spec, via):
        """
        Update the LRU ("MRU"?) list associated with the `via` context,
        destroying the most recently created unreferenced context if the list
        is full. Finally add `new_context` to the list.

        If the list is full and every member is still referenced, a warning
        is logged and `new_context` is left untracked.
        """
        lru = self._lru_by_via.setdefault(via, [])
        if len(lru) < self.max_interpreters:
            lru.append(new_context)
            return

        # Search newest-first for a context nobody currently holds.
        for context in reversed(lru):
            if self._refs_by_context[context] == 0:
                break
        else:
            LOG.warning('via=%r reached maximum number of interpreters, '
                        'but they are all marked as in-use.', via)
            return

        self._shutdown(context, lru=lru, new_context=new_context)

    @mitogen.service.expose(mitogen.service.AllowParents())
    def shutdown_all(self):
        """
        For testing use, arrange for all connections to be shut down.
        """
        for context in list(self._key_by_context):
            self._shutdown(context)
        self._lru_by_via = {}

    def _on_stream_disconnect(self, stream):
        """
        Respond to Stream disconnection by deleting any record of contexts
        reached via that stream. This method runs in the Broker thread and must
        not block.
        """
        # TODO: there is a race between creation of a context and disconnection
        # of its related stream. An error reply should be sent to any message
        # in _latches_by_key below.
        with self._lock:
            # Iterate over a copy, since entries are removed during the loop.
            for context, key in list(self._key_by_context.items()):
                if context.context_id in stream.routes:
                    LOG.info('Dropping %r due to disconnect of %r',
                             context, stream)
                    self._response_by_key.pop(key, None)
                    self._latches_by_key.pop(key, None)
                    self._refs_by_context.pop(context, None)
                    self._lru_by_via.pop(context, None)
                    # Forget the key mapping too, otherwise shutdown_all()
                    # would later try to shut the dead context down again.
                    self._key_by_context.pop(context, None)

    def _connect(self, key, spec, via=None):
        """
        Actual connect implementation. Arranges for the Mitogen connection to
        be created and enqueues an asynchronous call to start the forked task
        parent in the remote context.

        :param key:
            Deduplication key representing the connection configuration.
        :param spec:
            Connection specification.
        :param via:
            Optional context through which the new connection is proxied.
        :returns:
            Dict like::

                {
                    'context': mitogen.core.Context or None,
                    'home_dir': str or None,
                    'msg': str or None
                }

            Where either `msg` is an error message and the remaining fields are
            :data:`None`, or `msg` is :data:`None` and the remaining fields are
            set.
        :raises Error:
            The router has no connection method named by `spec['method']`.
        """
        try:
            method = getattr(self.router, spec['method'])
        except AttributeError:
            raise Error('unsupported method: %(method)s' % spec)

        context = method(via=via, **spec['kwargs'])
        if via:
            self._update_lru(context, spec, via)
        else:
            # For directly connected contexts, listen to the associated
            # Stream's disconnect event and use it to invalidate dependent
            # Contexts.
            stream = self.router.stream_by_id(context.context_id)
            mitogen.core.listen(stream, 'disconnect',
                                lambda: self._on_stream_disconnect(stream))

        home_dir = context.call(os.path.expanduser, '~')

        # We don't need to wait for the result of this. Ideally we'd check its
        # return value somewhere, but logs will catch a failure anyway.
        context.call_async(ansible_mitogen.target.start_fork_parent)

        if os.environ.get('MITOGEN_DUMP_THREAD_STACKS'):
            from mitogen import debug
            context.call(debug.dump_to_logger)

        self._key_by_context[context] = key
        self._refs_by_context[context] = 0
        return {
            'context': context,
            'home_dir': home_dir,
            'msg': None,
        }

    def _wait_or_start(self, spec, via=None):
        """
        Return a latch that yields the response for `spec`. A recorded
        response is delivered immediately; otherwise the caller is queued as a
        waiter, and the first waiter for a key creates the connection itself.

        :returns:
            :class:`mitogen.core.Latch` yielding either a response dict or a
            :func:`sys.exc_info` tuple if connecting failed.
        """
        latch = mitogen.core.Latch()
        key = self.key_from_kwargs(via=via, **spec)
        with self._lock:
            response = self._response_by_key.get(key)
            if response is not None:
                self._refs_by_context[response['context']] += 1
                latch.put(response)
                return latch

            latches = self._latches_by_key.setdefault(key, [])
            first = len(latches) == 0
            latches.append(latch)

        if first:
            # I'm the first requestee, so I will create the connection.
            try:
                response = self._connect(key, spec, via=via)
                count = self._produce_response(key, response)
                # Only record the response for non-error results.
                self._response_by_key[key] = response
                # Set the reference count to the number of waiters.
                self._refs_by_context[response['context']] += count
            except Exception:
                self._produce_response(key, sys.exc_info())

        return latch

    @mitogen.service.expose(mitogen.service.AllowParents())
    @mitogen.service.arg_spec({
        'stack': list
    })
    def get(self, msg, stack):
        """
        Return a Context referring to an established connection with the given
        configuration, establishing new connections as necessary.

        :param list stack:
            Connection descriptions. Each element is a dict containing 'method'
            and 'kwargs' keys describing the Router method and arguments.
            Subsequent elements are proxied via the previous.

        :returns dict:
            * context: mitogen.master.Context or None.
            * home_dir: Context's home directory or None.
            * msg: StreamError exception text or None.
            * method_name: string failing method name (only on failure).

        :raises Error:
            `stack` is empty.
        """
        if not stack:
            # Without this check the loop never binds `result`, and the final
            # return would fail with an unhelpful UnboundLocalError.
            raise Error('stack must contain at least one connection spec')

        via = None
        for spec in stack:
            try:
                result = self._wait_or_start(spec, via=via).get()
                if isinstance(result, tuple):  # exc_info()
                    _reraise(*result)
                via = result['context']
            except mitogen.core.StreamError as e:
                return {
                    'context': None,
                    'home_dir': None,
                    'method_name': spec['method'],
                    'msg': str(e),
                }

        return result
try:
    # Python 2: accept both str and unicode paths.
    _STRING_TYPES = basestring
except NameError:
    # Python 3 has no basestring; str is the text type.
    _STRING_TYPES = str


class FileService(mitogen.service.Service):
    """
    Streaming file server, used to serve both small files like Ansible module
    sources, and huge files like ISO images. Paths must be explicitly added to
    the service by a trusted context before they will be served to an untrusted
    context.

    The file service nominally lives on the mitogen.service.Pool() threads
    shared with ContextService above, however for simplicity it also maintains
    a dedicated thread from where file chunks are scheduled.

    The scheduler thread is responsible for dividing transfer requests up among
    the physical streams that connect to those contexts, and ensure each stream
    never has an excessive amount of data buffered in RAM at any time.

    Transfers proceed one-at-a-time per stream. When multiple contexts exist
    reachable over the same stream (e.g. one is the SSH account, another is a
    sudo account, and a third is a proxied SSH connection), each request is
    satisfied in turn before chunks for subsequent requests start flowing. This
    ensures when a connection is contended, that preference is given to
    completing individual transfers, rather than potentially aborting many
    partially complete transfers, causing all the bandwidth used to be wasted.

    Theory of operation:
        1. Trusted context (i.e. a WorkerProcess) calls register(), making a
           file available to any untrusted context.
        2. Untrusted context creates a mitogen.core.Receiver() to receive
           file chunks. It then calls fetch(path, recv.to_sender()), which sets
           up the transfer. The fetch() method returns the final file size and
           notifies the dedicated thread of the transfer request.
        3. The dedicated thread wakes from perpetual sleep, looks up the stream
           used to communicate with the untrusted context, and begins pumping
           128KiB-sized chunks until that stream's output queue reaches a
           limit (1MiB).
        4. The thread sleeps for 10ms, wakes, and pumps new chunks as necessary
           to refill any drained output queue, which are being asynchronously
           drained by the Stream implementation running on the Broker thread.
        5. Once the last chunk has been pumped for a single transfer,
           Sender.close() is called causing the receive loop in
           target.py::_get_file() to exit, and allows that code to compare the
           transferred size with the total file size indicated by the return
           value of the fetch() method.
        6. If the sizes mismatch, the caller is informed, which will discard
           the result and log an error.
        7. Once all chunks have been pumped for all transfers, the dedicated
           thread stops waking at 10ms intervals and resumes perpetual sleep.

    Shutdown:
        1. process.py calls service.Pool.shutdown(), which arranges for all the
           service pool threads to exit and be joined, guaranteeing no new
           requests can arrive, before calling Service.on_shutdown() for each
           registered service.
        2. FileService.on_shutdown() marks the dedicated thread's queue as
           closed, causing the dedicated thread to wake immediately. It will
           throw an exception that begins shutdown of the main loop.
        3. The main loop calls Sender.close() prematurely for every pending
           transfer, causing any Receiver loops in the target contexts to exit
           early. The file size check fails, and the partially downloaded file
           is discarded, and an error is logged.
        4. Control exits the file transfer function in every target, and
           graceful target shutdown can proceed normally, without the
           associated thread needing to be forcefully killed.
    """
    handle = 501
    max_message_size = 1000
    unregistered_msg = 'Path is not registered with FileService.'

    #: Maximum size of any stream's output queue before we temporarily stop
    #: pumping more file chunks on that stream. The queue may overspill by up
    #: to mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1).
    max_queue_size = 1048576

    #: Time spent by the scheduler thread asleep when it has no more data to
    #: pump, but while at least one transfer remains active. With
    #: max_queue_size=1MiB and a sleep of 10ms, maximum throughput on any
    #: single stream is 112MiB/sec, which is >5x what SSH can handle on my
    #: laptop.
    sleep_delay_secs = 0.01

    def __init__(self, router):
        super(FileService, self).__init__(router)
        #: Mapping of registered path -> file size.
        self._size_by_path = {}
        #: Queue used to communicate from service to scheduler thread.
        self._queue = mitogen.core.Latch()
        #: Mapping of Stream->[(Sender, file object)].
        self._pending_by_stream = {}
        self._thread = threading.Thread(target=self._scheduler_main)
        self._thread.start()

    def on_shutdown(self):
        """
        Respond to shutdown of the service pool by marking our queue closed.
        This causes :meth:`_sleep_on_queue` to wake immediately and return
        :data:`False`, causing the scheduler main thread to exit.
        """
        self._queue.close()

    def _pending_bytes(self, stream):
        """
        Defer a function call to the Broker thread in order to accurately
        measure the bytes pending in `stream`'s queue.

        This must be done synchronized with the Broker, as OS scheduler
        uncertainty could cause Sender.send()'s deferred enqueues to be
        processed very late, making the output queue look much emptier than it
        really is (or is about to become).

        :returns:
            Result of `stream.pending_bytes()` as evaluated on the Broker
            thread.
        """
        latch = mitogen.core.Latch()
        self.router.broker.defer(lambda: latch.put(stream.pending_bytes()))
        return latch.get()

    def _schedule_pending(self, stream, pending):
        """
        Consider the pending file transfers for a single stream, pumping new
        file chunks into the stream's queue while its size is below the
        configured limit.

        :param mitogen.core.Stream stream:
            Stream to pump chunks for.
        :param pending:
            Corresponding list from :attr:`_pending_by_stream`.
        """
        while pending and self._pending_bytes(stream) < self.max_queue_size:
            sender, fp = pending[0]
            s = fp.read(mitogen.core.CHUNK_SIZE)
            if s:
                sender.send(s)
                continue

            # Empty read, indicating this file is fully transferred. Mark the
            # sender closed (causing the corresponding Receiver loop in the
            # target to exit), close the file handle, remove our entry from the
            # pending list, and delete the stream's entry in the pending map if
            # no more sends remain.
            sender.close()
            fp.close()
            pending.pop(0)
            if not pending:
                del self._pending_by_stream[stream]

    def _sleep_on_queue(self):
        """
        Sleep indefinitely (no active transfers) or for
        :attr:`sleep_delay_secs` (active transfers) waiting for a new transfer
        request to arrive from the :meth:`fetch` method.

        If a new request arrives, add it to the appropriate list in
        :attr:`_pending_by_stream`.

        :returns:
            :data:`True` if the scheduler's queue is still open, i.e.
            :meth:`on_shutdown` hasn't been called yet, otherwise
            :data:`False`.
        """
        if self._pending_by_stream:
            timeout = self.sleep_delay_secs
        else:
            timeout = None

        try:
            sender, fp = self._queue.get(timeout=timeout)
        except mitogen.core.LatchError:
            # The queue was closed by on_shutdown().
            return False
        except mitogen.core.TimeoutError:
            # No new request; wake anyway to keep pumping active transfers.
            return True

        LOG.debug('%r._sleep_on_queue(): setting up %r for %r',
                  self, fp.name, sender)
        stream = self.router.stream_by_id(sender.context.context_id)
        pending = self._pending_by_stream.setdefault(stream, [])
        pending.append((sender, fp))
        return True

    def _scheduler_main(self):
        """
        Scheduler thread's main function. Sleep until
        :meth:`_sleep_on_queue` indicates the queue has been shut down,
        pumping pending file chunks each time we wake.
        """
        while self._sleep_on_queue():
            # Iterate over a copy: _schedule_pending() deletes map entries for
            # streams whose transfers have all completed.
            for stream, pending in list(self._pending_by_stream.items()):
                self._schedule_pending(stream, pending)

        # on_shutdown() has been called. Send close() on every sender to give
        # targets a chance to shut down gracefully.
        LOG.debug('%r._scheduler_main() shutting down', self)
        for pending in self._pending_by_stream.values():
            for sender, fp in pending:
                sender.close()
                fp.close()

    @mitogen.service.expose(policy=mitogen.service.AllowParents())
    @mitogen.service.arg_spec({
        'path': _STRING_TYPES
    })
    def register(self, path):
        """
        Authorize a path for access by child contexts. Calling this repeatedly
        with the same path is harmless.

        :param str path:
            File path.
        """
        if path not in self._size_by_path:
            LOG.debug('%r: registering %r', self, path)
            self._size_by_path[path] = os.path.getsize(path)

    @mitogen.service.expose(policy=mitogen.service.AllowAny())
    @mitogen.service.arg_spec({
        'path': _STRING_TYPES,
        'sender': mitogen.core.Sender,
    })
    def fetch(self, path, sender):
        """
        Fetch a file's data.

        :param str path:
            File path.
        :param mitogen.core.Sender sender:
            Sender to receive file data.
        :returns:
            File size. The target can decide whether to keep the file in RAM or
            disk based on the return value.
        :raises mitogen.core.CallError:
            The path was not registered.
        """
        if path not in self._size_by_path:
            raise mitogen.core.CallError(self.unregistered_msg)

        LOG.debug('Serving %r', path)
        fp = open(path, 'rb', mitogen.core.CHUNK_SIZE)
        try:
            self._queue.put((sender, fp))
        except Exception:
            # The scheduler thread never took ownership of the file, so close
            # it here rather than leaking the descriptor.
            fp.close()
            raise
        return self._size_by_path[path]