diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index 2b4ed61b..52a61b77 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -403,6 +403,34 @@ def config_from_hostvars(transport, inventory_name, connection, }) +class CallChain(mitogen.parent.CallChain): + call_aborted_msg = ( + 'Mitogen was disconnected from the remote environment while a call ' + 'was in-progress. If you feel this is in error, please file a bug. ' + 'Original error was: %s' + ) + + def _rethrow(self, recv): + try: + return recv.get().unpickle() + except mitogen.core.ChannelError as e: + raise ansible.errors.AnsibleConnectionFailure( + self.call_aborted_msg % (e,) + ) + + def call(self, func, *args, **kwargs): + """ + Like :meth:`mitogen.parent.CallChain.call`, but log timings. + """ + t0 = time.time() + try: + recv = self.call_async(func, *args, **kwargs) + return self._rethrow(recv) + finally: + LOG.debug('Call took %d ms: %r', 1000 * (time.time() - t0), + mitogen.parent.CallSpec(func, args, kwargs)) + + class Connection(ansible.plugins.connection.ConnectionBase): #: mitogen.master.Broker for this worker. broker = None @@ -422,17 +450,30 @@ class Connection(ansible.plugins.connection.ConnectionBase): #: account, even when become=True. login_context = None + #: Only sudo, su, and doas are supported for now. + become_methods = ['sudo', 'su', 'doas'] + #: Dict containing init_child() return vaue as recorded at startup by #: ContextService. Contains: #: #: fork_context: Context connected to the fork parent : process in the #: target account. #: home_dir: Target context's home directory. - #: temp_dir: A writeable temporary directory path. + #: temp_dir: A writeable temporary directory managed by the + #: target, automatically destroyed at shutdown. init_child_result = None - #: Only sudo, su, and doas are supported for now. - become_methods = ['sudo', 'su', 'doas'] + #: After :meth:`get_temp_dir` is called, a private temporary directory, + #: destroyed during :meth:`close`, or automatically during shutdown if + #: :meth:`close` failed or was never called. + _temp_dir = None + + #: A :class:`mitogen.parent.CallChain` to use for calls made to the target + #: account, to ensure subsequent calls fail if pipelined directory creation + #: or file transfer fails. This eliminates roundtrips when a call is likely + #: to succeed, and ensures subsequent actions will fail with the original + #: exception if the pipelined call failed. + chain = None # # Note: any of the attributes below may be :data:`None` if the connection @@ -625,6 +666,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): raise ansible.errors.AnsibleConnectionFailure(dct['msg']) self.context = dct['context'] + self.chain = CallChain(self.context, pipelined=True) if self._play_context.become: self.login_context = dct['via'] else: @@ -633,8 +675,16 @@ class Connection(ansible.plugins.connection.ConnectionBase): self.init_child_result = dct['init_child_result'] def get_temp_dir(self): - self._connect() - return self.init_child_result['temp_dir'] + """ + """ + if self._temp_dir is None: + self._temp_dir = os.path.join( + self.init_child_result['temp_dir'], + 'worker-%d-%x' % (os.getpid(), id(self)) + ) + self.get_chain().call_no_reply(os.mkdir, self._temp_dir) + + return self._temp_dir def _connect(self): """ @@ -661,6 +711,13 @@ class Connection(ansible.plugins.connection.ConnectionBase): multiple times. """ if self.context: + self.chain.reset() + if self._temp_dir: + # Don't pipeline here to ensure exception is dumped into + # logging framework on failure. + self.context.call_no_reply(ansible_mitogen.target.prune_tree, + self._temp_dir) + self._temp_dir = None self.parent.call_service( service_name='ansible_mitogen.services.ContextService', method_name='put', @@ -668,78 +725,33 @@ class Connection(ansible.plugins.connection.ConnectionBase): ) self.context = None + self.login_context = None self.init_child_result = None + self.chain = None if self.broker and not new_task: self.broker.shutdown() self.broker.join() self.broker = None self.router = None - def call_async(self, func, *args, **kwargs): + def get_chain(self, use_login=False, use_fork=False): """ - Start a function call to the target. - - :param bool use_login_context: - If present and :data:`True`, send the call to the login account - context rather than the optional become user context. + Return the :class:`mitogen.parent.CallChain` to use for executing + function calls. - :param bool no_reply: - If present and :data:`True`, send the call with no ``reply_to`` - header, causing the context to execute it entirely asynchronously, - and to log any exception thrown. This allows avoiding a roundtrip - in places where the outcome of a call is highly likely to succeed, - and subsequent actions will fail regardless with a meaningful - exception if the no_reply call failed. - - :returns: - :class:`mitogen.core.Receiver` that receives the function call result. + :param bool use_login: + If :data:`True`, always return the chain for the login account + rather than any active become user. + :param bool use_fork: + If :data:`True`, return the chain for the fork parent. + :returns mitogen.parent.CallChain: """ self._connect() - - if kwargs.pop('use_login_context', None): - call_context = self.login_context - elif kwargs.pop('use_fork_context', None): - call_context = self.init_child_result['fork_context'] - else: - call_context = self.context - - if kwargs.pop('no_reply', None): - return call_context.call_no_reply(func, *args, **kwargs) - else: - return call_context.call_async(func, *args, **kwargs) - - call_aborted_msg = ( - 'Mitogen was disconnected from the remote environment while a call ' - 'was in-progress. If you feel this is in error, please file a bug. ' - 'Original error was: %s' - ) - - def _call_rethrow(self, recv): - try: - return recv.get().unpickle() - except mitogen.core.ChannelError as e: - raise ansible.errors.AnsibleConnectionFailure( - self.call_aborted_msg % (e,) - ) - - def call(self, func, *args, **kwargs): - """ - Start and wait for completion of a function call in the target. - - :raises mitogen.core.CallError: - The function call failed. - :returns: - Function return value. - """ - t0 = time.time() - try: - recv = self.call_async(func, *args, **kwargs) - if recv is None: # no_reply=True - return None - return self._call_rethrow(recv) - finally: - LOG.debug('Call took %d ms: %r', 1000 * (time.time() - t0), - mitogen.parent.CallSpec(func, args, kwargs)) + if use_login: + return self.login_context.default_call_chain + if use_fork: + return self.init_child_result['fork_context'].default_call_chain + return self.chain def create_fork_child(self): """ @@ -750,8 +762,9 @@ class Connection(ansible.plugins.connection.ConnectionBase): :returns: mitogen.core.Context of the new child. """ - return self.call(ansible_mitogen.target.create_fork_child, - use_fork_context=True) + return self.get_chain(use_fork=True).call( + ansible_mitogen.target.create_fork_child + ) def get_default_cwd(self): """ @@ -804,35 +817,33 @@ class Connection(ansible.plugins.connection.ConnectionBase): :param str out_path: Local filesystem path to write. """ - output = self.call(ansible_mitogen.target.read_path, - mitogen.utils.cast(in_path)) + output = self.get_chain().call( + ansible_mitogen.target.read_path, + mitogen.utils.cast(in_path), + ) ansible_mitogen.target.write_path(out_path, output) def put_data(self, out_path, data, mode=None, utimes=None): """ Implement put_file() by caling the corresponding ansible_mitogen.target - function in the target, transferring small files inline. + function in the target, transferring small files inline. This is + pipelined and will return immediately; failed transfers are reported as + exceptions in subsequent functon calls. :param str out_path: Remote filesystem path to write. :param byte data: File contents to put. """ - # no_reply=True here avoids a roundrip that 99% of the time will report - # a successful response. If the file transfer fails, the target context - # will dump an exception into the logging framework, which will appear - # on console, and the missing file will cause the subsequent task step - # to fail regardless. This is safe since CALL_FUNCTION is presently - # single-threaded for each target, so subsequent steps cannot execute - # until the transfer RPC has completed. - self.call(ansible_mitogen.target.write_path, - mitogen.utils.cast(out_path), - mitogen.core.Blob(data), - mode=mode, - utimes=utimes, - no_reply=True) - - #: Maximum size of a small file before switching to streaming file + self.get_chain().call_no_reply( + ansible_mitogen.target.write_path, + mitogen.utils.cast(out_path), + mitogen.core.Blob(data), + mode=mode, + utimes=utimes, + ) + + #: Maximum size of a small file before switching to streaming #: transfer. This should really be the same as #: mitogen.services.FileService.IO_SIZE, however the message format has #: slightly more overhead, so just randomly subtract 4KiB. diff --git a/ansible_mitogen/mixins.py b/ansible_mitogen/mixins.py index 7be5444f..6ab8fec7 100644 --- a/ansible_mitogen/mixins.py +++ b/ansible_mitogen/mixins.py @@ -115,15 +115,6 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase): ) return super(ActionModuleMixin, self).run(tmp, task_vars) - def call(self, func, *args, **kwargs): - """ - Arrange for a Python function to be called in the target context, which - should be some function from the standard library or - ansible_mitogen.target module. This junction point exists mainly as a - nice place to insert print statements during debugging. - """ - return self._connection.call(func, *args, **kwargs) - COMMAND_RESULT = { 'rc': 0, 'stdout': '', @@ -164,7 +155,10 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase): target user account. """ LOG.debug('_remote_file_exists(%r)', path) - return self.call(os.path.exists, mitogen.utils.cast(path)) + return self._connection.get_chain().call( + os.path.exists, + mitogen.utils.cast(path) + ) def _configure_module(self, module_name, module_args, task_vars=None): """ @@ -241,7 +235,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase): LOG.debug('_remote_chmod(%r, mode=%r, sudoable=%r)', paths, mode, sudoable) return self.fake_shell(lambda: mitogen.select.Select.all( - self._connection.call_async( + self._connection.get_chain().call_async( ansible_mitogen.target.set_file_mode, path, mode ) for path in paths @@ -254,9 +248,9 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase): """ LOG.debug('_remote_chown(%r, user=%r, sudoable=%r)', paths, user, sudoable) - ent = self.call(pwd.getpwnam, user) + ent = self._connection.get_chain().call(pwd.getpwnam, user) return self.fake_shell(lambda: mitogen.select.Select.all( - self._connection.call_async( + self._connection.get_chain().call_async( os.chown, path, ent.pw_uid, ent.pw_gid ) for path in paths @@ -284,8 +278,10 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase): # ~/.ansible -> /home/dmw/.ansible return os.path.join(self._connection.homedir, path[2:]) # ~root/.ansible -> /root/.ansible - return self.call(os.path.expanduser, mitogen.utils.cast(path), - use_login_context=not sudoable) + return self._connection.get_chain(login=(not sudoable)).call( + os.path.expanduser, + mitogen.utils.cast(path), + ) def get_task_timeout_secs(self): """ diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index 8ebf4f67..c709bf7d 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -478,7 +478,7 @@ def invoke(invocation): response = _invoke_forked_task(invocation, planner) else: _propagate_deps(invocation, planner, invocation.connection.context) - response = invocation.connection.call( + response = invocation.connection.get_chain().call( ansible_mitogen.target.run_module, kwargs=planner.get_kwargs(), ) diff --git a/mitogen/parent.py b/mitogen/parent.py index 556afb22..bb0eb0fb 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1201,6 +1201,9 @@ class CallChain(object): int(1e6 * time.time()), ) + def __repr__(self): + return '%s(%s)' % (self.__class__.__name__, self.context) + def __enter__(self): return self @@ -1247,8 +1250,7 @@ class CallChain(object): :raises mitogen.core.CallError: An exception was raised in the remote context during execution. """ - LOG.debug('%r.call_no_reply(%r, *%r, **%r)', - self, fn, args, kwargs) + LOG.debug('%r.call_no_reply(): %r', self, CallSpec(fn, args, kwargs)) self.context.send(self.make_msg(fn, *args, **kwargs)) def call_async(self, fn, *args, **kwargs):