ansible: use CallChain everywhere.

This replaces the 'dump to logger' behaviour of pipelined calls from
before with a call chain that returns any exception on next synchronized
call.
pull/372/head
David Wilson 6 years ago
parent 020482e554
commit 43d9815f6d

@ -403,6 +403,34 @@ def config_from_hostvars(transport, inventory_name, connection,
}) })
class CallChain(mitogen.parent.CallChain):
call_aborted_msg = (
'Mitogen was disconnected from the remote environment while a call '
'was in-progress. If you feel this is in error, please file a bug. '
'Original error was: %s'
)
def _rethrow(self, recv):
try:
return recv.get().unpickle()
except mitogen.core.ChannelError as e:
raise ansible.errors.AnsibleConnectionFailure(
self.call_aborted_msg % (e,)
)
def call(self, func, *args, **kwargs):
"""
Like :meth:`mitogen.parent.CallChain.call`, but log timings.
"""
t0 = time.time()
try:
recv = self.call_async(func, *args, **kwargs)
return self._rethrow(recv)
finally:
LOG.debug('Call took %d ms: %r', 1000 * (time.time() - t0),
mitogen.parent.CallSpec(func, args, kwargs))
class Connection(ansible.plugins.connection.ConnectionBase): class Connection(ansible.plugins.connection.ConnectionBase):
#: mitogen.master.Broker for this worker. #: mitogen.master.Broker for this worker.
broker = None broker = None
@ -422,17 +450,30 @@ class Connection(ansible.plugins.connection.ConnectionBase):
#: account, even when become=True. #: account, even when become=True.
login_context = None login_context = None
#: Only sudo, su, and doas are supported for now.
become_methods = ['sudo', 'su', 'doas']
#: Dict containing init_child() return vaue as recorded at startup by #: Dict containing init_child() return vaue as recorded at startup by
#: ContextService. Contains: #: ContextService. Contains:
#: #:
#: fork_context: Context connected to the fork parent : process in the #: fork_context: Context connected to the fork parent : process in the
#: target account. #: target account.
#: home_dir: Target context's home directory. #: home_dir: Target context's home directory.
#: temp_dir: A writeable temporary directory path. #: temp_dir: A writeable temporary directory managed by the
#: target, automatically destroyed at shutdown.
init_child_result = None init_child_result = None
#: Only sudo, su, and doas are supported for now. #: After :meth:`get_temp_dir` is called, a private temporary directory,
become_methods = ['sudo', 'su', 'doas'] #: destroyed during :meth:`close`, or automatically during shutdown if
#: :meth:`close` failed or was never called.
_temp_dir = None
#: A :class:`mitogen.parent.CallChain` to use for calls made to the target
#: account, to ensure subsequent calls fail if pipelined directory creation
#: or file transfer fails. This eliminates roundtrips when a call is likely
#: to succeed, and ensures subsequent actions will fail with the original
#: exception if the pipelined call failed.
chain = None
# #
# Note: any of the attributes below may be :data:`None` if the connection # Note: any of the attributes below may be :data:`None` if the connection
@ -625,6 +666,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
raise ansible.errors.AnsibleConnectionFailure(dct['msg']) raise ansible.errors.AnsibleConnectionFailure(dct['msg'])
self.context = dct['context'] self.context = dct['context']
self.chain = CallChain(self.context, pipelined=True)
if self._play_context.become: if self._play_context.become:
self.login_context = dct['via'] self.login_context = dct['via']
else: else:
@ -633,8 +675,16 @@ class Connection(ansible.plugins.connection.ConnectionBase):
self.init_child_result = dct['init_child_result'] self.init_child_result = dct['init_child_result']
def get_temp_dir(self): def get_temp_dir(self):
self._connect() """
return self.init_child_result['temp_dir'] """
if self._temp_dir is None:
self._temp_dir = os.path.join(
self.init_child_result['temp_dir'],
'worker-%d-%x' % (os.getpid(), id(self))
)
self.get_chain().call_no_reply(os.mkdir, self._temp_dir)
return self._temp_dir
def _connect(self): def _connect(self):
""" """
@ -661,6 +711,13 @@ class Connection(ansible.plugins.connection.ConnectionBase):
multiple times. multiple times.
""" """
if self.context: if self.context:
self.chain.reset()
if self._temp_dir:
# Don't pipeline here to ensure exception is dumped into
# logging framework on failure.
self.context.call_no_reply(ansible_mitogen.target.prune_tree,
self._temp_dir)
self._temp_dir = None
self.parent.call_service( self.parent.call_service(
service_name='ansible_mitogen.services.ContextService', service_name='ansible_mitogen.services.ContextService',
method_name='put', method_name='put',
@ -668,78 +725,33 @@ class Connection(ansible.plugins.connection.ConnectionBase):
) )
self.context = None self.context = None
self.login_context = None
self.init_child_result = None self.init_child_result = None
self.chain = None
if self.broker and not new_task: if self.broker and not new_task:
self.broker.shutdown() self.broker.shutdown()
self.broker.join() self.broker.join()
self.broker = None self.broker = None
self.router = None self.router = None
def call_async(self, func, *args, **kwargs): def get_chain(self, use_login=False, use_fork=False):
""" """
Start a function call to the target. Return the :class:`mitogen.parent.CallChain` to use for executing
function calls.
:param bool use_login_context:
If present and :data:`True`, send the call to the login account
context rather than the optional become user context.
:param bool no_reply: :param bool use_login:
If present and :data:`True`, send the call with no ``reply_to`` If :data:`True`, always return the chain for the login account
header, causing the context to execute it entirely asynchronously, rather than any active become user.
and to log any exception thrown. This allows avoiding a roundtrip :param bool use_fork:
in places where the outcome of a call is highly likely to succeed, If :data:`True`, return the chain for the fork parent.
and subsequent actions will fail regardless with a meaningful :returns mitogen.parent.CallChain:
exception if the no_reply call failed.
:returns:
:class:`mitogen.core.Receiver` that receives the function call result.
""" """
self._connect() self._connect()
if use_login:
if kwargs.pop('use_login_context', None): return self.login_context.default_call_chain
call_context = self.login_context if use_fork:
elif kwargs.pop('use_fork_context', None): return self.init_child_result['fork_context'].default_call_chain
call_context = self.init_child_result['fork_context'] return self.chain
else:
call_context = self.context
if kwargs.pop('no_reply', None):
return call_context.call_no_reply(func, *args, **kwargs)
else:
return call_context.call_async(func, *args, **kwargs)
call_aborted_msg = (
'Mitogen was disconnected from the remote environment while a call '
'was in-progress. If you feel this is in error, please file a bug. '
'Original error was: %s'
)
def _call_rethrow(self, recv):
try:
return recv.get().unpickle()
except mitogen.core.ChannelError as e:
raise ansible.errors.AnsibleConnectionFailure(
self.call_aborted_msg % (e,)
)
def call(self, func, *args, **kwargs):
"""
Start and wait for completion of a function call in the target.
:raises mitogen.core.CallError:
The function call failed.
:returns:
Function return value.
"""
t0 = time.time()
try:
recv = self.call_async(func, *args, **kwargs)
if recv is None: # no_reply=True
return None
return self._call_rethrow(recv)
finally:
LOG.debug('Call took %d ms: %r', 1000 * (time.time() - t0),
mitogen.parent.CallSpec(func, args, kwargs))
def create_fork_child(self): def create_fork_child(self):
""" """
@ -750,8 +762,9 @@ class Connection(ansible.plugins.connection.ConnectionBase):
:returns: :returns:
mitogen.core.Context of the new child. mitogen.core.Context of the new child.
""" """
return self.call(ansible_mitogen.target.create_fork_child, return self.get_chain(use_fork=True).call(
use_fork_context=True) ansible_mitogen.target.create_fork_child
)
def get_default_cwd(self): def get_default_cwd(self):
""" """
@ -804,35 +817,33 @@ class Connection(ansible.plugins.connection.ConnectionBase):
:param str out_path: :param str out_path:
Local filesystem path to write. Local filesystem path to write.
""" """
output = self.call(ansible_mitogen.target.read_path, output = self.get_chain().call(
mitogen.utils.cast(in_path)) ansible_mitogen.target.read_path,
mitogen.utils.cast(in_path),
)
ansible_mitogen.target.write_path(out_path, output) ansible_mitogen.target.write_path(out_path, output)
def put_data(self, out_path, data, mode=None, utimes=None): def put_data(self, out_path, data, mode=None, utimes=None):
""" """
Implement put_file() by caling the corresponding ansible_mitogen.target Implement put_file() by caling the corresponding ansible_mitogen.target
function in the target, transferring small files inline. function in the target, transferring small files inline. This is
pipelined and will return immediately; failed transfers are reported as
exceptions in subsequent functon calls.
:param str out_path: :param str out_path:
Remote filesystem path to write. Remote filesystem path to write.
:param byte data: :param byte data:
File contents to put. File contents to put.
""" """
# no_reply=True here avoids a roundrip that 99% of the time will report self.get_chain().call_no_reply(
# a successful response. If the file transfer fails, the target context ansible_mitogen.target.write_path,
# will dump an exception into the logging framework, which will appear
# on console, and the missing file will cause the subsequent task step
# to fail regardless. This is safe since CALL_FUNCTION is presently
# single-threaded for each target, so subsequent steps cannot execute
# until the transfer RPC has completed.
self.call(ansible_mitogen.target.write_path,
mitogen.utils.cast(out_path), mitogen.utils.cast(out_path),
mitogen.core.Blob(data), mitogen.core.Blob(data),
mode=mode, mode=mode,
utimes=utimes, utimes=utimes,
no_reply=True) )
#: Maximum size of a small file before switching to streaming file #: Maximum size of a small file before switching to streaming
#: transfer. This should really be the same as #: transfer. This should really be the same as
#: mitogen.services.FileService.IO_SIZE, however the message format has #: mitogen.services.FileService.IO_SIZE, however the message format has
#: slightly more overhead, so just randomly subtract 4KiB. #: slightly more overhead, so just randomly subtract 4KiB.

