You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mitogen/ansible_mitogen/target.py

529 lines
16 KiB
Python

# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
ansible: enable forking when requested and for async jobs. Closes #105. References #155. mitogen/service.py: Refactor services to support individually exposed methods with different security policies for each method. - @mitogen.service.expose() to expose a method and set its policy - @mitogen.service.arg_spec() to validate input. - Require basic service message format to be a tuple of `(method, kwargs)`, where kwargs is always a dict. - Update DeduplicatingService to match the new scheme. ansible_mitogen/connection.py: - Rename 'method' to 'method_name' to disambiguate it from the service.call()'s method= argument. ansible_mitogen/planner.py: - Generate an ID for every job, sync or not, and fetch job results from JobResultService rather than via the initiating function call's return value. - Planner subclasses now get to select whether their Runner should run in a forked process. The base implementation requests this if the 'mitogen_isolation_mode=fork' task variable is present. ansible_mitogen/runner.py: Teach runners to deliver their result via JobResultService executing in their indirect parent mux process. ansible_mitogen/plugins/actions/mitogen_async_status.py: Split the implementation up into methods, and more compatibly emulate Ansible's existing output. ansible_mitogen/process.py: Mux processes now host JobResultService. ansible_mitogen/services.py: Update existing services to the new mitogen.service scheme, and implement JobResultService: * listen() method for synchronous jobs. planner.invoke() registers a Sender with the service prior to invoking the job, then sleeps waiting for the service to write the job result to the corresponding Receiver. * Non-blocking get() method for implementing mitogen_async_status action. * Child-accessible push() method for delivering task results. ansible_mitogen/target.py: New helpers for spawning a virginal subprocess on startup, from which asynchronous and mitogen_task_isolation=fork jobs are forked. Necessary to avoid a task inheriting potentially polluted/monkey-patched parent environment, since remaining jobs continue to run in the original child process. docs/ansible.rst: Add/merge/remove some behaviours/risks. tests/ansible/integration: New tests for forking/async.
7 years ago
"""
Helper functions intended to be executed on the target. These are entrypoints
for file transfer, module execution and sundry bits like changing file modes.
"""
from __future__ import absolute_import
import cStringIO
import grp
import json
import logging
import operator
import os
import pwd
import random
import re
import stat
7 years ago
import subprocess
import tempfile
import time
import traceback
import ansible.module_utils.json_utils
import ansible_mitogen.runner
import ansible_mitogen.services
ansible: enable forking when requested and for async jobs. Closes #105. References #155. mitogen/service.py: Refactor services to support individually exposed methods with different security policies for each method. - @mitogen.service.expose() to expose a method and set its policy - @mitogen.service.arg_spec() to validate input. - Require basic service message format to be a tuple of `(method, kwargs)`, where kwargs is always a dict. - Update DeduplicatingService to match the new scheme. ansible_mitogen/connection.py: - Rename 'method' to 'method_name' to disambiguate it from the service.call()'s method= argument. ansible_mitogen/planner.py: - Generate an ID for every job, sync or not, and fetch job results from JobResultService rather than via the initiating function call's return value. - Planner subclasses now get to select whether their Runner should run in a forked process. The base implementation requests this if the 'mitogen_isolation_mode=fork' task variable is present. ansible_mitogen/runner.py: Teach runners to deliver their result via JobResultService executing in their indirect parent mux process. ansible_mitogen/plugins/actions/mitogen_async_status.py: Split the implementation up into methods, and more compatibly emulate Ansible's existing output. ansible_mitogen/process.py: Mux processes now host JobResultService. ansible_mitogen/services.py: Update existing services to the new mitogen.service scheme, and implement JobResultService: * listen() method for synchronous jobs. planner.invoke() registers a Sender with the service prior to invoking the job, then sleeps waiting for the service to write the job result to the corresponding Receiver. * Non-blocking get() method for implementing mitogen_async_status action. * Child-accessible push() method for delivering task results. ansible_mitogen/target.py: New helpers for spawning a virginal subprocess on startup, from which asynchronous and mitogen_task_isolation=fork jobs are forked. Necessary to avoid a task inheriting potentially polluted/monkey-patched parent environment, since remaining jobs continue to run in the original child process. docs/ansible.rst: Add/merge/remove some behaviours/risks. tests/ansible/integration: New tests for forking/async.
7 years ago
import mitogen.core
import mitogen.fork
import mitogen.parent
import mitogen.service
LOG = logging.getLogger(__name__)
#: Caching of fetched file data.
_file_cache = {}
ansible: enable forking when requested and for async jobs. Closes #105. References #155. mitogen/service.py: Refactor services to support individually exposed methods with different security policies for each method. - @mitogen.service.expose() to expose a method and set its policy - @mitogen.service.arg_spec() to validate input. - Require basic service message format to be a tuple of `(method, kwargs)`, where kwargs is always a dict. - Update DeduplicatingService to match the new scheme. ansible_mitogen/connection.py: - Rename 'method' to 'method_name' to disambiguate it from the service.call()'s method= argument. ansible_mitogen/planner.py: - Generate an ID for every job, sync or not, and fetch job results from JobResultService rather than via the initiating function call's return value. - Planner subclasses now get to select whether their Runner should run in a forked process. The base implementation requests this if the 'mitogen_isolation_mode=fork' task variable is present. ansible_mitogen/runner.py: Teach runners to deliver their result via JobResultService executing in their indirect parent mux process. ansible_mitogen/plugins/actions/mitogen_async_status.py: Split the implementation up into methods, and more compatibly emulate Ansible's existing output. ansible_mitogen/process.py: Mux processes now host JobResultService. ansible_mitogen/services.py: Update existing services to the new mitogen.service scheme, and implement JobResultService: * listen() method for synchronous jobs. planner.invoke() registers a Sender with the service prior to invoking the job, then sleeps waiting for the service to write the job result to the corresponding Receiver. * Non-blocking get() method for implementing mitogen_async_status action. * Child-accessible push() method for delivering task results. ansible_mitogen/target.py: New helpers for spawning a virginal subprocess on startup, from which asynchronous and mitogen_task_isolation=fork jobs are forked. Necessary to avoid a task inheriting potentially polluted/monkey-patched parent environment, since remaining jobs continue to run in the original child process. docs/ansible.rst: Add/merge/remove some behaviours/risks. tests/ansible/integration: New tests for forking/async.
7 years ago
#: Initialized to an econtext.parent.Context pointing at a pristine fork of
#: the target Python interpreter before it executes any code or imports.
_fork_parent = None
def _get_file(context, path, out_fp):
"""
Streamily download a file from the connection multiplexer process in the
controller.
:param mitogen.core.Context context:
Reference to the context hosting the FileService that will be used to
fetch the file.
:param bytes in_path:
FileService registered name of the input file.
:param bytes out_path:
Name of the output path on the local disk.
:returns:
:data:`True` on success, or :data:`False` if the transfer was
interrupted and the output should be discarded.
"""
LOG.debug('_get_file(): fetching %r from %r', path, context)
t0 = time.time()
recv = mitogen.core.Receiver(router=context.router)
metadata = mitogen.service.call(
context=context,
handle=ansible_mitogen.services.FileService.handle,
method='fetch',
kwargs={
'path': path,
'sender': recv.to_sender()
}
)
for chunk in recv:
s = chunk.unpickle()
LOG.debug('_get_file(%r): received %d bytes', path, len(s))
mitogen.service.call_async(
context=context,
handle=ansible_mitogen.services.FileService.handle,
method='acknowledge',
kwargs={
'size': len(s),
}
).close()
out_fp.write(s)
ok = out_fp.tell() == metadata['size']
if not ok:
LOG.error('get_file(%r): receiver was closed early, controller '
'is likely shutting down.', path)
LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms',
metadata['size'], path, context, 1000 * (time.time() - t0))
return ok, metadata
def get_file(context, path):
"""
Basic in-memory caching module fetcher. This generates an one roundtrip for
ansible: enable forking when requested and for async jobs. Closes #105. References #155. mitogen/service.py: Refactor services to support individually exposed methods with different security policies for each method. - @mitogen.service.expose() to expose a method and set its policy - @mitogen.service.arg_spec() to validate input. - Require basic service message format to be a tuple of `(method, kwargs)`, where kwargs is always a dict. - Update DeduplicatingService to match the new scheme. ansible_mitogen/connection.py: - Rename 'method' to 'method_name' to disambiguate it from the service.call()'s method= argument. ansible_mitogen/planner.py: - Generate an ID for every job, sync or not, and fetch job results from JobResultService rather than via the initiating function call's return value. - Planner subclasses now get to select whether their Runner should run in a forked process. The base implementation requests this if the 'mitogen_isolation_mode=fork' task variable is present. ansible_mitogen/runner.py: Teach runners to deliver their result via JobResultService executing in their indirect parent mux process. ansible_mitogen/plugins/actions/mitogen_async_status.py: Split the implementation up into methods, and more compatibly emulate Ansible's existing output. ansible_mitogen/process.py: Mux processes now host JobResultService. ansible_mitogen/services.py: Update existing services to the new mitogen.service scheme, and implement JobResultService: * listen() method for synchronous jobs. planner.invoke() registers a Sender with the service prior to invoking the job, then sleeps waiting for the service to write the job result to the corresponding Receiver. * Non-blocking get() method for implementing mitogen_async_status action. * Child-accessible push() method for delivering task results. ansible_mitogen/target.py: New helpers for spawning a virginal subprocess on startup, from which asynchronous and mitogen_task_isolation=fork jobs are forked. Necessary to avoid a task inheriting potentially polluted/monkey-patched parent environment, since remaining jobs continue to run in the original child process. docs/ansible.rst: Add/merge/remove some behaviours/risks. tests/ansible/integration: New tests for forking/async.
7 years ago
every previously unseen file, so it is only a temporary solution.
:param context:
Context we should direct FileService requests to. For now (and probably
forever) this is just the top-level Mitogen connection manager process.
:param path:
Path to fetch from FileService, must previously have been registered by
a privileged context using the `register` command.
:returns:
Bytestring file data.
"""
if path not in _file_cache:
io = cStringIO.StringIO()
ok, metadata = _get_file(context, path, io)
if not ok:
raise IOError('transfer of %r was interrupted.' % (path,))
_file_cache[path] = io.getvalue()
return _file_cache[path]
def transfer_file(context, in_path, out_path, sync=False, set_owner=False):
"""
Streamily download a file from the connection multiplexer process in the
controller.
:param mitogen.core.Context context:
Reference to the context hosting the FileService that will be used to
fetch the file.
:param bytes in_path:
FileService registered name of the input file.
:param bytes out_path:
Name of the output path on the local disk.
:param bool sync:
If :data:`True`, ensure the file content and metadat are fully on disk
before renaming the temporary file over the existing file. This should
ensure in the case of system crash, either the entire old or new file
are visible post-reboot.
:param bool set_owner:
If :data:`True`, look up the metadata username and group on the local
system and file the file owner using :func:`os.fchmod`.
"""
out_path = os.path.abspath(out_path)
fd, tmp_path = tempfile.mkstemp(suffix='.tmp',
prefix='.ansible_mitogen_transfer-',
dir=os.path.dirname(out_path))
fp = os.fdopen(fd, 'wb', mitogen.core.CHUNK_SIZE)
LOG.debug('transfer_file(%r) temporary file: %s', out_path, tmp_path)
try:
try:
ok, metadata = _get_file(context, in_path, fp)
if not ok:
raise IOError('transfer of %r was interrupted.' % (in_path,))
os.fchmod(fp.fileno(), metadata['mode'])
if set_owner:
set_fd_owner(fp.fileno(), metadata['owner'], metadata['group'])
finally:
fp.close()
if sync:
os.fsync(fp.fileno())
os.rename(tmp_path, out_path)
except BaseException:
os.unlink(tmp_path)
raise
os.utime(out_path, (metadata['atime'], metadata['mtime']))
ansible: enable forking when requested and for async jobs. Closes #105. References #155. mitogen/service.py: Refactor services to support individually exposed methods with different security policies for each method. - @mitogen.service.expose() to expose a method and set its policy - @mitogen.service.arg_spec() to validate input. - Require basic service message format to be a tuple of `(method, kwargs)`, where kwargs is always a dict. - Update DeduplicatingService to match the new scheme. ansible_mitogen/connection.py: - Rename 'method' to 'method_name' to disambiguate it from the service.call()'s method= argument. ansible_mitogen/planner.py: - Generate an ID for every job, sync or not, and fetch job results from JobResultService rather than via the initiating function call's return value. - Planner subclasses now get to select whether their Runner should run in a forked process. The base implementation requests this if the 'mitogen_isolation_mode=fork' task variable is present. ansible_mitogen/runner.py: Teach runners to deliver their result via JobResultService executing in their indirect parent mux process. ansible_mitogen/plugins/actions/mitogen_async_status.py: Split the implementation up into methods, and more compatibly emulate Ansible's existing output. ansible_mitogen/process.py: Mux processes now host JobResultService. ansible_mitogen/services.py: Update existing services to the new mitogen.service scheme, and implement JobResultService: * listen() method for synchronous jobs. planner.invoke() registers a Sender with the service prior to invoking the job, then sleeps waiting for the service to write the job result to the corresponding Receiver. * Non-blocking get() method for implementing mitogen_async_status action. * Child-accessible push() method for delivering task results. ansible_mitogen/target.py: New helpers for spawning a virginal subprocess on startup, from which asynchronous and mitogen_task_isolation=fork jobs are forked. Necessary to avoid a task inheriting potentially polluted/monkey-patched parent environment, since remaining jobs continue to run in the original child process. docs/ansible.rst: Add/merge/remove some behaviours/risks. tests/ansible/integration: New tests for forking/async.
7 years ago
@mitogen.core.takes_econtext
def start_fork_parent(econtext):
"""
Called by ContextService immediately after connection; arranges for the
(presently) spotless Python interpreter to be forked, where the newly
forked interpreter becomes the parent of any newly forked future
interpreters.
This is necessary to prevent modules that are executed in-process from
polluting the global interpreter state in a way that effects explicitly
isolated modules.
"""
global _fork_parent
mitogen.parent.upgrade_router(econtext)
ansible: enable forking when requested and for async jobs. Closes #105. References #155. mitogen/service.py: Refactor services to support individually exposed methods with different security policies for each method. - @mitogen.service.expose() to expose a method and set its policy - @mitogen.service.arg_spec() to validate input. - Require basic service message format to be a tuple of `(method, kwargs)`, where kwargs is always a dict. - Update DeduplicatingService to match the new scheme. ansible_mitogen/connection.py: - Rename 'method' to 'method_name' to disambiguate it from the service.call()'s method= argument. ansible_mitogen/planner.py: - Generate an ID for every job, sync or not, and fetch job results from JobResultService rather than via the initiating function call's return value. - Planner subclasses now get to select whether their Runner should run in a forked process. The base implementation requests this if the 'mitogen_isolation_mode=fork' task variable is present. ansible_mitogen/runner.py: Teach runners to deliver their result via JobResultService executing in their indirect parent mux process. ansible_mitogen/plugins/actions/mitogen_async_status.py: Split the implementation up into methods, and more compatibly emulate Ansible's existing output. ansible_mitogen/process.py: Mux processes now host JobResultService. ansible_mitogen/services.py: Update existing services to the new mitogen.service scheme, and implement JobResultService: * listen() method for synchronous jobs. planner.invoke() registers a Sender with the service prior to invoking the job, then sleeps waiting for the service to write the job result to the corresponding Receiver. * Non-blocking get() method for implementing mitogen_async_status action. * Child-accessible push() method for delivering task results. ansible_mitogen/target.py: New helpers for spawning a virginal subprocess on startup, from which asynchronous and mitogen_task_isolation=fork jobs are forked. Necessary to avoid a task inheriting potentially polluted/monkey-patched parent environment, since remaining jobs continue to run in the original child process. docs/ansible.rst: Add/merge/remove some behaviours/risks. tests/ansible/integration: New tests for forking/async.
7 years ago
_fork_parent = econtext.router.fork()
@mitogen.core.takes_econtext
def start_fork_child(wrap_async, kwargs, econtext):
ansible: enable forking when requested and for async jobs. Closes #105. References #155. mitogen/service.py: Refactor services to support individually exposed methods with different security policies for each method. - @mitogen.service.expose() to expose a method and set its policy - @mitogen.service.arg_spec() to validate input. - Require basic service message format to be a tuple of `(method, kwargs)`, where kwargs is always a dict. - Update DeduplicatingService to match the new scheme. ansible_mitogen/connection.py: - Rename 'method' to 'method_name' to disambiguate it from the service.call()'s method= argument. ansible_mitogen/planner.py: - Generate an ID for every job, sync or not, and fetch job results from JobResultService rather than via the initiating function call's return value. - Planner subclasses now get to select whether their Runner should run in a forked process. The base implementation requests this if the 'mitogen_isolation_mode=fork' task variable is present. ansible_mitogen/runner.py: Teach runners to deliver their result via JobResultService executing in their indirect parent mux process. ansible_mitogen/plugins/actions/mitogen_async_status.py: Split the implementation up into methods, and more compatibly emulate Ansible's existing output. ansible_mitogen/process.py: Mux processes now host JobResultService. ansible_mitogen/services.py: Update existing services to the new mitogen.service scheme, and implement JobResultService: * listen() method for synchronous jobs. planner.invoke() registers a Sender with the service prior to invoking the job, then sleeps waiting for the service to write the job result to the corresponding Receiver. * Non-blocking get() method for implementing mitogen_async_status action. * Child-accessible push() method for delivering task results. ansible_mitogen/target.py: New helpers for spawning a virginal subprocess on startup, from which asynchronous and mitogen_task_isolation=fork jobs are forked. Necessary to avoid a task inheriting potentially polluted/monkey-patched parent environment, since remaining jobs continue to run in the original child process. docs/ansible.rst: Add/merge/remove some behaviours/risks. tests/ansible/integration: New tests for forking/async.
7 years ago
mitogen.parent.upgrade_router(econtext)
context = econtext.router.fork()
if not wrap_async:
try:
return context.call(run_module, kwargs)
finally:
context.shutdown()
job_id = '%016x' % random.randint(0, 2**64)
context.call_async(run_module_async, job_id, kwargs)
return {
'stdout': json.dumps({
# modules/utilities/logic/async_wrapper.py::_run_module().
'changed': True,
'started': 1,
'finished': 0,
'ansible_job_id': job_id,
})
}
ansible: enable forking when requested and for async jobs. Closes #105. References #155. mitogen/service.py: Refactor services to support individually exposed methods with different security policies for each method. - @mitogen.service.expose() to expose a method and set its policy - @mitogen.service.arg_spec() to validate input. - Require basic service message format to be a tuple of `(method, kwargs)`, where kwargs is always a dict. - Update DeduplicatingService to match the new scheme. ansible_mitogen/connection.py: - Rename 'method' to 'method_name' to disambiguate it from the service.call()'s method= argument. ansible_mitogen/planner.py: - Generate an ID for every job, sync or not, and fetch job results from JobResultService rather than via the initiating function call's return value. - Planner subclasses now get to select whether their Runner should run in a forked process. The base implementation requests this if the 'mitogen_isolation_mode=fork' task variable is present. ansible_mitogen/runner.py: Teach runners to deliver their result via JobResultService executing in their indirect parent mux process. ansible_mitogen/plugins/actions/mitogen_async_status.py: Split the implementation up into methods, and more compatibly emulate Ansible's existing output. ansible_mitogen/process.py: Mux processes now host JobResultService. ansible_mitogen/services.py: Update existing services to the new mitogen.service scheme, and implement JobResultService: * listen() method for synchronous jobs. planner.invoke() registers a Sender with the service prior to invoking the job, then sleeps waiting for the service to write the job result to the corresponding Receiver. * Non-blocking get() method for implementing mitogen_async_status action. * Child-accessible push() method for delivering task results. ansible_mitogen/target.py: New helpers for spawning a virginal subprocess on startup, from which asynchronous and mitogen_task_isolation=fork jobs are forked. Necessary to avoid a task inheriting potentially polluted/monkey-patched parent environment, since remaining jobs continue to run in the original child process. docs/ansible.rst: Add/merge/remove some behaviours/risks. tests/ansible/integration: New tests for forking/async.
7 years ago
@mitogen.core.takes_econtext
def run_module(kwargs, econtext):
"""
Set up the process environment in preparation for running an Ansible
module. This monkey-patches the Ansible libraries in various places to
prevent it from trying to kill the process on completion, and to prevent it
from reading sys.stdin.
"""
ansible: enable forking when requested and for async jobs. Closes #105. References #155. mitogen/service.py: Refactor services to support individually exposed methods with different security policies for each method. - @mitogen.service.expose() to expose a method and set its policy - @mitogen.service.arg_spec() to validate input. - Require basic service message format to be a tuple of `(method, kwargs)`, where kwargs is always a dict. - Update DeduplicatingService to match the new scheme. ansible_mitogen/connection.py: - Rename 'method' to 'method_name' to disambiguate it from the service.call()'s method= argument. ansible_mitogen/planner.py: - Generate an ID for every job, sync or not, and fetch job results from JobResultService rather than via the initiating function call's return value. - Planner subclasses now get to select whether their Runner should run in a forked process. The base implementation requests this if the 'mitogen_isolation_mode=fork' task variable is present. ansible_mitogen/runner.py: Teach runners to deliver their result via JobResultService executing in their indirect parent mux process. ansible_mitogen/plugins/actions/mitogen_async_status.py: Split the implementation up into methods, and more compatibly emulate Ansible's existing output. ansible_mitogen/process.py: Mux processes now host JobResultService. ansible_mitogen/services.py: Update existing services to the new mitogen.service scheme, and implement JobResultService: * listen() method for synchronous jobs. planner.invoke() registers a Sender with the service prior to invoking the job, then sleeps waiting for the service to write the job result to the corresponding Receiver. * Non-blocking get() method for implementing mitogen_async_status action. * Child-accessible push() method for delivering task results. ansible_mitogen/target.py: New helpers for spawning a virginal subprocess on startup, from which asynchronous and mitogen_task_isolation=fork jobs are forked. Necessary to avoid a task inheriting potentially polluted/monkey-patched parent environment, since remaining jobs continue to run in the original child process. docs/ansible.rst: Add/merge/remove some behaviours/risks. tests/ansible/integration: New tests for forking/async.
7 years ago
should_fork = kwargs.pop('should_fork', False)
wrap_async = kwargs.pop('wrap_async', False)
ansible: enable forking when requested and for async jobs. Closes #105. References #155. mitogen/service.py: Refactor services to support individually exposed methods with different security policies for each method. - @mitogen.service.expose() to expose a method and set its policy - @mitogen.service.arg_spec() to validate input. - Require basic service message format to be a tuple of `(method, kwargs)`, where kwargs is always a dict. - Update DeduplicatingService to match the new scheme. ansible_mitogen/connection.py: - Rename 'method' to 'method_name' to disambiguate it from the service.call()'s method= argument. ansible_mitogen/planner.py: - Generate an ID for every job, sync or not, and fetch job results from JobResultService rather than via the initiating function call's return value. - Planner subclasses now get to select whether their Runner should run in a forked process. The base implementation requests this if the 'mitogen_isolation_mode=fork' task variable is present. ansible_mitogen/runner.py: Teach runners to deliver their result via JobResultService executing in their indirect parent mux process. ansible_mitogen/plugins/actions/mitogen_async_status.py: Split the implementation up into methods, and more compatibly emulate Ansible's existing output. ansible_mitogen/process.py: Mux processes now host JobResultService. ansible_mitogen/services.py: Update existing services to the new mitogen.service scheme, and implement JobResultService: * listen() method for synchronous jobs. planner.invoke() registers a Sender with the service prior to invoking the job, then sleeps waiting for the service to write the job result to the corresponding Receiver. * Non-blocking get() method for implementing mitogen_async_status action. * Child-accessible push() method for delivering task results. ansible_mitogen/target.py: New helpers for spawning a virginal subprocess on startup, from which asynchronous and mitogen_task_isolation=fork jobs are forked. Necessary to avoid a task inheriting potentially polluted/monkey-patched parent environment, since remaining jobs continue to run in the original child process. docs/ansible.rst: Add/merge/remove some behaviours/risks. tests/ansible/integration: New tests for forking/async.
7 years ago
if should_fork:
return _fork_parent.call(start_fork_child, wrap_async, kwargs)
ansible: enable forking when requested and for async jobs. Closes #105. References #155. mitogen/service.py: Refactor services to support individually exposed methods with different security policies for each method. - @mitogen.service.expose() to expose a method and set its policy - @mitogen.service.arg_spec() to validate input. - Require basic service message format to be a tuple of `(method, kwargs)`, where kwargs is always a dict. - Update DeduplicatingService to match the new scheme. ansible_mitogen/connection.py: - Rename 'method' to 'method_name' to disambiguate it from the service.call()'s method= argument. ansible_mitogen/planner.py: - Generate an ID for every job, sync or not, and fetch job results from JobResultService rather than via the initiating function call's return value. - Planner subclasses now get to select whether their Runner should run in a forked process. The base implementation requests this if the 'mitogen_isolation_mode=fork' task variable is present. ansible_mitogen/runner.py: Teach runners to deliver their result via JobResultService executing in their indirect parent mux process. ansible_mitogen/plugins/actions/mitogen_async_status.py: Split the implementation up into methods, and more compatibly emulate Ansible's existing output. ansible_mitogen/process.py: Mux processes now host JobResultService. ansible_mitogen/services.py: Update existing services to the new mitogen.service scheme, and implement JobResultService: * listen() method for synchronous jobs. planner.invoke() registers a Sender with the service prior to invoking the job, then sleeps waiting for the service to write the job result to the corresponding Receiver. * Non-blocking get() method for implementing mitogen_async_status action. * Child-accessible push() method for delivering task results. ansible_mitogen/target.py: New helpers for spawning a virginal subprocess on startup, from which asynchronous and mitogen_task_isolation=fork jobs are forked. Necessary to avoid a task inheriting potentially polluted/monkey-patched parent environment, since remaining jobs continue to run in the original child process. docs/ansible.rst: Add/merge/remove some behaviours/risks. tests/ansible/integration: New tests for forking/async.
7 years ago
runner_name = kwargs.pop('runner_name')
klass = getattr(ansible_mitogen.runner, runner_name)
impl = klass(**kwargs)
return impl.run()
def _get_async_dir():
return os.path.expanduser(
os.environ.get('ANSIBLE_ASYNC_DIR', '~/.ansible_async')
)
def _write_job_status(job_id, dct):
"""
Update an async job status file.
"""
LOG.info('_write_job_status(%r, %r)', job_id, dct)
dct.setdefault('ansible_job_id', job_id)
dct.setdefault('data', '')
async_dir = _get_async_dir()
if not os.path.exists(async_dir):
os.makedirs(async_dir)
path = os.path.join(async_dir, job_id)
with open(path + '.tmp', 'w') as fp:
fp.write(json.dumps(dct))
os.rename(path + '.tmp', path)
def _run_module_async(job_id, kwargs, econtext):
"""
Body on run_module_async().
1. Immediately updates the status file to mark the job as started.
2. Installs a timer/signal handler to implement the time limit.
3. Runs as with run_module(), writing the result to the status file.
"""
_write_job_status(job_id, {
'started': 1,
'finished': 0
})
kwargs['emulate_tty'] = False
dct = run_module(kwargs, econtext)
if mitogen.core.PY3:
for key in 'stdout', 'stderr':
dct[key] = dct[key].decode('utf-8', 'surrogateescape')
try:
filtered, warnings = (
ansible.module_utils.json_utils.
_filter_non_json_lines(dct['stdout'])
)
result = json.loads(filtered)
result.setdefault('warnings', []).extend(warnings)
result['stderr'] = dct['stderr']
_write_job_status(job_id, result)
except Exception:
_write_job_status(job_id, {
"failed": 1,
"msg": traceback.format_exc(),
"data": dct['stdout'], # temporary notice only
"stderr": dct['stderr']
})
@mitogen.core.takes_econtext
def run_module_async(job_id, kwargs, econtext):
"""
Since run_module_async() is invoked with .call_async(), with nothing to
read the result from the corresponding Receiver, wrap the body in an
exception logger, and wrap that in something that tears down the context on
completion.
"""
try:
try:
_run_module_async(job_id, kwargs, econtext)
except Exception:
LOG.exception('_run_module_async crashed')
finally:
ansible: enable forking when requested and for async jobs. Closes #105. References #155. mitogen/service.py: Refactor services to support individually exposed methods with different security policies for each method. - @mitogen.service.expose() to expose a method and set its policy - @mitogen.service.arg_spec() to validate input. - Require basic service message format to be a tuple of `(method, kwargs)`, where kwargs is always a dict. - Update DeduplicatingService to match the new scheme. ansible_mitogen/connection.py: - Rename 'method' to 'method_name' to disambiguate it from the service.call()'s method= argument. ansible_mitogen/planner.py: - Generate an ID for every job, sync or not, and fetch job results from JobResultService rather than via the initiating function call's return value. - Planner subclasses now get to select whether their Runner should run in a forked process. The base implementation requests this if the 'mitogen_isolation_mode=fork' task variable is present. ansible_mitogen/runner.py: Teach runners to deliver their result via JobResultService executing in their indirect parent mux process. ansible_mitogen/plugins/actions/mitogen_async_status.py: Split the implementation up into methods, and more compatibly emulate Ansible's existing output. ansible_mitogen/process.py: Mux processes now host JobResultService. ansible_mitogen/services.py: Update existing services to the new mitogen.service scheme, and implement JobResultService: * listen() method for synchronous jobs. planner.invoke() registers a Sender with the service prior to invoking the job, then sleeps waiting for the service to write the job result to the corresponding Receiver. * Non-blocking get() method for implementing mitogen_async_status action. * Child-accessible push() method for delivering task results. ansible_mitogen/target.py: New helpers for spawning a virginal subprocess on startup, from which asynchronous and mitogen_task_isolation=fork jobs are forked. Necessary to avoid a task inheriting potentially polluted/monkey-patched parent environment, since remaining jobs continue to run in the original child process. docs/ansible.rst: Add/merge/remove some behaviours/risks. tests/ansible/integration: New tests for forking/async.
7 years ago
econtext.broker.shutdown()
def make_temp_directory(base_dir):
"""
Handle creation of `base_dir` if it is absent, in addition to a unique
temporary directory within `base_dir`.
:returns:
Newly created temporary directory.
"""
if not os.path.exists(base_dir):
os.makedirs(base_dir, mode=int('0700', 8))
return tempfile.mkdtemp(
dir=base_dir,
prefix='ansible-mitogen-tmp-',
)
def get_user_shell():
"""
For commands executed directly via an SSH command-line, SSH looks up the
user's shell via getpwuid() and only defaults to /bin/sh if that field is
missing or empty.
"""
try:
pw_shell = pwd.getpwuid(os.geteuid()).pw_shell
except KeyError:
pw_shell = None
return pw_shell or '/bin/sh'
def exec_args(args, in_data='', chdir=None, shell=None, emulate_tty=False):
"""
Run a command in a subprocess, emulating the argument handling behaviour of
SSH.
:param list[str]:
Argument vector.
:param bytes in_data:
Optional standard input for the command.
:param bool emulate_tty:
If :data:`True`, arrange for stdout and stderr to be merged into the
stdout pipe and for LF to be translated into CRLF, emulating the
behaviour of a TTY.
:return:
(return code, stdout bytes, stderr bytes)
"""
LOG.debug('exec_args(%r, ..., chdir=%r)', args, chdir)
assert isinstance(args, list)
if emulate_tty:
stderr = subprocess.STDOUT
else:
stderr = subprocess.PIPE
proc = subprocess.Popen(
args=args,
7 years ago
stdout=subprocess.PIPE,
stderr=stderr,
7 years ago
stdin=subprocess.PIPE,
cwd=chdir,
)
7 years ago
stdout, stderr = proc.communicate(in_data)
if emulate_tty:
stdout = stdout.replace('\n', '\r\n')
return proc.returncode, stdout, stderr or ''
7 years ago
def exec_command(cmd, in_data='', chdir=None, shell=None, emulate_tty=False):
"""
Run a command in a subprocess, emulating the argument handling behaviour of
SSH.
:param bytes cmd:
String command line, passed to user's shell.
:param bytes in_data:
Optional standard input for the command.
:return:
(return code, stdout bytes, stderr bytes)
"""
assert isinstance(cmd, basestring)
return exec_args(
args=[get_user_shell(), '-c', cmd],
in_data=in_data,
chdir=chdir,
shell=shell,
emulate_tty=emulate_tty,
)
7 years ago
def read_path(path):
"""
Fetch the contents of a filesystem `path` as bytes.
"""
return open(path, 'rb').read()
7 years ago
def set_fd_owner(fd, owner, group=None):
if owner:
uid = pwd.getpwnam(owner).pw_uid
else:
uid = os.geteuid()
if group:
gid = grp.getgrnam(group).gr_gid
else:
gid = os.getegid()
os.fchown(fd, (uid, gid))
def write_path(path, s, owner=None, group=None, mode=None,
utimes=None, sync=False):
"""
Writes bytes `s` to a filesystem `path`.
"""
path = os.path.abspath(path)
fd, tmp_path = tempfile.mkstemp(suffix='.tmp',
prefix='.ansible_mitogen_transfer-',
dir=os.path.dirname(path))
fp = os.fdopen(fd, 'wb', mitogen.core.CHUNK_SIZE)
LOG.debug('write_path(path=%r) tempory file: %s', path, tmp_path)
try:
try:
if mode:
os.fchmod(fp.fileno(), mode)
if owner or group:
set_fd_owner(fp.fileno(), owner, group)
fp.write(s)
finally:
fp.close()
if sync:
os.fsync(fp.fileno())
os.rename(tmp_path, path)
except BaseException:
os.unlink(tmp_path)
raise
if utimes:
os.utime(path, utimes)
CHMOD_CLAUSE_PAT = re.compile(r'([uoga]*)([+\-=])([ugo]|[rwx]*)')
CHMOD_MASKS = {
'u': stat.S_IRWXU,
'g': stat.S_IRWXG,
'o': stat.S_IRWXO,
'a': (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO),
}
CHMOD_BITS = {
'u': {'r': stat.S_IRUSR, 'w': stat.S_IWUSR, 'x': stat.S_IXUSR},
'g': {'r': stat.S_IRGRP, 'w': stat.S_IWGRP, 'x': stat.S_IXGRP},
'o': {'r': stat.S_IROTH, 'w': stat.S_IWOTH, 'x': stat.S_IXOTH},
'a': {
'r': (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH),
'w': (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH),
'x': (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
}
}
def apply_mode_spec(spec, mode):
"""
Given a symbolic file mode change specification in the style of chmod(1)
`spec`, apply changes in the specification to the numeric file mode `mode`.
"""
for clause in spec.split(','):
match = CHMOD_CLAUSE_PAT.match(clause)
who, op, perms = match.groups()
for ch in who or 'a':
mask = CHMOD_MASKS[ch]
bits = CHMOD_BITS[ch]
cur_perm_bits = mode & mask
new_perm_bits = reduce(operator.or_, (bits[p] for p in perms), 0)
mode &= ~mask
if op == '=':
mode |= new_perm_bits
elif op == '+':
mode |= new_perm_bits | cur_perm_bits
else:
mode |= cur_perm_bits & ~new_perm_bits
return mode
def set_file_mode(path, spec):
"""
Update the permissions of a file using the same syntax as chmod(1).
"""
mode = os.stat(path).st_mode
if spec.isdigit():
new_mode = int(spec, 8)
else:
new_mode = apply_mode_spec(spec, mode)
os.chmod(path, new_mode)