ansible: tidy up service.py docstrings.

pull/193/head
David Wilson 7 years ago
parent 79b75aabae
commit 70a735f23a

@ -26,6 +26,17 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE. # POSSIBILITY OF SUCH DAMAGE.
"""
Classes in this file define Mitogen 'services' that run (initially) within the
connection multiplexer process that is forked off the top-level controller
process.
Once a worker process connects to a multiplexer process
(Connection._connect()), it communicates with these services to establish new
connections, grant access to files by children, and register for notification
when a child has completed a job.
"""
from __future__ import absolute_import from __future__ import absolute_import
import logging import logging
import os.path import os.path
@ -46,37 +57,17 @@ class Error(Exception):
class ContextService(mitogen.service.DeduplicatingService): class ContextService(mitogen.service.DeduplicatingService):
""" """
Used by worker processes connecting back into the top-level process to Used by workers to fetch the single Context instance corresponding to a
fetch the single Context instance corresponding to the supplied connection connection configuration, creating the matching connection if it does not
configuration, creating a matching connection if it does not exist. exist.
For connection methods and their parameters, refer to: For connection methods and their parameters, see:
https://mitogen.readthedocs.io/en/latest/api.html#context-factories https://mitogen.readthedocs.io/en/latest/api.html#context-factories
This concentrates all SSH connections in the top-level process, which may This concentrates connections in the top-level process, which may become a
become a bottleneck. There are multiple ways to fix that: bottleneck. The bottleneck can be removed using per-CPU connection
* creating one .local() child context per CPU and sharding connections processes and arranging for the worker to select one according to a hash of
between them, using the master process to route messages, or the connection parameters (sharding).
* as above, but having each child create a unique UNIX listener and
having workers connect in directly.
:param dict dct:
Parameters passed to `mitogen.master.Router.[method]()`.
* The `method` key is popped from the dictionary and used to look up
the Mitogen connection method.
* The `discriminator` key is mixed into the key used to select an
existing connection, but popped from the list of arguments passed to
the connection method.
:returns tuple:
Tuple of `(context, home_dir)`, where:
* `context` is the mitogen.master.Context referring to the target
context.
* `home_dir` is a cached copy of the remote directory.
mitogen.master.Context:
Corresponding Context instance.
""" """
handle = 500 handle = 500
max_message_size = 1000 max_message_size = 1000
@ -86,6 +77,25 @@ class ContextService(mitogen.service.DeduplicatingService):
'method_name': str 'method_name': str
}) })
def connect(self, method_name, discriminator=None, **kwargs): def connect(self, method_name, discriminator=None, **kwargs):
"""
Return a Context referring to an established connection with the given
configuration, establishing a new connection as necessary.
:param dict dct:
Parameters passed to `mitogen.master.Router.[method]()`.
* The `method` key is popped from the dictionary and used to look
up the Mitogen connection method.
* The `discriminator` key is mixed into the key used to select an
existing connection, but popped from the list of arguments passed
to the connection method.
:returns tuple:
Tuple of `(context, home_dir)`, where:
* `context` is the mitogen.master.Context referring to the
target context.
* `home_dir` is a cached copy of the remote directory.
"""
method = getattr(self.router, method_name, None) method = getattr(self.router, method_name, None)
if method is None: if method is None:
raise Error('no such Router method: %s' % (method_name,)) raise Error('no such Router method: %s' % (method_name,))
@ -101,7 +111,7 @@ class ContextService(mitogen.service.DeduplicatingService):
home_dir = context.call(os.path.expanduser, '~') home_dir = context.call(os.path.expanduser, '~')
# We don't need to wait for the result of this. Ideally we'd check its # We don't need to wait for the result of this. Ideally we'd check its
# return value somewhere, but logs will catch any failures anyway. # return value somewhere, but logs will catch a failure anyway.
context.call_async(ansible_mitogen.target.start_fork_parent) context.call_async(ansible_mitogen.target.start_fork_parent)
return { return {
'context': context, 'context': context,
@ -119,20 +129,6 @@ class FileService(mitogen.service.Service):
Paths must be explicitly added to the service by a trusted context before Paths must be explicitly added to the service by a trusted context before
they will be served to an untrusted context. they will be served to an untrusted context.
:param tuple args:
Tuple of `(cmd, path)`, where:
- cmd: one of "register", "fetch", where:
- register: register a file that may be fetched
- fetch: fetch a file that was previously registered
- path: key of the file to fetch or register
:returns:
Returns ``None` for "register", or the file data for "fetch".
:raises mitogen.core.CallError:
Security violation occurred, either path not registered, or attempt to
register path from unprivileged context.
""" """
handle = 501 handle = 501
max_message_size = 1000 max_message_size = 1000
@ -147,8 +143,15 @@ class FileService(mitogen.service.Service):
'path': basestring 'path': basestring
}) })
def register(self, path): def register(self, path):
"""
Authorize a path for access by child contexts. Calling this repeatedly
with the same path is harmless.
:param str path:
File path.
"""
if path not in self._paths: if path not in self._paths:
LOG.info('%r: registering %r', self, path) LOG.debug('%r: registering %r', self, path)
with open(path, 'rb') as fp: with open(path, 'rb') as fp:
self._paths[path] = zlib.compress(fp.read()) self._paths[path] = zlib.compress(fp.read())
@ -157,6 +160,18 @@ class FileService(mitogen.service.Service):
'path': basestring 'path': basestring
}) })
def fetch(self, path): def fetch(self, path):
"""
Fetch a file's data.
:param str path:
File path.
:returns:
The file data.
:raises mitogen.core.CallError:
The path was not registered.
"""
if path not in self._paths: if path not in self._paths:
raise mitogen.core.CallError(self.unregistered_msg) raise mitogen.core.CallError(self.unregistered_msg)
@ -169,6 +184,16 @@ class JobResultService(mitogen.service.Service):
Receive the result of a task from a child and forward it to interested Receive the result of a task from a child and forward it to interested
listeners. If no listener exists, store the result until it is requested. listeners. If no listener exists, store the result until it is requested.
Storing results in an intermediary service allows:
* the lifetime of the worker to be decoupled from the lifetime of the job,
* for new and unrelated workers to request the job result after the original
worker that spawned it has exitted,
* for synchronous and asynchronous jobs to be treated identically,
* for latency-free polling and waiting on job results, and
* for Ansible job IDs to be be used to refer to a job in preference to
Mitogen-internal identifiers such as Sender and Context.
Results are keyed by job ID. Results are keyed by job ID.
""" """
handle = 502 handle = 502
@ -186,6 +211,14 @@ class JobResultService(mitogen.service.Service):
'sender': mitogen.core.Sender, 'sender': mitogen.core.Sender,
}) })
def listen(self, job_id, sender): def listen(self, job_id, sender):
"""
Register to receive the result of a job when it becomes available.
:param str job_id:
Job ID to listen for.
:param mitogen.core.Sender sender:
Sender on which to deliver the job result.
"""
LOG.debug('%r.listen(job_id=%r, sender=%r)', self, job_id, sender) LOG.debug('%r.listen(job_id=%r, sender=%r)', self, job_id, sender)
with self._lock: with self._lock:
if job_id in self._sender_by_job_id: if job_id in self._sender_by_job_id:
@ -197,6 +230,15 @@ class JobResultService(mitogen.service.Service):
'job_id': basestring, 'job_id': basestring,
}) })
def get(self, job_id): def get(self, job_id):
"""
Return a job's result if it is available, otherwise return immediately.
The job result is forgotten once it has been returned by this method.
:param str job_id:
Job ID to return.
:returns:
Job result dictionary, or :data:`None`.
"""
LOG.debug('%r.get(job_id=%r)', self, job_id) LOG.debug('%r.get(job_id=%r)', self, job_id)
with self._lock: with self._lock:
return self._result_by_job_id.pop(job_id, None) return self._result_by_job_id.pop(job_id, None)
@ -207,6 +249,15 @@ class JobResultService(mitogen.service.Service):
'result': dict 'result': dict
}) })
def push(self, job_id, result): def push(self, job_id, result):
"""
Deliver a job's result from a child context, notifying any listener
registred via :meth:`listen` of the result.
:param str job_id:
Job ID whose result is being pushed.
:param dict result:
Job result dictionary.
"""
LOG.debug('%r.push(job_id=%r, result=%r)', self, job_id, result) LOG.debug('%r.push(job_id=%r, result=%r)', self, job_id, result)
with self._lock: with self._lock:
if job_id in self._result_by_job_id: if job_id in self._result_by_job_id:

Loading…
Cancel
Save