@ -115,15 +115,6 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
) )
return super(ActionModuleMixin, self).run(tmp, task_vars) return super(ActionModuleMixin, self).run(tmp, task_vars)
def call(self, func, *args, **kwargs):
"""
Arrange for a Python function to be called in the target context, which
should be some function from the standard library or
ansible_mitogen.target module. This junction point exists mainly as a
nice place to insert print statements during debugging.
"""
return self._connection.call(func, *args, **kwargs)
COMMAND_RESULT = { COMMAND_RESULT = {
'rc': 0, 'rc': 0,
'stdout': '', 'stdout': '',
@ -164,7 +155,10 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
target user account. target user account.
""" """
LOG.debug('_remote_file_exists(%r)', path) LOG.debug('_remote_file_exists(%r)', path)
return self.call(os.path.exists, mitogen.utils.cast(path)) return self._connection.get_chain().call(
os.path.exists,
mitogen.utils.cast(path)
)
def _configure_module(self, module_name, module_args, task_vars=None): def _configure_module(self, module_name, module_args, task_vars=None):
""" """
@ -241,7 +235,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
LOG.debug('_remote_chmod(%r, mode=%r, sudoable=%r)', LOG.debug('_remote_chmod(%r, mode=%r, sudoable=%r)',
paths, mode, sudoable) paths, mode, sudoable)
return self.fake_shell(lambda: mitogen.select.Select.all( return self.fake_shell(lambda: mitogen.select.Select.all(
self._connection.call_async( self._connection.get_chain().call_async(
ansible_mitogen.target.set_file_mode, path, mode ansible_mitogen.target.set_file_mode, path, mode
) )
for path in paths for path in paths
@ -254,9 +248,9 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
""" """
LOG.debug('_remote_chown(%r, user=%r, sudoable=%r)', LOG.debug('_remote_chown(%r, user=%r, sudoable=%r)',
paths, user, sudoable) paths, user, sudoable)
ent = self.call(pwd.getpwnam, user) ent = self._connection.get_chain().call(pwd.getpwnam, user)
return self.fake_shell(lambda: mitogen.select.Select.all( return self.fake_shell(lambda: mitogen.select.Select.all(
self._connection.call_async( self._connection.get_chain().call_async(
os.chown, path, ent.pw_uid, ent.pw_gid os.chown, path, ent.pw_uid, ent.pw_gid
) )
for path in paths for path in paths
@ -284,8 +278,10 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
# ~/.ansible -> /home/dmw/.ansible # ~/.ansible -> /home/dmw/.ansible
return os.path.join(self._connection.homedir, path[2:]) return os.path.join(self._connection.homedir, path[2:])
# ~root/.ansible -> /root/.ansible # ~root/.ansible -> /root/.ansible
return self.call(os.path.expanduser, mitogen.utils.cast(path), return self._connection.get_chain(login=(not sudoable)).call(
use_login_context=not sudoable) os.path.expanduser,
mitogen.utils.cast(path),
)
def get_task_timeout_secs(self): def get_task_timeout_secs(self):
""" """

@ -478,7 +478,7 @@ def invoke(invocation):
response = _invoke_forked_task(invocation, planner) response = _invoke_forked_task(invocation, planner)
else: else:
_propagate_deps(invocation, planner, invocation.connection.context) _propagate_deps(invocation, planner, invocation.connection.context)
response = invocation.connection.call( response = invocation.connection.get_chain().call(
ansible_mitogen.target.run_module, ansible_mitogen.target.run_module,
kwargs=planner.get_kwargs(), kwargs=planner.get_kwargs(),
) )

@ -1201,6 +1201,9 @@ class CallChain(object):
int(1e6 * time.time()), int(1e6 * time.time()),
) )
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, self.context)
def __enter__(self): def __enter__(self):
return self return self
@ -1247,8 +1250,7 @@ class CallChain(object):
:raises mitogen.core.CallError: :raises mitogen.core.CallError:
An exception was raised in the remote context during execution. An exception was raised in the remote context during execution.
""" """
LOG.debug('%r.call_no_reply(%r, *%r, **%r)', LOG.debug('%r.call_no_reply(): %r', self, CallSpec(fn, args, kwargs))
self, fn, args, kwargs)
self.context.send(self.make_msg(fn, *args, **kwargs)) self.context.send(self.make_msg(fn, *args, **kwargs))
def call_async(self, fn, *args, **kwargs): def call_async(self, fn, *args, **kwargs):

Loading…
Cancel
Save