From 711aed7a4c3e49921d5f83f33b9f74f5f4237f46 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 2 Nov 2018 11:02:02 +0000 Subject: [PATCH 01/35] core: split _broker_shutdown() out into its own function. Makes _broker_main() logic much clearer. --- mitogen/core.py | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index 83880621..6cb311d3 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -2330,28 +2330,31 @@ class Broker(object): for (side, func) in self.poller.poll(timeout): self._call(side.stream, func) + def _broker_shutdown(self): + for _, (side, _) in self.poller.readers + self.poller.writers: + self._call(side.stream, side.stream.on_shutdown) + + deadline = time.time() + self.shutdown_timeout + while self.keep_alive() and time.time() < deadline: + self._loop_once(max(0, deadline - time.time())) + + if self.keep_alive(): + LOG.error('%r: some streams did not close gracefully. ' + 'The most likely cause for this is one or ' + 'more child processes still connected to ' + 'our stdout/stderr pipes.', self) + + for _, (side, _) in self.poller.readers + self.poller.writers: + LOG.error('_broker_main() force disconnecting %r', side) + side.stream.on_disconnect(self) + def _broker_main(self): try: while self._alive: self._loop_once() fire(self, 'shutdown') - for _, (side, _) in self.poller.readers + self.poller.writers: - self._call(side.stream, side.stream.on_shutdown) - - deadline = time.time() + self.shutdown_timeout - while self.keep_alive() and time.time() < deadline: - self._loop_once(max(0, deadline - time.time())) - - if self.keep_alive(): - LOG.error('%r: some streams did not close gracefully. ' - 'The most likely cause for this is one or ' - 'more child processes still connected to ' - 'our stdout/stderr pipes.', self) - - for _, (side, _) in self.poller.readers + self.poller.writers: - LOG.error('_broker_main() force disconnecting %r', side) - side.stream.on_disconnect(self) + self._broker_shutdown() except Exception: LOG.exception('_broker_main() crashed') From 804bacdadb14d60cf26808a64d0c6a70230ca643 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 2 Nov 2018 12:56:13 +0000 Subject: [PATCH 02/35] docs: move most remaining docstrings back into *.py; closes #388 The remaining ones are decorators which don't seem to have an autodoc equivlent. --- docs/api.rst | 996 +++++++++++++++++++-------------------------- docs/changelog.rst | 2 +- docs/internals.rst | 196 +-------- mitogen/core.py | 254 ++++++++++-- mitogen/master.py | 55 +++ mitogen/parent.py | 14 + 6 files changed, 725 insertions(+), 792 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 844bb900..57b9a655 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -98,479 +98,441 @@ Router Class .. currentmodule:: mitogen.master -.. class:: Router (broker=None) +.. autoclass:: Router (broker=None) + :members: - Extend :class:`mitogen.core.Router` with functionality useful to - masters, and child contexts who later become masters. Currently when this - class is required, the target context's router is upgraded at runtime. - .. note:: +.. _context-factories: + +Connection Methods +================== - You may construct as many routers as desired, and use the same broker - for multiple routers, however usually only one broker and router need - exist. Multiple routers may be useful when dealing with separate trust - domains, for example, manipulating infrastructure belonging to separate - customers or projects. +.. method:: mitogen.parent.Router.fork (on_fork=None, on_start=None, debug=False, profiling=False, via=None) - :param mitogen.master.Broker broker: - :class:`Broker` instance to use. If not specified, a private - :class:`Broker` is created. + Construct a context on the local machine by forking the current + process. The forked child receives a new identity, sets up a new broker + and router, and responds to function calls identically to children + created using other methods. - .. attribute:: profiling + For long-lived processes, :meth:`local` is always better as it + guarantees a pristine interpreter state that inherited little from the + parent. Forking should only be used in performance-sensitive scenarios + where short-lived children must be spawned to isolate potentially buggy + code, and only after accounting for all the bad things possible as a + result of, at a minimum: - When :data:`True`, cause the broker thread and any subsequent broker - and main threads existing in any child to write - ``/tmp/mitogen.stats...log`` containing a - :mod:`cProfile` dump on graceful exit. Must be set prior to - construction of any :class:`Broker`, e.g. via: + * Files open in the parent remaining open in the child, + causing the lifetime of the underlying object to be extended + indefinitely. - .. code:: + * From the perspective of external components, this is observable + in the form of pipes and sockets that are never closed, which may + break anything relying on closure to signal protocol termination. - mitogen.master.Router.profiling = True + * Descriptors that reference temporary files will not have their disk + space reclaimed until the child exits. - .. method:: enable_debug + * Third party package state, such as urllib3's HTTP connection pool, + attempting to write to file descriptors shared with the parent, + causing random failures in both parent and child. - Cause this context and any descendant child contexts to write debug - logs to /tmp/mitogen..log. + * UNIX signal handlers installed in the parent process remaining active + in the child, despite associated resources, such as service threads, + child processes, resource usage counters or process timers becoming + absent or reset in the child. - .. method:: allocate_id + * Library code that makes assumptions about the process ID remaining + unchanged, for example to implement inter-process locking, or to + generate file names. - Arrange for a unique context ID to be allocated and associated with a - route leading to the active context. In masters, the ID is generated - directly, in children it is forwarded to the master via an - ``ALLOCATE_ID`` message that causes the master to emit matching - ``ADD_ROUTE`` messages prior to replying. + * Anonymous ``MAP_PRIVATE`` memory mappings whose storage requirement + doubles as either parent or child dirties their pages. + + * File-backed memory mappings that cannot have their space freed on + disk due to the mapping living on in the child. + + * Difficult to diagnose memory usage and latency spikes due to object + graphs becoming unreferenced in either parent or child, causing + immediate copy-on-write to large portions of the process heap. + + * Locks held in the parent causing random deadlocks in the child, such + as when another thread emits a log entry via the :mod:`logging` + package concurrent to another thread calling :meth:`fork`. + + * Objects existing in Thread-Local Storage of every non-:meth:`fork` + thread becoming permanently inaccessible, and never having their + object destructors called, including TLS usage by native extension + code, triggering many new variants of all the issues above. - .. _context-factories: + * Pseudo-Random Number Generator state that is easily observable by + network peers to be duplicate, violating requirements of + cryptographic protocols through one-time state reuse. In the worst + case, children continually reuse the same state due to repeatedly + forking from a static parent. - **Context Factories** - - .. method:: fork (on_fork=None, on_start=None, debug=False, profiling=False, via=None) - - Construct a context on the local machine by forking the current - process. The forked child receives a new identity, sets up a new broker - and router, and responds to function calls identically to children - created using other methods. - - For long-lived processes, :meth:`local` is always better as it - guarantees a pristine interpreter state that inherited little from the - parent. Forking should only be used in performance-sensitive scenarios - where short-lived children must be spawned to isolate potentially buggy - code, and only after accounting for all the bad things possible as a - result of, at a minimum: - - * Files open in the parent remaining open in the child, - causing the lifetime of the underlying object to be extended - indefinitely. - - * From the perspective of external components, this is observable - in the form of pipes and sockets that are never closed, which may - break anything relying on closure to signal protocol termination. - - * Descriptors that reference temporary files will not have their disk - space reclaimed until the child exits. - - * Third party package state, such as urllib3's HTTP connection pool, - attempting to write to file descriptors shared with the parent, - causing random failures in both parent and child. - - * UNIX signal handlers installed in the parent process remaining active - in the child, despite associated resources, such as service threads, - child processes, resource usage counters or process timers becoming - absent or reset in the child. - - * Library code that makes assumptions about the process ID remaining - unchanged, for example to implement inter-process locking, or to - generate file names. - - * Anonymous ``MAP_PRIVATE`` memory mappings whose storage requirement - doubles as either parent or child dirties their pages. - - * File-backed memory mappings that cannot have their space freed on - disk due to the mapping living on in the child. - - * Difficult to diagnose memory usage and latency spikes due to object - graphs becoming unreferenced in either parent or child, causing - immediate copy-on-write to large portions of the process heap. - - * Locks held in the parent causing random deadlocks in the child, such - as when another thread emits a log entry via the :mod:`logging` - package concurrent to another thread calling :meth:`fork`. - - * Objects existing in Thread-Local Storage of every non-:meth:`fork` - thread becoming permanently inaccessible, and never having their - object destructors called, including TLS usage by native extension - code, triggering many new variants of all the issues above. - - * Pseudo-Random Number Generator state that is easily observable by - network peers to be duplicate, violating requirements of - cryptographic protocols through one-time state reuse. In the worst - case, children continually reuse the same state due to repeatedly - forking from a static parent. - - :meth:`fork` cleans up Mitogen-internal objects, in addition to - locks held by the :mod:`logging` package, reseeds - :func:`random.random`, and the OpenSSL PRNG via - :func:`ssl.RAND_add`, but only if the :mod:`ssl` module is - already loaded. You must arrange for your program's state, including - any third party packages in use, to be cleaned up by specifying an - `on_fork` function. - - The associated stream implementation is - :class:`mitogen.fork.Stream`. - - :param function on_fork: - Function invoked as `on_fork()` from within the child process. This - permits supplying a program-specific cleanup function to break - locks and close file descriptors belonging to the parent from - within the child. - - :param function on_start: - Invoked as `on_start(econtext)` from within the child process after - it has been set up, but before the function dispatch loop starts. - This permits supplying a custom child main function that inherits - rich data structures that cannot normally be passed via a - serialization. - - :param mitogen.core.Context via: - Same as the `via` parameter for :meth:`local`. - - :param bool debug: - Same as the `debug` parameter for :meth:`local`. - - :param bool profiling: - Same as the `profiling` parameter for :meth:`local`. - - .. method:: local (remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, via=None) - - Construct a context on the local machine as a subprocess of the current - process. The associated stream implementation is - :class:`mitogen.master.Stream`. - - :param str remote_name: - The ``argv[0]`` suffix for the new process. If `remote_name` is - ``test``, the new process ``argv[0]`` will be ``mitogen:test``. - - If unspecified, defaults to ``@:``. - - This variable cannot contain slash characters, as the resulting - ``argv[0]`` must be presented in such a way as to allow Python to - determine its installation prefix. This is required to support - virtualenv. - - :param str|list python_path: - String or list path to the Python interpreter to use for bootstrap. - Defaults to :data:`sys.executable` for local connections, and - ``python`` for remote connections. - - It is possible to pass a list to invoke Python wrapped using - another tool, such as ``["/usr/bin/env", "python"]``. - - :param bool debug: - If :data:`True`, arrange for debug logging (:meth:`enable_debug`) to - be enabled in the new context. Automatically :data:`True` when - :meth:`enable_debug` has been called, but may be used - selectively otherwise. - - :param bool unidirectional: - If :data:`True`, arrange for the child's router to be constructed - with :attr:`unidirectional routing - ` enabled. Automatically - :data:`True` when it was enabled for this router, but may still be - explicitly set to :data:`False`. - - :param float connect_timeout: - Fractional seconds to wait for the subprocess to indicate it is - healthy. Defaults to 30 seconds. - - :param bool profiling: - If :data:`True`, arrange for profiling (:data:`profiling`) to be - enabled in the new context. Automatically :data:`True` when - :data:`profiling` is :data:`True`, but may be used selectively - otherwise. - - :param mitogen.core.Context via: - If not :data:`None`, arrange for construction to occur via RPCs - made to the context `via`, and for :data:`ADD_ROUTE - ` messages to be generated as appropriate. - - .. code-block:: python - - # SSH to the remote machine. - remote_machine = router.ssh(hostname='mybox.com') - - # Use the SSH connection to create a sudo connection. - remote_root = router.sudo(username='root', via=remote_machine) - - .. method:: doas (username=None, password=None, doas_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs) - - Construct a context on the local machine over a ``doas`` invocation. - The ``doas`` process is started in a newly allocated pseudo-terminal, - and supports typing interactive passwords. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str username: - Username to use, defaults to ``root``. - :param str password: - The account password to use if requested. - :param str doas_path: - Filename or complete path to the ``doas`` binary. ``PATH`` will be - searched if given as a filename. Defaults to ``doas``. - :param bytes password_prompt: - A string that indicates ``doas`` is requesting a password. Defaults - to ``Password:``. - :param list incorrect_prompts: - List of bytestrings indicating the password is incorrect. Defaults - to `(b"doas: authentication failed")`. - :raises mitogen.doas.PasswordError: - A password was requested but none was provided, the supplied - password was incorrect, or the target account did not exist. - - .. method:: docker (container=None, image=None, docker_path=None, \**kwargs) - - Construct a context on the local machine within an existing or - temporary new Docker container using the ``docker`` program. One of - `container` or `image` must be specified. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str container: - Existing container to connect to. Defaults to :data:`None`. - :param str username: - Username within the container to :func:`setuid` to. Defaults to - :data:`None`, which Docker interprets as ``root``. - :param str image: - Image tag to use to construct a temporary container. Defaults to - :data:`None`. - :param str docker_path: - Filename or complete path to the Docker binary. ``PATH`` will be - searched if given as a filename. Defaults to ``docker``. - - .. method:: jail (container, jexec_path=None, \**kwargs) - - Construct a context on the local machine within a FreeBSD jail using - the ``jexec`` program. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str container: - Existing container to connect to. Defaults to :data:`None`. - :param str username: - Username within the container to :func:`setuid` to. Defaults to - :data:`None`, which ``jexec`` interprets as ``root``. - :param str jexec_path: - Filename or complete path to the ``jexec`` binary. ``PATH`` will be - searched if given as a filename. Defaults to ``/usr/sbin/jexec``. - - .. method:: kubectl (pod, kubectl_path=None, kubectl_args=None, \**kwargs) - - Construct a context in a container via the Kubernetes ``kubectl`` - program. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str pod: - Kubernetes pod to connect to. - :param str kubectl_path: - Filename or complete path to the ``kubectl`` binary. ``PATH`` will - be searched if given as a filename. Defaults to ``kubectl``. - :param list kubectl_args: - Additional arguments to pass to the ``kubectl`` command. - - .. method:: lxc (container, lxc_attach_path=None, \**kwargs) - - Construct a context on the local machine within an LXC classic - container using the ``lxc-attach`` program. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str container: - Existing container to connect to. Defaults to :data:`None`. - :param str lxc_attach_path: - Filename or complete path to the ``lxc-attach`` binary. ``PATH`` - will be searched if given as a filename. Defaults to - ``lxc-attach``. - - .. method:: lxc (container, lxc_attach_path=None, \**kwargs) - - Construct a context on the local machine within a LXD container using - the ``lxc`` program. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str container: - Existing container to connect to. Defaults to :data:`None`. - :param str lxc_path: - Filename or complete path to the ``lxc`` binary. ``PATH`` will be - searched if given as a filename. Defaults to ``lxc``. - - .. method:: setns (container, kind, username=None, docker_path=None, lxc_info_path=None, machinectl_path=None, \**kwargs) - - Construct a context in the style of :meth:`local`, but change the - active Linux process namespaces via calls to `setns(1)` before - executing Python. - - The namespaces to use, and the active root file system are taken from - the root PID of a running Docker, LXC, LXD, or systemd-nspawn - container. - - A program is required only to find the root PID, after which management - of the child Python interpreter is handled directly. - - :param str container: - Container to connect to. - :param str kind: - One of ``docker``, ``lxc``, ``lxd`` or ``machinectl``. - :param str username: - Username within the container to :func:`setuid` to. Defaults to - ``root``. - :param str docker_path: - Filename or complete path to the Docker binary. ``PATH`` will be - searched if given as a filename. Defaults to ``docker``. - :param str lxc_path: - Filename or complete path to the LXD ``lxc`` binary. ``PATH`` will - be searched if given as a filename. Defaults to ``lxc``. - :param str lxc_info_path: - Filename or complete path to the LXC ``lxc-info`` binary. ``PATH`` - will be searched if given as a filename. Defaults to ``lxc-info``. - :param str machinectl_path: - Filename or complete path to the ``machinectl`` binary. ``PATH`` - will be searched if given as a filename. Defaults to - ``machinectl``. - - .. method:: su (username=None, password=None, su_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs) - - Construct a context on the local machine over a ``su`` invocation. The - ``su`` process is started in a newly allocated pseudo-terminal, and - supports typing interactive passwords. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str username: - Username to pass to ``su``, defaults to ``root``. - :param str password: - The account password to use if requested. - :param str su_path: - Filename or complete path to the ``su`` binary. ``PATH`` will be - searched if given as a filename. Defaults to ``su``. - :param bytes password_prompt: - The string that indicates ``su`` is requesting a password. Defaults - to ``Password:``. - :param str incorrect_prompts: - Strings that signal the password is incorrect. Defaults to `("su: - sorry", "su: authentication failure")`. - - :raises mitogen.su.PasswordError: - A password was requested but none was provided, the supplied - password was incorrect, or (on BSD) the target account did not - exist. - - .. method:: sudo (username=None, sudo_path=None, password=None, \**kwargs) - - Construct a context on the local machine over a ``sudo`` invocation. - The ``sudo`` process is started in a newly allocated pseudo-terminal, - and supports typing interactive passwords. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str username: - Username to pass to sudo as the ``-u`` parameter, defaults to - ``root``. - :param str sudo_path: - Filename or complete path to the sudo binary. ``PATH`` will be - searched if given as a filename. Defaults to ``sudo``. - :param str password: - The password to use if/when sudo requests it. Depending on the sudo - configuration, this is either the current account password or the - target account password. :class:`mitogen.sudo.PasswordError` - will be raised if sudo requests a password but none is provided. - :param bool set_home: - If :data:`True`, request ``sudo`` set the ``HOME`` environment - variable to match the target UNIX account. - :param bool preserve_env: - If :data:`True`, request ``sudo`` to preserve the environment of - the parent process. - :param str selinux_type: - If not :data:`None`, the SELinux security context to use. - :param str selinux_role: - If not :data:`None`, the SELinux role to use. - :param list sudo_args: - Arguments in the style of :data:`sys.argv` that would normally - be passed to ``sudo``. The arguments are parsed in-process to set - equivalent parameters. Re-parsing ensures unsupported options cause - :class:`mitogen.core.StreamError` to be raised, and that - attributes of the stream match the actual behaviour of ``sudo``. - - .. method:: ssh (hostname, username=None, ssh_path=None, ssh_args=None, port=None, check_host_keys='enforce', password=None, identity_file=None, identities_only=True, compression=True, \**kwargs) - - Construct a remote context over an OpenSSH ``ssh`` invocation. - - The ``ssh`` process is started in a newly allocated pseudo-terminal to - support typing interactive passwords and responding to prompts, if a - password is specified, or `check_host_keys=accept`. In other scenarios, - ``BatchMode`` is enabled and no PTY is allocated. For many-target - configurations, both options should be avoided as most systems have a - conservative limit on the number of pseudo-terminals that may exist. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str username: - The SSH username; default is unspecified, which causes SSH to pick - the username to use. - :param str ssh_path: - Absolute or relative path to ``ssh``. Defaults to ``ssh``. - :param list ssh_args: - Additional arguments to pass to the SSH command. - :param int port: - Port number to connect to; default is unspecified, which causes SSH - to pick the port number. - :param str check_host_keys: - Specifies the SSH host key checking mode. Defaults to ``enforce``. - - * ``ignore``: no host key checking is performed. Connections never - fail due to an unknown or changed host key. - * ``accept``: known hosts keys are checked to ensure they match, - new host keys are automatically accepted and verified in future - connections. - * ``enforce``: known host keys are checked to ensure they match, - unknown hosts cause a connection failure. - :param str password: - Password to type if/when ``ssh`` requests it. If not specified and - a password is requested, :class:`mitogen.ssh.PasswordError` is - raised. - :param str identity_file: - Path to an SSH private key file to use for authentication. Default - is unspecified, which causes SSH to pick the identity file. - - When this option is specified, only `identity_file` will be used by - the SSH client to perform authenticaion; agent authentication is - automatically disabled, as is reading the default private key from - ``~/.ssh/id_rsa``, or ``~/.ssh/id_dsa``. - :param bool identities_only: - If :data:`True` and a password or explicit identity file is - specified, instruct the SSH client to disable any authentication - identities inherited from the surrounding environment, such as - those loaded in any running ``ssh-agent``, or default key files - present in ``~/.ssh``. This ensures authentication attempts only - occur using the supplied password or SSH key. - :param bool compression: - If :data:`True`, enable ``ssh`` compression support. Compression - has a minimal effect on the size of modules transmitted, as they - are already compressed, however it has a large effect on every - remaining message in the otherwise uncompressed stream protocol, - such as function call arguments and return values. - :param int ssh_debug_level: - Optional integer `0..3` indicating the SSH client debug level. - :raises mitogen.ssh.PasswordError: - A password was requested but none was specified, or the specified - password was incorrect. - - :raises mitogen.ssh.HostKeyError: - When `check_host_keys` is set to either ``accept``, indicates a - previously recorded key no longer matches the remote machine. When - set to ``enforce``, as above, but additionally indicates no - previously recorded key exists for the remote machine. + :meth:`fork` cleans up Mitogen-internal objects, in addition to + locks held by the :mod:`logging` package, reseeds + :func:`random.random`, and the OpenSSL PRNG via + :func:`ssl.RAND_add`, but only if the :mod:`ssl` module is + already loaded. You must arrange for your program's state, including + any third party packages in use, to be cleaned up by specifying an + `on_fork` function. + + The associated stream implementation is + :class:`mitogen.fork.Stream`. + + :param function on_fork: + Function invoked as `on_fork()` from within the child process. This + permits supplying a program-specific cleanup function to break + locks and close file descriptors belonging to the parent from + within the child. + + :param function on_start: + Invoked as `on_start(econtext)` from within the child process after + it has been set up, but before the function dispatch loop starts. + This permits supplying a custom child main function that inherits + rich data structures that cannot normally be passed via a + serialization. + + :param mitogen.core.Context via: + Same as the `via` parameter for :meth:`local`. + + :param bool debug: + Same as the `debug` parameter for :meth:`local`. + + :param bool profiling: + Same as the `profiling` parameter for :meth:`local`. + +.. method:: mitogen.parent.Router.local (remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, via=None) + + Construct a context on the local machine as a subprocess of the current + process. The associated stream implementation is + :class:`mitogen.master.Stream`. + + :param str remote_name: + The ``argv[0]`` suffix for the new process. If `remote_name` is + ``test``, the new process ``argv[0]`` will be ``mitogen:test``. + + If unspecified, defaults to ``@:``. + + This variable cannot contain slash characters, as the resulting + ``argv[0]`` must be presented in such a way as to allow Python to + determine its installation prefix. This is required to support + virtualenv. + + :param str|list python_path: + String or list path to the Python interpreter to use for bootstrap. + Defaults to :data:`sys.executable` for local connections, and + ``python`` for remote connections. + + It is possible to pass a list to invoke Python wrapped using + another tool, such as ``["/usr/bin/env", "python"]``. + + :param bool debug: + If :data:`True`, arrange for debug logging (:meth:`enable_debug`) to + be enabled in the new context. Automatically :data:`True` when + :meth:`enable_debug` has been called, but may be used + selectively otherwise. + + :param bool unidirectional: + If :data:`True`, arrange for the child's router to be constructed + with :attr:`unidirectional routing + ` enabled. Automatically + :data:`True` when it was enabled for this router, but may still be + explicitly set to :data:`False`. + + :param float connect_timeout: + Fractional seconds to wait for the subprocess to indicate it is + healthy. Defaults to 30 seconds. + + :param bool profiling: + If :data:`True`, arrange for profiling (:data:`profiling`) to be + enabled in the new context. Automatically :data:`True` when + :data:`profiling` is :data:`True`, but may be used selectively + otherwise. + + :param mitogen.core.Context via: + If not :data:`None`, arrange for construction to occur via RPCs + made to the context `via`, and for :data:`ADD_ROUTE + ` messages to be generated as appropriate. + + .. code-block:: python + + # SSH to the remote machine. + remote_machine = router.ssh(hostname='mybox.com') + + # Use the SSH connection to create a sudo connection. + remote_root = router.sudo(username='root', via=remote_machine) + +.. method:: mitogen.parent.Router.doas (username=None, password=None, doas_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs) + + Construct a context on the local machine over a ``doas`` invocation. + The ``doas`` process is started in a newly allocated pseudo-terminal, + and supports typing interactive passwords. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str username: + Username to use, defaults to ``root``. + :param str password: + The account password to use if requested. + :param str doas_path: + Filename or complete path to the ``doas`` binary. ``PATH`` will be + searched if given as a filename. Defaults to ``doas``. + :param bytes password_prompt: + A string that indicates ``doas`` is requesting a password. Defaults + to ``Password:``. + :param list incorrect_prompts: + List of bytestrings indicating the password is incorrect. Defaults + to `(b"doas: authentication failed")`. + :raises mitogen.doas.PasswordError: + A password was requested but none was provided, the supplied + password was incorrect, or the target account did not exist. + +.. method:: mitogen.parent.Router.docker (container=None, image=None, docker_path=None, \**kwargs) + + Construct a context on the local machine within an existing or + temporary new Docker container using the ``docker`` program. One of + `container` or `image` must be specified. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str container: + Existing container to connect to. Defaults to :data:`None`. + :param str username: + Username within the container to :func:`setuid` to. Defaults to + :data:`None`, which Docker interprets as ``root``. + :param str image: + Image tag to use to construct a temporary container. Defaults to + :data:`None`. + :param str docker_path: + Filename or complete path to the Docker binary. ``PATH`` will be + searched if given as a filename. Defaults to ``docker``. + +.. method:: mitogen.parent.Router.jail (container, jexec_path=None, \**kwargs) + + Construct a context on the local machine within a FreeBSD jail using + the ``jexec`` program. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str container: + Existing container to connect to. Defaults to :data:`None`. + :param str username: + Username within the container to :func:`setuid` to. Defaults to + :data:`None`, which ``jexec`` interprets as ``root``. + :param str jexec_path: + Filename or complete path to the ``jexec`` binary. ``PATH`` will be + searched if given as a filename. Defaults to ``/usr/sbin/jexec``. + +.. method:: mitogen.parent.Router.kubectl (pod, kubectl_path=None, kubectl_args=None, \**kwargs) + + Construct a context in a container via the Kubernetes ``kubectl`` + program. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str pod: + Kubernetes pod to connect to. + :param str kubectl_path: + Filename or complete path to the ``kubectl`` binary. ``PATH`` will + be searched if given as a filename. Defaults to ``kubectl``. + :param list kubectl_args: + Additional arguments to pass to the ``kubectl`` command. + +.. method:: mitogen.parent.Router.lxc (container, lxc_attach_path=None, \**kwargs) + + Construct a context on the local machine within an LXC classic + container using the ``lxc-attach`` program. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str container: + Existing container to connect to. Defaults to :data:`None`. + :param str lxc_attach_path: + Filename or complete path to the ``lxc-attach`` binary. ``PATH`` + will be searched if given as a filename. Defaults to + ``lxc-attach``. + +.. method:: mitogen.parent.Router.lxc (container, lxc_attach_path=None, \**kwargs) + + Construct a context on the local machine within a LXD container using + the ``lxc`` program. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str container: + Existing container to connect to. Defaults to :data:`None`. + :param str lxc_path: + Filename or complete path to the ``lxc`` binary. ``PATH`` will be + searched if given as a filename. Defaults to ``lxc``. + +.. method:: mitogen.parent.Router.setns (container, kind, username=None, docker_path=None, lxc_info_path=None, machinectl_path=None, \**kwargs) + + Construct a context in the style of :meth:`local`, but change the + active Linux process namespaces via calls to `setns(1)` before + executing Python. + + The namespaces to use, and the active root file system are taken from + the root PID of a running Docker, LXC, LXD, or systemd-nspawn + container. + + A program is required only to find the root PID, after which management + of the child Python interpreter is handled directly. + + :param str container: + Container to connect to. + :param str kind: + One of ``docker``, ``lxc``, ``lxd`` or ``machinectl``. + :param str username: + Username within the container to :func:`setuid` to. Defaults to + ``root``. + :param str docker_path: + Filename or complete path to the Docker binary. ``PATH`` will be + searched if given as a filename. Defaults to ``docker``. + :param str lxc_path: + Filename or complete path to the LXD ``lxc`` binary. ``PATH`` will + be searched if given as a filename. Defaults to ``lxc``. + :param str lxc_info_path: + Filename or complete path to the LXC ``lxc-info`` binary. ``PATH`` + will be searched if given as a filename. Defaults to ``lxc-info``. + :param str machinectl_path: + Filename or complete path to the ``machinectl`` binary. ``PATH`` + will be searched if given as a filename. Defaults to + ``machinectl``. + +.. method:: mitogen.parent.Router.su (username=None, password=None, su_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs) + + Construct a context on the local machine over a ``su`` invocation. The + ``su`` process is started in a newly allocated pseudo-terminal, and + supports typing interactive passwords. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str username: + Username to pass to ``su``, defaults to ``root``. + :param str password: + The account password to use if requested. + :param str su_path: + Filename or complete path to the ``su`` binary. ``PATH`` will be + searched if given as a filename. Defaults to ``su``. + :param bytes password_prompt: + The string that indicates ``su`` is requesting a password. Defaults + to ``Password:``. + :param str incorrect_prompts: + Strings that signal the password is incorrect. Defaults to `("su: + sorry", "su: authentication failure")`. + + :raises mitogen.su.PasswordError: + A password was requested but none was provided, the supplied + password was incorrect, or (on BSD) the target account did not + exist. + +.. method:: mitogen.parent.Router.sudo (username=None, sudo_path=None, password=None, \**kwargs) + + Construct a context on the local machine over a ``sudo`` invocation. + The ``sudo`` process is started in a newly allocated pseudo-terminal, + and supports typing interactive passwords. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str username: + Username to pass to sudo as the ``-u`` parameter, defaults to + ``root``. + :param str sudo_path: + Filename or complete path to the sudo binary. ``PATH`` will be + searched if given as a filename. Defaults to ``sudo``. + :param str password: + The password to use if/when sudo requests it. Depending on the sudo + configuration, this is either the current account password or the + target account password. :class:`mitogen.sudo.PasswordError` + will be raised if sudo requests a password but none is provided. + :param bool set_home: + If :data:`True`, request ``sudo`` set the ``HOME`` environment + variable to match the target UNIX account. + :param bool preserve_env: + If :data:`True`, request ``sudo`` to preserve the environment of + the parent process. + :param str selinux_type: + If not :data:`None`, the SELinux security context to use. + :param str selinux_role: + If not :data:`None`, the SELinux role to use. + :param list sudo_args: + Arguments in the style of :data:`sys.argv` that would normally + be passed to ``sudo``. The arguments are parsed in-process to set + equivalent parameters. Re-parsing ensures unsupported options cause + :class:`mitogen.core.StreamError` to be raised, and that + attributes of the stream match the actual behaviour of ``sudo``. + +.. method:: mitogen.parent.Router.ssh (hostname, username=None, ssh_path=None, ssh_args=None, port=None, check_host_keys='enforce', password=None, identity_file=None, identities_only=True, compression=True, \**kwargs) + + Construct a remote context over an OpenSSH ``ssh`` invocation. + + The ``ssh`` process is started in a newly allocated pseudo-terminal to + support typing interactive passwords and responding to prompts, if a + password is specified, or `check_host_keys=accept`. In other scenarios, + ``BatchMode`` is enabled and no PTY is allocated. For many-target + configurations, both options should be avoided as most systems have a + conservative limit on the number of pseudo-terminals that may exist. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str username: + The SSH username; default is unspecified, which causes SSH to pick + the username to use. + :param str ssh_path: + Absolute or relative path to ``ssh``. Defaults to ``ssh``. + :param list ssh_args: + Additional arguments to pass to the SSH command. + :param int port: + Port number to connect to; default is unspecified, which causes SSH + to pick the port number. + :param str check_host_keys: + Specifies the SSH host key checking mode. Defaults to ``enforce``. + + * ``ignore``: no host key checking is performed. Connections never + fail due to an unknown or changed host key. + * ``accept``: known hosts keys are checked to ensure they match, + new host keys are automatically accepted and verified in future + connections. + * ``enforce``: known host keys are checked to ensure they match, + unknown hosts cause a connection failure. + :param str password: + Password to type if/when ``ssh`` requests it. If not specified and + a password is requested, :class:`mitogen.ssh.PasswordError` is + raised. + :param str identity_file: + Path to an SSH private key file to use for authentication. Default + is unspecified, which causes SSH to pick the identity file. + + When this option is specified, only `identity_file` will be used by + the SSH client to perform authenticaion; agent authentication is + automatically disabled, as is reading the default private key from + ``~/.ssh/id_rsa``, or ``~/.ssh/id_dsa``. + :param bool identities_only: + If :data:`True` and a password or explicit identity file is + specified, instruct the SSH client to disable any authentication + identities inherited from the surrounding environment, such as + those loaded in any running ``ssh-agent``, or default key files + present in ``~/.ssh``. This ensures authentication attempts only + occur using the supplied password or SSH key. + :param bool compression: + If :data:`True`, enable ``ssh`` compression support. Compression + has a minimal effect on the size of modules transmitted, as they + are already compressed, however it has a large effect on every + remaining message in the otherwise uncompressed stream protocol, + such as function call arguments and return values. + :param int ssh_debug_level: + Optional integer `0..3` indicating the SSH client debug level. + :raises mitogen.ssh.PasswordError: + A password was requested but none was specified, or the specified + password was incorrect. + + :raises mitogen.ssh.HostKeyError: + When `check_host_keys` is set to either ``accept``, indicates a + previously recorded key no longer matches the remote machine. When + set to ``enforce``, as above, but additionally indicates no + previously recorded key exists for the remote machine. Context Class @@ -619,126 +581,22 @@ Channel Class ============= .. currentmodule:: mitogen.core +.. autoclass:: Channel + :members: -.. class:: Channel (router, context, dst_handle, handle=None) - - A channel inherits from :class:`mitogen.core.Sender` and - `mitogen.core.Receiver` to provide bidirectional functionality. - - Since all handles aren't known until after both ends are constructed, for - both ends to communicate through a channel, it is necessary for one end to - retrieve the handle allocated to the other and reconfigure its own channel - to match. Currently this is a manual task. Broker Class ============ .. currentmodule:: mitogen.core -.. class:: Broker - - Responsible for handling I/O multiplexing in a private thread. - - **Note:** This is the somewhat limited core version of the Broker class - used by child contexts. The master subclass is documented below. - - .. attribute:: shutdown_timeout = 3.0 - - Seconds grace to allow :class:`streams ` to shutdown - gracefully before force-disconnecting them during :meth:`shutdown`. - - .. method:: defer (func, \*args, \*kwargs) - - Arrange for `func(\*args, \**kwargs)` to be executed on the broker - thread, or immediately if the current thread is the broker thread. Safe - to call from any thread. - - .. method:: defer_sync (func) - - Arrange for `func()` to execute on the broker thread, blocking the - current thread until a result or exception is available. - - :returns: - Call result. - - .. method:: start_receive (stream) - - Mark the :attr:`receive_side ` on `stream` as - ready for reading. Safe to call from any thread. When the associated - file descriptor becomes ready for reading, - :meth:`BasicStream.on_receive` will be called. - - .. method:: stop_receive (stream) - - Mark the :attr:`receive_side ` on `stream` as - not ready for reading. Safe to call from any thread. - - .. method:: _start_transmit (stream) - - Mark the :attr:`transmit_side ` on `stream` as - ready for writing. Must only be called from the Broker thread. When the - associated file descriptor becomes ready for writing, - :meth:`BasicStream.on_transmit` will be called. - - .. method:: stop_receive (stream) - - Mark the :attr:`transmit_side ` on `stream` as - not ready for writing. Safe to call from any thread. - - .. method:: shutdown - - Request broker gracefully disconnect streams and stop. - - .. method:: join - - Wait for the broker to stop, expected to be called after - :meth:`shutdown`. - - .. method:: keep_alive - - Return :data:`True` if any reader's :attr:`Side.keep_alive` - attribute is :data:`True`, or any - :class:`Context ` is still - registered that is not the master. Used to delay shutdown while some - important work is in progress (e.g. log draining). - - **Internal Methods** - - .. method:: _broker_main - - Handle events until :meth:`shutdown`. On shutdown, invoke - :meth:`Stream.on_shutdown` for every active stream, then allow up to - :attr:`shutdown_timeout` seconds for the streams to unregister - themselves before forcefully calling - :meth:`Stream.on_disconnect`. +.. autoclass:: Broker + :members: .. currentmodule:: mitogen.master -.. class:: Broker (install_watcher=True) - - .. note:: - - You may construct as many brokers as desired, and use the same broker - for multiple routers, however usually only one broker need exist. - Multiple brokers may be useful when dealing with sets of children with - differing lifetimes. For example, a subscription service where - non-payment results in termination for one customer. - - :param bool install_watcher: - If :data:`True`, an additional thread is started to monitor the - lifetime of the main thread, triggering :meth:`shutdown` - automatically in case the user forgets to call it, or their code - crashed. - - You should not rely on this functionality in your program, it is only - intended as a fail-safe and to simplify the API for new users. In - particular, alternative Python implementations may not be able to - support watching the main thread. - - .. attribute:: shutdown_timeout = 5.0 - - Seconds grace to allow :class:`streams ` to shutdown - gracefully before force-disconnecting them during :meth:`shutdown`. +.. autoclass:: Broker + :members: Utility Functions diff --git a/docs/changelog.rst b/docs/changelog.rst index a7109a8d..6c095dfb 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -126,7 +126,7 @@ Core Library v0.2.4 (2018-??-??) ------------------- +------------------- Mitogen for Ansible ~~~~~~~~~~~~~~~~~~~ diff --git a/docs/internals.rst b/docs/internals.rst index 9c533952..fc9d57ac 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -35,162 +35,33 @@ Side Class ========== .. currentmodule:: mitogen.core - -.. class:: Side (stream, fd, keep_alive=True) - - Represent a single side of a :py:class:`BasicStream`. This exists to allow - streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional - (e.g. UNIX socket) file descriptors to operate identically. - - :param mitogen.core.Stream stream: - The stream this side is associated with. - - :param int fd: - Underlying file descriptor. - - :param bool keep_alive: - Value for :py:attr:`keep_alive` - - During construction, the file descriptor has its :py:data:`os.O_NONBLOCK` - flag enabled using :py:func:`fcntl.fcntl`. - - .. attribute:: stream - - The :py:class:`Stream` for which this is a read or write side. - - .. attribute:: fd - - Integer file descriptor to perform IO on, or :data:`None` if - :py:meth:`close` has been called. - - .. attribute:: keep_alive - - If :data:`True`, causes presence of this side in :py:class:`Broker`'s - active reader set to defer shutdown until the side is disconnected. - - .. method:: fileno - - Return :py:attr:`fd` if it is not :data:`None`, otherwise raise - :py:class:`StreamError`. This method is implemented so that - :py:class:`Side` can be used directly by :py:func:`select.select`. - - .. method:: close - - Call :py:func:`os.close` on :py:attr:`fd` if it is not :data:`None`, - then set it to :data:`None`. - - .. method:: read (n=CHUNK_SIZE) - - Read up to `n` bytes from the file descriptor, wrapping the underlying - :py:func:`os.read` call with :py:func:`io_op` to trap common - disconnection conditions. - - :py:meth:`read` always behaves as if it is reading from a regular UNIX - file; socket, pipe, and TTY disconnection errors are masked and result - in a 0-sized read just like a regular file. - - :returns: - Bytes read, or the empty to string to indicate disconnection was - detected. - - .. method:: write (s) - - Write as much of the bytes from `s` as possible to the file descriptor, - wrapping the underlying :py:func:`os.write` call with :py:func:`io_op` - to trap common disconnection connditions. - - :returns: - Number of bytes written, or :data:`None` if disconnection was - detected. +.. autoclass:: Side + :members: Stream Classes ============== .. currentmodule:: mitogen.core - -.. class:: BasicStream - - .. attribute:: receive_side - - A :py:class:`Side` representing the stream's receive file descriptor. - - .. attribute:: transmit_side - - A :py:class:`Side` representing the stream's transmit file descriptor. - - .. method:: on_disconnect (broker) - - Called by :py:class:`Broker` to force disconnect the stream. The base - implementation simply closes :py:attr:`receive_side` and - :py:attr:`transmit_side` and unregisters the stream from the broker. - - .. method:: on_receive (broker) - - Called by :py:class:`Broker` when the stream's :py:attr:`receive_side` has - been marked readable using :py:meth:`Broker.start_receive` and the - broker has detected the associated file descriptor is ready for - reading. - - Subclasses must implement this method if - :py:meth:`Broker.start_receive` is ever called on them, and the method - must call :py:meth:`on_disconect` if reading produces an empty string. - - .. method:: on_transmit (broker) - - Called by :py:class:`Broker` when the stream's :py:attr:`transmit_side` - has been marked writeable using :py:meth:`Broker._start_transmit` and - the broker has detected the associated file descriptor is ready for - writing. - - Subclasses must implement this method if - :py:meth:`Broker._start_transmit` is ever called on them. - - .. method:: on_shutdown (broker) - - Called by :py:meth:`Broker.shutdown` to allow the stream time to - gracefully shutdown. The base implementation simply called - :py:meth:`on_disconnect`. +.. autoclass:: BasicStream + :members: .. autoclass:: Stream :members: - .. method:: pending_bytes () - - Returns the number of bytes queued for transmission on this stream. - This can be used to limit the amount of data buffered in RAM by an - otherwise unlimited consumer. - - For an accurate result, this method should be called from the Broker - thread, using a wrapper like: - - :: - - def get_pending_bytes(self, stream): - latch = mitogen.core.Latch() - self.broker.defer( - lambda: latch.put(stream.pending_bytes()) - ) - return latch.get() - - .. currentmodule:: mitogen.fork - .. autoclass:: Stream :members: .. currentmodule:: mitogen.parent - .. autoclass:: Stream :members: .. currentmodule:: mitogen.ssh - .. autoclass:: Stream :members: .. currentmodule:: mitogen.sudo - .. autoclass:: Stream :members: @@ -212,6 +83,7 @@ Poller Class .. currentmodule:: mitogen.core .. autoclass:: Poller + :members: .. currentmodule:: mitogen.parent .. autoclass:: KqueuePoller @@ -256,64 +128,16 @@ ExternalContext Class ===================== .. currentmodule:: mitogen.core +.. autoclass:: ExternalContext + :members: -.. class:: ExternalContext - - External context implementation. - - .. attribute:: broker - - The :py:class:`mitogen.core.Broker` instance. - - .. attribute:: context - - The :py:class:`mitogen.core.Context` instance. - - .. attribute:: channel - - The :py:class:`mitogen.core.Channel` over which - :py:data:`CALL_FUNCTION` requests are received. - - .. attribute:: stdout_log - - The :py:class:`mitogen.core.IoLogger` connected to ``stdout``. - - .. attribute:: importer - - The :py:class:`mitogen.core.Importer` instance. - - .. attribute:: stdout_log - - The :py:class:`IoLogger` connected to ``stdout``. - - .. attribute:: stderr_log - - The :py:class:`IoLogger` connected to ``stderr``. - - .. method:: _dispatch_calls - - Implementation for the main thread in every child context. mitogen.master ============== -.. currentmodule:: mitogen.master - -.. class:: ProcessMonitor - - Install a :py:data:`signal.SIGCHLD` handler that generates callbacks when a - specific child process has exitted. - - .. method:: add (pid, callback) - - Add a callback function to be notified of the exit status of a process. - - :param int pid: - Process ID to be notified of. - - :param callback: - Function invoked as `callback(status)`, where `status` is the raw - exit status of the child process. +.. currentmodule:: mitogen.parent +.. autoclass:: ProcessMonitor + :members: Blocking I/O Functions diff --git a/mitogen/core.py b/mitogen/core.py index 6cb311d3..2b1771a5 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -100,7 +100,7 @@ CALL_SERVICE = 110 #: * a remote receiver is disconnected or explicitly closed. #: * a related message could not be delivered due to no route existing for it. #: * a router is being torn down, as a sentinel value to notify -#: :py:meth:`mitogen.core.Router.add_handler` callbacks to clean up. +#: :meth:`mitogen.core.Router.add_handler` callbacks to clean up. IS_DEAD = 999 try: @@ -187,7 +187,7 @@ class Error(Exception): class LatchError(Error): - """Raised when an attempt is made to use a :py:class:`mitogen.core.Latch` + """Raised when an attempt is made to use a :class:`mitogen.core.Latch` that has been marked closed.""" pass @@ -239,7 +239,7 @@ class Kwargs(dict): class CallError(Error): """Serializable :class:`Error` subclass raised when - :py:meth:`Context.call() ` fails. A copy of + :meth:`Context.call() ` fails. A copy of the traceback from the external context is appended to the exception message.""" def __init__(self, fmt=None, *args): @@ -872,6 +872,15 @@ class Receiver(object): class Channel(Sender, Receiver): + """ + A channel inherits from :class:`mitogen.core.Sender` and + `mitogen.core.Receiver` to provide bidirectional functionality. + + Since all handles aren't known until after both ends are constructed, for + both ends to communicate through a channel, it is necessary for one end to + retrieve the handle allocated to the other and reconfigure its own channel + to match. Currently this is a manual task. + """ def __init__(self, router, context, dst_handle, handle=None): Sender.__init__(self, context, dst_handle) Receiver.__init__(self, router, handle) @@ -1160,12 +1169,35 @@ class LogHandler(logging.Handler): class Side(object): + """ + Represent a single side of a :class:`BasicStream`. This exists to allow + streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional + (e.g. UNIX socket) file descriptors to operate identically. + + :param mitogen.core.Stream stream: + The stream this side is associated with. + + :param int fd: + Underlying file descriptor. + + :param bool keep_alive: + Value for :attr:`keep_alive` + + During construction, the file descriptor has its :data:`os.O_NONBLOCK` flag + enabled using :func:`fcntl.fcntl`. + """ _fork_refs = weakref.WeakValueDictionary() def __init__(self, stream, fd, cloexec=True, keep_alive=True, blocking=False): + #: The :class:`Stream` for which this is a read or write side. self.stream = stream + #: Integer file descriptor to perform IO on, or :data:`None` if + #: :meth:`close` has been called. self.fd = fd self.closed = False + #: If :data:`True`, causes presence of this side in + #: :class:`Broker`'s active reader set to defer shutdown until the + #: side is disconnected. self.keep_alive = keep_alive self._fork_refs[id(self)] = self if cloexec: @@ -1182,12 +1214,29 @@ class Side(object): side.close() def close(self): + """ + Call :func:`os.close` on :attr:`fd` if it is not :data:`None`, + then set it to :data:`None`. + """ if not self.closed: _vv and IOLOG.debug('%r.close()', self) self.closed = True os.close(self.fd) def read(self, n=CHUNK_SIZE): + """ + Read up to `n` bytes from the file descriptor, wrapping the underlying + :func:`os.read` call with :func:`io_op` to trap common disconnection + conditions. + + :meth:`read` always behaves as if it is reading from a regular UNIX + file; socket, pipe, and TTY disconnection errors are masked and result + in a 0-sized read like a regular file. + + :returns: + Bytes read, or the empty to string to indicate disconnection was + detected. + """ if self.closed: # Refuse to touch the handle after closed, it may have been reused # by another thread. TODO: synchronize read()/write()/close(). @@ -1198,6 +1247,15 @@ class Side(object): return s def write(self, s): + """ + Write as much of the bytes from `s` as possible to the file descriptor, + wrapping the underlying :func:`os.write` call with :func:`io_op` to + trap common disconnection connditions. + + :returns: + Number of bytes written, or :data:`None` if disconnection was + detected. + """ if self.closed or self.fd is None: # Refuse to touch the handle after closed, it may have been reused # by another thread. @@ -1210,10 +1268,50 @@ class Side(object): class BasicStream(object): + #: A :class:`Side` representing the stream's receive file descriptor. receive_side = None + + #: A :class:`Side` representing the stream's transmit file descriptor. transmit_side = None + def on_receive(self, broker): + """ + Called by :class:`Broker` when the stream's :attr:`receive_side` has + been marked readable using :meth:`Broker.start_receive` and the broker + has detected the associated file descriptor is ready for reading. + + Subclasses must implement this if :meth:`Broker.start_receive` is ever + called on them, and the method must call :meth:`on_disconect` if + reading produces an empty string. + """ + + def on_transmit(self, broker): + """ + Called by :class:`Broker` when the stream's :attr:`transmit_side` + has been marked writeable using :meth:`Broker._start_transmit` and + the broker has detected the associated file descriptor is ready for + writing. + + Subclasses must implement this if :meth:`Broker._start_transmit` is + ever called on them. + """ + + def on_shutdown(self, broker): + """ + Called by :meth:`Broker.shutdown` to allow the stream time to + gracefully shutdown. The base implementation simply called + :meth:`on_disconnect`. + """ + _v and LOG.debug('%r.on_shutdown()', self) + fire(self, 'shutdown') + self.on_disconnect(broker) + def on_disconnect(self, broker): + """ + Called by :class:`Broker` to force disconnect the stream. The base + implementation simply closes :attr:`receive_side` and + :attr:`transmit_side` and unregisters the stream from the broker. + """ LOG.debug('%r.on_disconnect()', self) if self.receive_side: broker.stop_receive(self) @@ -1223,19 +1321,14 @@ class BasicStream(object): self.transmit_side.close() fire(self, 'disconnect') - def on_shutdown(self, broker): - _v and LOG.debug('%r.on_shutdown()', self) - fire(self, 'shutdown') - self.on_disconnect(broker) - class Stream(BasicStream): """ - :py:class:`BasicStream` subclass implementing mitogen's :ref:`stream + :class:`BasicStream` subclass implementing mitogen's :ref:`stream protocol `. """ - #: If not :data:`None`, :py:class:`Router` stamps this into - #: :py:attr:`Message.auth_id` of every message received on this stream. + #: If not :data:`None`, :class:`Router` stamps this into + #: :attr:`Message.auth_id` of every message received on this stream. auth_id = None #: If not :data:`False`, indicates the stream has :attr:`auth_id` set and @@ -1272,7 +1365,7 @@ class Stream(BasicStream): def on_receive(self, broker): """Handle the next complete message on the stream. Raise - :py:class:`StreamError` on failure.""" + :class:`StreamError` on failure.""" _vv and IOLOG.debug('%r.on_receive()', self) buf = self.receive_side.read() @@ -1329,6 +1422,14 @@ class Stream(BasicStream): return True def pending_bytes(self): + """ + Return the number of bytes queued for transmission on this stream. This + can be used to limit the amount of data buffered in RAM by an otherwise + unlimited consumer. + + For an accurate result, this method should be called from the Broker + thread, for example by using :meth:`Broker.defer_sync`. + """ return self._output_buf_len def on_transmit(self, broker): @@ -1572,15 +1673,15 @@ class Poller(object): class Latch(object): """ - A latch is a :py:class:`Queue.Queue`-like object that supports mutation and - waiting from multiple threads, however unlike :py:class:`Queue.Queue`, + A latch is a :class:`Queue.Queue`-like object that supports mutation and + waiting from multiple threads, however unlike :class:`Queue.Queue`, waiting threads always remain interruptible, so CTRL+C always succeeds, and waits where a timeout is set experience no wake up latency. These properties are not possible in combination using the built-in threading primitives available in Python 2.x. Latches implement queues using the UNIX self-pipe trick, and a per-thread - :py:func:`socket.socketpair` that is lazily created the first time any + :func:`socket.socketpair` that is lazily created the first time any latch attempts to sleep on a thread, and dynamically associated with the waiting Latch only for duration of the wait. @@ -1626,7 +1727,7 @@ class Latch(object): def close(self): """ Mark the latch as closed, and cause every sleeping thread to be woken, - with :py:class:`mitogen.core.LatchError` raised in each thread. + with :class:`mitogen.core.LatchError` raised in each thread. """ self._lock.acquire() try: @@ -1640,17 +1741,17 @@ class Latch(object): def empty(self): """ - Return :py:data:`True` if calling :py:meth:`get` would block. + Return :data:`True` if calling :meth:`get` would block. - As with :py:class:`Queue.Queue`, :py:data:`True` may be returned even - though a subsequent call to :py:meth:`get` will succeed, since a - message may be posted at any moment between :py:meth:`empty` and - :py:meth:`get`. + As with :class:`Queue.Queue`, :data:`True` may be returned even + though a subsequent call to :meth:`get` will succeed, since a + message may be posted at any moment between :meth:`empty` and + :meth:`get`. - As with :py:class:`Queue.Queue`, :py:data:`False` may be returned even - though a subsequent call to :py:meth:`get` will block, since another - waiting thread may be woken at any moment between :py:meth:`empty` and - :py:meth:`get`. + As with :class:`Queue.Queue`, :data:`False` may be returned even + though a subsequent call to :meth:`get` will block, since another + waiting thread may be woken at any moment between :meth:`empty` and + :meth:`get`. """ return len(self._queue) == 0 @@ -1683,14 +1784,14 @@ class Latch(object): Return the next enqueued object, or sleep waiting for one. :param float timeout: - If not :py:data:`None`, specifies a timeout in seconds. + If not :data:`None`, specifies a timeout in seconds. :param bool block: - If :py:data:`False`, immediately raise - :py:class:`mitogen.core.TimeoutError` if the latch is empty. + If :data:`False`, immediately raise + :class:`mitogen.core.TimeoutError` if the latch is empty. :raises mitogen.core.LatchError: - :py:meth:`close` has been called, and the object is no longer valid. + :meth:`close` has been called, and the object is no longer valid. :raises mitogen.core.TimeoutError: Timeout was reached. @@ -1771,7 +1872,7 @@ class Latch(object): exists. :raises mitogen.core.LatchError: - :py:meth:`close` has been called, and the object is no longer valid. + :meth:`close` has been called, and the object is no longer valid. """ _vv and IOLOG.debug('%r.put(%r)', self, obj) self._lock.acquire() @@ -1807,7 +1908,7 @@ class Latch(object): class Waker(BasicStream): """ - :py:class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_. + :class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_. Used to wake the multiplexer when another thread needs to modify its state (via a cross-thread function call). @@ -1893,8 +1994,8 @@ class Waker(BasicStream): class IoLogger(BasicStream): """ - :py:class:`BasicStream` subclass that sets up redirection of a standard - UNIX file descriptor back into the Python :py:mod:`logging` package. + :class:`BasicStream` subclass that sets up redirection of a standard + UNIX file descriptor back into the Python :mod:`logging` package. """ _buf = '' @@ -2126,8 +2227,8 @@ class Router(object): return handle def on_shutdown(self, broker): - """Called during :py:meth:`Broker.shutdown`, informs callbacks - registered with :py:meth:`add_handle_cb` the connection is dead.""" + """Called during :meth:`Broker.shutdown`, informs callbacks registered + with :meth:`add_handle_cb` the connection is dead.""" _v and LOG.debug('%r.on_shutdown(%r)', self, broker) fire(self, 'shutdown') for handle, (persist, fn) in self._handle_map.iteritems(): @@ -2249,14 +2350,26 @@ class Router(object): class Broker(object): + """ + Responsible for handling I/O multiplexing in a private thread. + + **Note:** This is the somewhat limited core version of the Broker class + used by child contexts. The master subclass is documented below. + """ poller_class = Poller _waker = None _thread = None + + #: Seconds grace to allow :class:`streams ` to shutdown gracefully + #: before force-disconnecting them during :meth:`shutdown`. shutdown_timeout = 3.0 def __init__(self, poller_class=None): self._alive = True self._waker = Waker(self) + #: Arrange for `func(\*args, \**kwargs)` to be executed on the broker + #: thread, or immediately if the current thread is the broker thread. + #: Safe to call from any thread. self.defer = self._waker.defer self.poller = self.poller_class() self.poller.start_receive( @@ -2272,6 +2385,12 @@ class Broker(object): self._waker.broker_ident = self._thread.ident def start_receive(self, stream): + """ + Mark the :attr:`receive_side ` on `stream` as + ready for reading. Safe to call from any thread. When the associated + file descriptor becomes ready for reading, + :meth:`BasicStream.on_receive` will be called. + """ _vv and IOLOG.debug('%r.start_receive(%r)', self, stream) side = stream.receive_side assert side and side.fd is not None @@ -2279,26 +2398,47 @@ class Broker(object): side.fd, (side, stream.on_receive)) def stop_receive(self, stream): + """ + Mark the :attr:`receive_side ` on `stream` as not + ready for reading. Safe to call from any thread. + """ _vv and IOLOG.debug('%r.stop_receive(%r)', self, stream) self.defer(self.poller.stop_receive, stream.receive_side.fd) def _start_transmit(self, stream): + """ + Mark the :attr:`transmit_side ` on `stream` as + ready for writing. Must only be called from the Broker thread. When the + associated file descriptor becomes ready for writing, + :meth:`BasicStream.on_transmit` will be called. + """ _vv and IOLOG.debug('%r._start_transmit(%r)', self, stream) side = stream.transmit_side assert side and side.fd is not None self.poller.start_transmit(side.fd, (side, stream.on_transmit)) def _stop_transmit(self, stream): + """ + Mark the :attr:`transmit_side ` on `stream` as not + ready for writing. + """ _vv and IOLOG.debug('%r._stop_transmit(%r)', self, stream) self.poller.stop_transmit(stream.transmit_side.fd) def keep_alive(self): + """ + Return :data:`True` if any reader's :attr:`Side.keep_alive` attribute + is :data:`True`, or any :class:`Context` is still registered that is + not the master. Used to delay shutdown while some important work is in + progress (e.g. log draining). + """ it = (side.keep_alive for (_, (side, _)) in self.poller.readers) return sum(it, 0) def defer_sync(self, func): """ - Block the calling thread while `func` runs on a broker thread. + Arrange for `func()` to execute on the broker thread, blocking the + current thread until a result or exception is available. :returns: Return value of `func()`. @@ -2349,6 +2489,12 @@ class Broker(object): side.stream.on_disconnect(self) def _broker_main(self): + """ + Handle events until :meth:`shutdown`. On shutdown, invoke + :meth:`Stream.on_shutdown` for every active stream, then allow up to + :attr:`shutdown_timeout` seconds for the streams to unregister + themselves before forcefully calling :meth:`Stream.on_disconnect`. + """ try: while self._alive: self._loop_once() @@ -2361,12 +2507,20 @@ class Broker(object): fire(self, 'exit') def shutdown(self): + """ + Request broker gracefully disconnect streams and stop. Safe to call + from any thread. + """ _v and LOG.debug('%r.shutdown()', self) def _shutdown(): self._alive = False self.defer(_shutdown) def join(self): + """ + Wait for the broker to stop, expected to be called after + :meth:`shutdown`. + """ self._thread.join() def __repr__(self): @@ -2438,6 +2592,34 @@ class Dispatcher(object): class ExternalContext(object): + """ + External context implementation. + + .. attribute:: broker + The :class:`mitogen.core.Broker` instance. + + .. attribute:: context + The :class:`mitogen.core.Context` instance. + + .. attribute:: channel + The :class:`mitogen.core.Channel` over which :data:`CALL_FUNCTION` + requests are received. + + .. attribute:: stdout_log + The :class:`mitogen.core.IoLogger` connected to ``stdout``. + + .. attribute:: importer + The :class:`mitogen.core.Importer` instance. + + .. attribute:: stdout_log + The :class:`IoLogger` connected to ``stdout``. + + .. attribute:: stderr_log + The :class:`IoLogger` connected to ``stderr``. + + .. method:: _dispatch_calls + Implementation for the main thread in every child context. + """ detached = False def __init__(self, config): diff --git a/mitogen/master.py b/mitogen/master.py index 73302910..4b1b164d 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -754,6 +754,26 @@ class ModuleResponder(object): class Broker(mitogen.core.Broker): + """ + .. note:: + + You may construct as many brokers as desired, and use the same broker + for multiple routers, however usually only one broker need exist. + Multiple brokers may be useful when dealing with sets of children with + differing lifetimes. For example, a subscription service where + non-payment results in termination for one customer. + + :param bool install_watcher: + If :data:`True`, an additional thread is started to monitor the + lifetime of the main thread, triggering :meth:`shutdown` + automatically in case the user forgets to call it, or their code + crashed. + + You should not rely on this functionality in your program, it is only + intended as a fail-safe and to simplify the API for new users. In + particular, alternative Python implementations may not be able to + support watching the main thread. + """ shutdown_timeout = 5.0 _watcher = None poller_class = mitogen.parent.PREFERRED_POLLER @@ -773,7 +793,32 @@ class Broker(mitogen.core.Broker): class Router(mitogen.parent.Router): + """ + Extend :class:`mitogen.core.Router` with functionality useful to masters, + and child contexts who later become masters. Currently when this class is + required, the target context's router is upgraded at runtime. + + .. note:: + + You may construct as many routers as desired, and use the same broker + for multiple routers, however usually only one broker and router need + exist. Multiple routers may be useful when dealing with separate trust + domains, for example, manipulating infrastructure belonging to separate + customers or projects. + + :param mitogen.master.Broker broker: + Broker to use. If not specified, a private :class:`Broker` is created. + """ broker_class = Broker + + #: When :data:`True`, cause the broker thread and any subsequent broker and + #: main threads existing in any child to write + #: ``/tmp/mitogen.stats...log`` containing a + #: :mod:`cProfile` dump on graceful exit. Must be set prior to construction + #: of any :class:`Broker`, e.g. via: + #: + #: .. code:: + #: mitogen.master.Router.profiling = True profiling = False def __init__(self, broker=None, max_message_size=None): @@ -796,6 +841,10 @@ class Router(mitogen.parent.Router): ) def enable_debug(self): + """ + Cause this context and any descendant child contexts to write debug + logs to ``/tmp/mitogen..log``. + """ mitogen.core.enable_debug_logging() self.debug = True @@ -830,6 +879,12 @@ class IdAllocator(object): BLOCK_SIZE = 1000 def allocate(self): + """ + Arrange for a unique context ID to be allocated and associated with a + route leading to the active context. In masters, the ID is generated + directly, in children it is forwarded to the master via a + :data:`mitogen.core.ALLOCATE_ID` message. + """ self.lock.acquire() try: id_ = self.next_id diff --git a/mitogen/parent.py b/mitogen/parent.py index 0fffdd67..780cecd7 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1822,6 +1822,10 @@ class Router(mitogen.core.Router): class ProcessMonitor(object): + """ + Install a :data:`signal.SIGCHLD` handler that generates callbacks when a + specific child process has exitted. This class is obsolete, do not use. + """ def __init__(self): # pid -> callback() self.callback_by_pid = {} @@ -1835,6 +1839,16 @@ class ProcessMonitor(object): del self.callback_by_pid[pid] def add(self, pid, callback): + """ + Add a callback function to be notified of the exit status of a process. + + :param int pid: + Process ID to be notified of. + + :param callback: + Function invoked as `callback(status)`, where `status` is the raw + exit status of the child process. + """ self.callback_by_pid[pid] = callback _instance = None From 5eff8ea4fb21d0ea34dfefc67f312ccf7a60bf12 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 2 Nov 2018 13:01:47 +0000 Subject: [PATCH 03/35] tests: make result_shell_echo_hi compare less of the timedelta; closes #361 Assuming less than one second is too much to ask from Travis. --- tests/ansible/integration/async/result_shell_echo_hi.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/ansible/integration/async/result_shell_echo_hi.yml b/tests/ansible/integration/async/result_shell_echo_hi.yml index 8858037a..dbf40bde 100644 --- a/tests/ansible/integration/async/result_shell_echo_hi.yml +++ b/tests/ansible/integration/async/result_shell_echo_hi.yml @@ -24,7 +24,7 @@ that: - async_out.changed == True - async_out.cmd == "echo hi" - - 'async_out.delta.startswith("0:00:00")' + - 'async_out.delta.startswith("0:00:")' - async_out.end.startswith("20") - async_out.invocation.module_args._raw_params == "echo hi" - async_out.invocation.module_args._uses_shell == True From 8e4c164d931dbe890ba7c886714f5dae835ce21c Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 2 Nov 2018 13:43:41 +0000 Subject: [PATCH 04/35] issue #388: fix Sphinx markup --- docs/api.rst | 25 +++++++++++++------------ mitogen/master.py | 3 +-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 57b9a655..bfba1f77 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -107,7 +107,8 @@ Router Class Connection Methods ================== -.. method:: mitogen.parent.Router.fork (on_fork=None, on_start=None, debug=False, profiling=False, via=None) +.. currentmodule:: mitogen.parent +.. method:: Router.fork (on_fork=None, on_start=None, debug=False, profiling=False, via=None) Construct a context on the local machine by forking the current process. The forked child receives a new identity, sets up a new broker @@ -203,7 +204,7 @@ Connection Methods :param bool profiling: Same as the `profiling` parameter for :meth:`local`. -.. method:: mitogen.parent.Router.local (remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, via=None) +.. method:: Router.local (remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, via=None) Construct a context on the local machine as a subprocess of the current process. The associated stream implementation is @@ -264,7 +265,7 @@ Connection Methods # Use the SSH connection to create a sudo connection. remote_root = router.sudo(username='root', via=remote_machine) -.. method:: mitogen.parent.Router.doas (username=None, password=None, doas_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs) +.. method:: Router.doas (username=None, password=None, doas_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs) Construct a context on the local machine over a ``doas`` invocation. The ``doas`` process is started in a newly allocated pseudo-terminal, @@ -289,7 +290,7 @@ Connection Methods A password was requested but none was provided, the supplied password was incorrect, or the target account did not exist. -.. method:: mitogen.parent.Router.docker (container=None, image=None, docker_path=None, \**kwargs) +.. method:: Router.docker (container=None, image=None, docker_path=None, \**kwargs) Construct a context on the local machine within an existing or temporary new Docker container using the ``docker`` program. One of @@ -309,7 +310,7 @@ Connection Methods Filename or complete path to the Docker binary. ``PATH`` will be searched if given as a filename. Defaults to ``docker``. -.. method:: mitogen.parent.Router.jail (container, jexec_path=None, \**kwargs) +.. method:: Router.jail (container, jexec_path=None, \**kwargs) Construct a context on the local machine within a FreeBSD jail using the ``jexec`` program. @@ -325,7 +326,7 @@ Connection Methods Filename or complete path to the ``jexec`` binary. ``PATH`` will be searched if given as a filename. Defaults to ``/usr/sbin/jexec``. -.. method:: mitogen.parent.Router.kubectl (pod, kubectl_path=None, kubectl_args=None, \**kwargs) +.. method:: Router.kubectl (pod, kubectl_path=None, kubectl_args=None, \**kwargs) Construct a context in a container via the Kubernetes ``kubectl`` program. @@ -340,7 +341,7 @@ Connection Methods :param list kubectl_args: Additional arguments to pass to the ``kubectl`` command. -.. method:: mitogen.parent.Router.lxc (container, lxc_attach_path=None, \**kwargs) +.. method:: Router.lxc (container, lxc_attach_path=None, \**kwargs) Construct a context on the local machine within an LXC classic container using the ``lxc-attach`` program. @@ -354,7 +355,7 @@ Connection Methods will be searched if given as a filename. Defaults to ``lxc-attach``. -.. method:: mitogen.parent.Router.lxc (container, lxc_attach_path=None, \**kwargs) +.. method:: Router.lxc (container, lxc_attach_path=None, \**kwargs) Construct a context on the local machine within a LXD container using the ``lxc`` program. @@ -367,7 +368,7 @@ Connection Methods Filename or complete path to the ``lxc`` binary. ``PATH`` will be searched if given as a filename. Defaults to ``lxc``. -.. method:: mitogen.parent.Router.setns (container, kind, username=None, docker_path=None, lxc_info_path=None, machinectl_path=None, \**kwargs) +.. method:: Router.setns (container, kind, username=None, docker_path=None, lxc_info_path=None, machinectl_path=None, \**kwargs) Construct a context in the style of :meth:`local`, but change the active Linux process namespaces via calls to `setns(1)` before @@ -401,7 +402,7 @@ Connection Methods will be searched if given as a filename. Defaults to ``machinectl``. -.. method:: mitogen.parent.Router.su (username=None, password=None, su_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs) +.. method:: Router.su (username=None, password=None, su_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs) Construct a context on the local machine over a ``su`` invocation. The ``su`` process is started in a newly allocated pseudo-terminal, and @@ -428,7 +429,7 @@ Connection Methods password was incorrect, or (on BSD) the target account did not exist. -.. method:: mitogen.parent.Router.sudo (username=None, sudo_path=None, password=None, \**kwargs) +.. method:: Router.sudo (username=None, sudo_path=None, password=None, \**kwargs) Construct a context on the local machine over a ``sudo`` invocation. The ``sudo`` process is started in a newly allocated pseudo-terminal, @@ -464,7 +465,7 @@ Connection Methods :class:`mitogen.core.StreamError` to be raised, and that attributes of the stream match the actual behaviour of ``sudo``. -.. method:: mitogen.parent.Router.ssh (hostname, username=None, ssh_path=None, ssh_args=None, port=None, check_host_keys='enforce', password=None, identity_file=None, identities_only=True, compression=True, \**kwargs) +.. method:: Router.ssh (hostname, username=None, ssh_path=None, ssh_args=None, port=None, check_host_keys='enforce', password=None, identity_file=None, identities_only=True, compression=True, \**kwargs) Construct a remote context over an OpenSSH ``ssh`` invocation. diff --git a/mitogen/master.py b/mitogen/master.py index 4b1b164d..65985b4d 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -815,9 +815,8 @@ class Router(mitogen.parent.Router): #: main threads existing in any child to write #: ``/tmp/mitogen.stats...log`` containing a #: :mod:`cProfile` dump on graceful exit. Must be set prior to construction - #: of any :class:`Broker`, e.g. via: + #: of any :class:`Broker`, e.g. via:: #: - #: .. code:: #: mitogen.master.Router.profiling = True profiling = False From 5bdb745f0796a27255e35a193fd9692a746a4061 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 2 Nov 2018 13:52:20 +0000 Subject: [PATCH 05/35] docs: howitworks tweaks --- docs/howitworks.rst | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/docs/howitworks.rst b/docs/howitworks.rst index 5e2c10f5..65a6daee 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -16,17 +16,17 @@ The UNIX First Stage To allow delivery of the bootstrap compressed using :py:mod:`zlib`, it is necessary for something on the remote to be prepared to decompress the payload -and feed it to a Python interpreter. Since we would like to avoid writing an -error-prone shell fragment to implement this, and since we must avoid writing -to the remote machine's disk in case it is read-only, the Python process -started on the remote machine by Mitogen immediately forks in order to +and feed it to a Python interpreter [#f1]_. Since we would like to avoid +writing an error-prone shell fragment to implement this, and since we must +avoid writing to the remote machine's disk in case it is read-only, the Python +process started on the remote machine by Mitogen immediately forks in order to implement the decompression. Python Command Line ################### -The Python command line sent to the host is a :mod:`zlib`-compressed [#f1]_ and +The Python command line sent to the host is a :mod:`zlib`-compressed [#f2]_ and base64-encoded copy of the :py:meth:`mitogen.master.Stream._first_stage` function, which has been carefully optimized to reduce its size. Prior to compression and encoding, ``CONTEXT_NAME`` is replaced with the desired context @@ -65,10 +65,10 @@ allowing reading by the first stage of exactly the required bytes. Configuring argv[0] ################### -Forking provides us with an excellent opportunity for tidying up the eventual -Python interpreter, in particular, restarting it using a fresh command-line to -get rid of the large base64-encoded first stage parameter, and to replace -**argv[0]** with something descriptive. +Forking provides an excellent opportunity to tidy up the eventual Python +interpreter, in particular, restarting it using a fresh command-line to get rid +of the large base64-encoded first stage parameter, and to replace **argv[0]** +with something descriptive. After configuring its ``stdin`` to point to the read end of the pipe, the parent half of the fork re-executes Python, with **argv[0]** taken from the @@ -1018,7 +1018,13 @@ receive items in the order they are requested, as they become available. .. rubric:: Footnotes -.. [#f1] Compression may seem redundant, however it is basically free and reducing IO +.. [#f1] Although some connection methods such as SSH support compression, and + Mitogen enables SSH compression by default, there are circumstances where + disabling SSH compression is desirable, and many scenarios for future + connection methods where transport-layer compression is not supported at + all. + +.. [#f2] Compression may seem redundant, however it is basically free and reducing IO is always a good idea. The 33% / 200 byte saving may mean the presence or absence of an additional frame on the network, or in real world terms after accounting for SSH overhead, around a 2% reduced chance of a stall during From 6fdc45da1aa60d6f6a77a4236b96e08a055cb908 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 2 Nov 2018 14:48:24 +0000 Subject: [PATCH 06/35] docs: Changelog concision --- docs/ansible.rst | 2 +- docs/changelog.rst | 57 +++++++++++++++++++++------------------------- 2 files changed, 27 insertions(+), 32 deletions(-) diff --git a/docs/ansible.rst b/docs/ansible.rst index c30fffc2..33c73d06 100644 --- a/docs/ansible.rst +++ b/docs/ansible.rst @@ -429,7 +429,7 @@ Temporary Files Temporary file handling in Ansible is tricky, and the precise behaviour varies across major versions. A variety of temporary files and directories are -created, depending on the operating mode: +created, depending on the operating mode. In the best case when pipelining is enabled and no temporary uploads are required, for each task Ansible will create one directory below a diff --git a/docs/changelog.rst b/docs/changelog.rst index 6c095dfb..5405f209 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -137,15 +137,14 @@ Enhancements * `#76 `_, `#351 `_, `#352 `_: disconnect propagation - has improved, allowing Ansible to cancel waits for responses from targets - that where abruptly disconnected. This increases the chance a task will fail - gracefully, rather than hanging due to the connection being severed, for - example because of network failure or EC2 instance maintenance. + has improved, allowing Ansible to cancel waits for responses from abruptly + disconnected targets. This ensures a task will gracefully fail rather than + hang, for example on network failure or EC2 instance maintenance. * `#369 `_: :meth:`Connection.reset` is implemented, allowing `meta: reset_connection `_ to shut - down the remote interpreter as expected, and improving support for the + down the remote interpreter as documented, and improving support for the `reboot `_ module. @@ -156,26 +155,22 @@ Fixes * `#323 `_, `#333 `_: work around a Windows - Subsystem for Linux bug that would cause tracebacks to be rendered during - shutdown. + Subsystem for Linux bug that caused tracebacks to appear during shutdown. * `#334 `_: the SSH method - tilde-expands private key paths using Ansible's logic. Previously Mitogen - passed the path unmodified to SSH, which would expand it using - :func:`os.getpwent`. - - This differs from :func:`os.path.expanduser`, which prefers the ``HOME`` + tilde-expands private key paths using Ansible's logic. Previously the path + was passed unmodified to SSH, which expanded it using :func:`os.getpwent`. + This differs from :func:`os.path.expanduser`, which uses the ``HOME`` environment variable if it is set, causing behaviour to diverge when Ansible - was invoked using sudo without appropriate flags to cause the ``HOME`` - environment variable to be reset to match the target account. + was invoked across user accounts via ``sudo``. * `#370 `_: the Ansible `reboot `_ module is supported. * `#373 `_: the LXC and LXD methods - now print a useful hint when Python fails to start, as no useful error is - normally logged to the console by these tools. + print a useful hint on failure, as no useful error is normally logged to the + console by these tools. * `#400 `_: work around a threading bug in the AWX display callback when running with high verbosity setting. @@ -195,21 +190,21 @@ Fixes Core Library ~~~~~~~~~~~~ -* `#76 `_: routing maintains the set - of destination context ID ever received on each stream, and when - disconnection occurs, propagates ``DEL_ROUTE`` messages downwards towards - every stream that ever communicated with a disappearing peer, rather than - simply toward parents. +* `#76 `_: routing records the + destination context IDs ever received on each stream, and when disconnection + occurs, propagates :data:`mitogen.core.DEL_ROUTE` messages towards every + stream that ever communicated with the disappearing peer, rather than simply + towards parents. - Conversations between nodes in any level of the tree receive ``DEL_ROUTE`` - messages when a participant disconnects, allowing receivers to be woken with - :class:`mitogen.core.ChannelError` to signal the connection has broken, even - when one participant is not a parent of the other. + Conversations between nodes anywhere in the tree receive + :data:`mitogen.core.DEL_ROUTE` when either participant disconnects, allowing + receivers to wake with :class:`mitogen.core.ChannelError`, even when one + participant is not a parent of the other. -* `#405 `_: if a message is rejected - due to being too large, and it has a ``reply_to`` set, a dead message is - returned to the sender. This ensures function calls exceeding the configured - maximum size crash rather than hang. +* `#405 `_: if an oversized message + is rejected, and it has a ``reply_to`` set, a dead message is returned to the + sender. This ensures function calls exceeding the configured maximum size + crash rather than hang. * `#411 `_: the SSH method typed "``y``" rather than the requisite "``yes``" when `check_host_keys="accept"` @@ -227,7 +222,7 @@ Thanks! ~~~~~~~ Mitogen would not be possible without the support of users. A huge thanks for -bug reports, features and fixes in this release contributed by +bug reports, testing, features and fixes in this release contributed by `Berend De Schouwer `_, `Brian Candler `_, `Duane Zamrok `_, @@ -441,7 +436,7 @@ Thanks! ~~~~~~~ Mitogen would not be possible without the support of users. A huge thanks for -bug reports, features and fixes in this release contributed by +bug reports, testing, features and fixes in this release contributed by `Alex Russu `_, `Alex Willmer `_, `atoom `_, From 16c364910a5c6db7dd5cd740d539e8f8a26c2574 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 2 Nov 2018 15:09:40 +0000 Subject: [PATCH 07/35] core: avoid redundant write() calls in Waker.defer() Using _lock we can know for certain whether the Broker has received a wakeup byte yet. If it has, we can skip the wasted system call. Now on_receive() can exactly read the single byte that can possibly exist (modulo FD sharing bugs -- this could be improved on later) --- mitogen/core.py | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index 2b1771a5..2ac9bdbe 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1945,17 +1945,14 @@ class Waker(BasicStream): def on_receive(self, broker): """ - Drain the pipe and fire callbacks. Reading multiple bytes is safe since - new bytes corresponding to future .defer() calls are written only after - .defer() takes _lock: either a byte we read corresponds to something - already on the queue by the time we take _lock, or a byte remains - buffered, causing another wake up, because it was written after we - released _lock. + Drain the pipe and fire callbacks. Since :attr:`_deferred` is + synchronized, :meth:`defer` and :meth:`on_receive` can conspire to + ensure only one byte needs to be pending regardless of queue length. """ _vv and IOLOG.debug('%r.on_receive()', self) - self.receive_side.read(128) self._lock.acquire() try: + self.receive_side.read(1) deferred = self._deferred self._deferred = [] finally: @@ -1969,6 +1966,18 @@ class Waker(BasicStream): func, args, kwargs) self._broker.shutdown() + def _wake(self): + """ + Wake the multiplexer by writing a byte. If Broker is midway through + teardown, the FD may already be closed, so ignore EBADF. + """ + try: + self.transmit_side.write(b(' ')) + except OSError: + e = sys.exc_info()[1] + if e.args[0] != errno.EBADF: + raise + def defer(self, func, *args, **kwargs): if threading.currentThread().ident == self.broker_ident: _vv and IOLOG.debug('%r.defer() [immediate]', self) @@ -1977,20 +1986,12 @@ class Waker(BasicStream): _vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd) self._lock.acquire() try: + if not self._deferred: + self._wake() self._deferred.append((func, args, kwargs)) finally: self._lock.release() - # Wake the multiplexer by writing a byte. If the broker is in the midst - # of tearing itself down, the waker fd may already have been closed, so - # ignore EBADF here. - try: - self.transmit_side.write(b(' ')) - except OSError: - e = sys.exc_info()[1] - if e.args[0] != errno.EBADF: - raise - class IoLogger(BasicStream): """ From 87e8c45f760dbd036af7b575a492970eddd3d19d Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 2 Nov 2018 16:03:59 +0000 Subject: [PATCH 08/35] core: fix minify_test regression introduced in 804bacdadb14d60cf26808a64d0c6a70230ca643 The minifier can't handle empty function bodies, so the pass statements are necessary. --- mitogen/core.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mitogen/core.py b/mitogen/core.py index 2ac9bdbe..83efad42 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1284,6 +1284,7 @@ class BasicStream(object): called on them, and the method must call :meth:`on_disconect` if reading produces an empty string. """ + pass def on_transmit(self, broker): """ @@ -1295,6 +1296,7 @@ class BasicStream(object): Subclasses must implement this if :meth:`Broker._start_transmit` is ever called on them. """ + pass def on_shutdown(self, broker): """ From e4280dc14a783842249eef348f4dfef562f28383 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 2 Nov 2018 17:03:05 +0000 Subject: [PATCH 09/35] core: Don't crash in Waker.__repr__ if partially initialized. --- mitogen/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index 83efad42..9dec1eab 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1930,8 +1930,8 @@ class Waker(BasicStream): def __repr__(self): return 'Waker(%r rfd=%r, wfd=%r)' % ( self._broker, - self.receive_side.fd, - self.transmit_side.fd, + self.receive_side and self.receive_side.fd, + self.transmit_side and self.transmit_side.fd, ) @property From d1c2e7a834489874e71ed7999e8859d67d39dd30 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 2 Nov 2018 17:03:27 +0000 Subject: [PATCH 10/35] issue #406: call Poller.close() during broker shutdown. --- mitogen/core.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mitogen/core.py b/mitogen/core.py index 9dec1eab..df4e54eb 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -2491,6 +2491,8 @@ class Broker(object): LOG.error('_broker_main() force disconnecting %r', side) side.stream.on_disconnect(self) + self.poller.close() + def _broker_main(self): """ Handle events until :meth:`shutdown`. On shutdown, invoke From 4230a935577f043ce07136e70726b6e1edb30965 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 2 Nov 2018 17:05:22 +0000 Subject: [PATCH 11/35] issue #406: update Changelog. --- docs/changelog.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/changelog.rst b/docs/changelog.rst index 5405f209..d9746375 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -206,6 +206,10 @@ Core Library sender. This ensures function calls exceeding the configured maximum size crash rather than hang. +* `#406 `_: + :class:`mitogen.core.Broker` did not call :meth:`mitogen.core.Poller.close` + during shutdown, leaking the underlying poller FD in masters and parents. + * `#411 `_: the SSH method typed "``y``" rather than the requisite "``yes``" when `check_host_keys="accept"` was configured. This would lead to connection timeouts due to the hung From e9a6e4c3d216b3c77878d141d4cecaf1ef570d8d Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Nov 2018 12:39:29 +0000 Subject: [PATCH 12/35] issue #406: add test. --- tests/broker_test.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/broker_test.py b/tests/broker_test.py index 7d070e3d..c35e6161 100644 --- a/tests/broker_test.py +++ b/tests/broker_test.py @@ -1,6 +1,7 @@ import threading +import mock import unittest2 import testlib @@ -8,6 +9,19 @@ import testlib import mitogen.core +class ShutdownTest(testlib.TestCase): + klass = mitogen.core.Broker + + def test_poller_closed(self): + broker = self.klass() + actual_close = broker.poller.close + broker.poller.close = mock.Mock() + broker.shutdown() + broker.join() + self.assertEquals(1, len(broker.poller.close.mock_calls)) + actual_close() + + class DeferSyncTest(testlib.TestCase): klass = mitogen.core.Broker From 8a0b3437600aebf08f91e886cb272dc627bef3b8 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Nov 2018 13:28:37 +0000 Subject: [PATCH 13/35] issue #406: test for FD leak after every TestCase --- dev_requirements.txt | 1 + tests/call_error_test.py | 4 ++-- tests/docker_test.py | 2 +- tests/fakessh_test.py | 2 +- tests/fork_test.py | 5 +++-- tests/local_test.py | 4 ++-- tests/master_test.py | 2 +- tests/minify_test.py | 4 ++-- tests/parent_test.py | 8 ++++---- tests/responder_test.py | 8 ++++---- tests/router_test.py | 7 +++---- tests/serialization_test.py | 4 ++-- tests/ssh_test.py | 4 ++-- tests/testlib.py | 26 ++++++++++++++++++++++++++ tests/types_test.py | 6 ++++-- tests/unix_test.py | 6 +++--- tests/utils_test.py | 8 +++++--- 17 files changed, 66 insertions(+), 35 deletions(-) diff --git a/dev_requirements.txt b/dev_requirements.txt index f48006e5..c536c154 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,4 +1,5 @@ -r docs/docs-requirements.txt +psutil==5.4.8 coverage==4.5.1 Django==1.6.11 # Last version supporting 2.6. mock==2.0.0 diff --git a/tests/call_error_test.py b/tests/call_error_test.py index 447a80a9..1480a743 100644 --- a/tests/call_error_test.py +++ b/tests/call_error_test.py @@ -10,7 +10,7 @@ import testlib import plain_old_module -class ConstructorTest(unittest2.TestCase): +class ConstructorTest(testlib.TestCase): klass = mitogen.core.CallError def test_string_noargs(self): @@ -44,7 +44,7 @@ class ConstructorTest(unittest2.TestCase): self.assertTrue('test_from_exc_tb' in e.args[0]) -class PickleTest(unittest2.TestCase): +class PickleTest(testlib.TestCase): klass = mitogen.core.CallError def test_string_noargs(self): diff --git a/tests/docker_test.py b/tests/docker_test.py index 2d45609a..49c742ee 100644 --- a/tests/docker_test.py +++ b/tests/docker_test.py @@ -7,7 +7,7 @@ import unittest2 import testlib -class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): +class ConstructorTest(testlib.RouterMixin, testlib.TestCase): def test_okay(self): docker_path = testlib.data_path('stubs/stub-docker.py') context = self.router.docker( diff --git a/tests/fakessh_test.py b/tests/fakessh_test.py index c584acfe..63c70058 100644 --- a/tests/fakessh_test.py +++ b/tests/fakessh_test.py @@ -10,7 +10,7 @@ import mitogen.fakessh import testlib -class RsyncTest(testlib.DockerMixin, unittest2.TestCase): +class RsyncTest(testlib.DockerMixin, testlib.TestCase): @timeoutcontext.timeout(5) @unittest2.skip('broken') def test_rsync_from_master(self): diff --git a/tests/fork_test.py b/tests/fork_test.py index 8b396bbf..5e457c97 100644 --- a/tests/fork_test.py +++ b/tests/fork_test.py @@ -55,7 +55,7 @@ def exercise_importer(n): return simple_pkg.a.subtract_one_add_two(n) -class ForkTest(testlib.RouterMixin, unittest2.TestCase): +class ForkTest(testlib.RouterMixin, testlib.TestCase): def test_okay(self): context = self.router.fork() self.assertNotEqual(context.call(os.getpid), os.getpid()) @@ -84,7 +84,8 @@ class ForkTest(testlib.RouterMixin, unittest2.TestCase): context = self.router.fork(on_start=on_start) self.assertEquals(123, recv.get().unpickle()) -class DoubleChildTest(testlib.RouterMixin, unittest2.TestCase): + +class DoubleChildTest(testlib.RouterMixin, testlib.TestCase): def test_okay(self): # When forking from the master process, Mitogen had nothing to do with # setting up stdio -- that was inherited wherever the Master is running diff --git a/tests/local_test.py b/tests/local_test.py index fbf5c1c8..5a620e52 100644 --- a/tests/local_test.py +++ b/tests/local_test.py @@ -20,7 +20,7 @@ def get_os_environ(): return dict(os.environ) -class LocalTest(testlib.RouterMixin, unittest2.TestCase): +class LocalTest(testlib.RouterMixin, testlib.TestCase): stream_class = mitogen.ssh.Stream def test_stream_name(self): @@ -29,7 +29,7 @@ class LocalTest(testlib.RouterMixin, unittest2.TestCase): self.assertEquals('local.%d' % (pid,), context.name) -class PythonPathTest(testlib.RouterMixin, unittest2.TestCase): +class PythonPathTest(testlib.RouterMixin, testlib.TestCase): stream_class = mitogen.ssh.Stream def test_inherited(self): diff --git a/tests/master_test.py b/tests/master_test.py index 19a9b414..31d11013 100644 --- a/tests/master_test.py +++ b/tests/master_test.py @@ -6,7 +6,7 @@ import testlib import mitogen.master -class ScanCodeImportsTest(unittest2.TestCase): +class ScanCodeImportsTest(testlib.TestCase): func = staticmethod(mitogen.master.scan_code_imports) if mitogen.core.PY3: diff --git a/tests/minify_test.py b/tests/minify_test.py index 98307059..e990fb90 100644 --- a/tests/minify_test.py +++ b/tests/minify_test.py @@ -16,7 +16,7 @@ def read_sample(fname): return sample -class MinimizeSourceTest(unittest2.TestCase): +class MinimizeSourceTest(testlib.TestCase): func = staticmethod(mitogen.minify.minimize_source) def test_class(self): @@ -55,7 +55,7 @@ class MinimizeSourceTest(unittest2.TestCase): self.assertEqual(expected, self.func(original)) -class MitogenCoreTest(unittest2.TestCase): +class MitogenCoreTest(testlib.TestCase): # Verify minimize_source() succeeds for all built-in modules. func = staticmethod(mitogen.minify.minimize_source) diff --git a/tests/parent_test.py b/tests/parent_test.py index 9d540ccc..e6a93deb 100644 --- a/tests/parent_test.py +++ b/tests/parent_test.py @@ -153,7 +153,7 @@ class StreamErrorTest(testlib.RouterMixin, testlib.TestCase): self.assertTrue(s in e.args[0]) -class ContextTest(testlib.RouterMixin, unittest2.TestCase): +class ContextTest(testlib.RouterMixin, testlib.TestCase): def test_context_shutdown(self): local = self.router.local() pid = local.call(os.getpid) @@ -181,7 +181,7 @@ class OpenPtyTest(testlib.TestCase): self.assertEquals(e.args[0], msg) -class TtyCreateChildTest(unittest2.TestCase): +class TtyCreateChildTest(testlib.TestCase): func = staticmethod(mitogen.parent.tty_create_child) def test_dev_tty_open_succeeds(self): @@ -211,7 +211,7 @@ class TtyCreateChildTest(unittest2.TestCase): tf.close() -class IterReadTest(unittest2.TestCase): +class IterReadTest(testlib.TestCase): func = staticmethod(mitogen.parent.iter_read) def make_proc(self): @@ -263,7 +263,7 @@ class IterReadTest(unittest2.TestCase): proc.terminate() -class WriteAllTest(unittest2.TestCase): +class WriteAllTest(testlib.TestCase): func = staticmethod(mitogen.parent.write_all) def make_proc(self): diff --git a/tests/responder_test.py b/tests/responder_test.py index 46400fce..888302c0 100644 --- a/tests/responder_test.py +++ b/tests/responder_test.py @@ -13,7 +13,7 @@ import plain_old_module import simple_pkg.a -class NeutralizeMainTest(testlib.RouterMixin, unittest2.TestCase): +class NeutralizeMainTest(testlib.RouterMixin, testlib.TestCase): klass = mitogen.master.ModuleResponder def call(self, *args, **kwargs): @@ -67,7 +67,7 @@ class NeutralizeMainTest(testlib.RouterMixin, unittest2.TestCase): -class GoodModulesTest(testlib.RouterMixin, unittest2.TestCase): +class GoodModulesTest(testlib.RouterMixin, testlib.TestCase): def test_plain_old_module(self): # The simplest case: a top-level module with no interesting imports or # package machinery damage. @@ -89,7 +89,7 @@ class GoodModulesTest(testlib.RouterMixin, unittest2.TestCase): self.assertEquals(output, "['__main__', 50]\n") -class BrokenModulesTest(unittest2.TestCase): +class BrokenModulesTest(testlib.TestCase): def test_obviously_missing(self): # Ensure we don't crash in the case of a module legitimately being # unavailable. Should never happen in the real world. @@ -144,7 +144,7 @@ class BrokenModulesTest(unittest2.TestCase): self.assertIsInstance(msg.unpickle(), tuple) -class BlacklistTest(unittest2.TestCase): +class BlacklistTest(testlib.TestCase): @unittest2.skip('implement me') def test_whitelist_no_blacklist(self): assert 0 diff --git a/tests/router_test.py b/tests/router_test.py index 7b7e2896..b0add6d3 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -36,7 +36,7 @@ def send_n_sized_reply(sender, n): return 123 -class SourceVerifyTest(testlib.RouterMixin, unittest2.TestCase): +class SourceVerifyTest(testlib.RouterMixin, testlib.TestCase): def setUp(self): super(SourceVerifyTest, self).setUp() # Create some children, ping them, and store what their messages look @@ -149,7 +149,7 @@ class PolicyTest(testlib.RouterMixin, testlib.TestCase): self.assertEquals(e.args[0], self.router.refused_msg) -class CrashTest(testlib.BrokerMixin, unittest2.TestCase): +class CrashTest(testlib.BrokerMixin, testlib.TestCase): # This is testing both Broker's ability to crash nicely, and Router's # ability to respond to the crash event. klass = mitogen.master.Router @@ -178,8 +178,7 @@ class CrashTest(testlib.BrokerMixin, unittest2.TestCase): self.assertTrue(expect in log.stop()) - -class AddHandlerTest(unittest2.TestCase): +class AddHandlerTest(testlib.TestCase): klass = mitogen.master.Router def test_invoked_at_shutdown(self): diff --git a/tests/serialization_test.py b/tests/serialization_test.py index f108ff37..d8c54c59 100644 --- a/tests/serialization_test.py +++ b/tests/serialization_test.py @@ -20,7 +20,7 @@ def roundtrip(v): return mitogen.core.Message(data=msg.data).unpickle() -class BlobTest(unittest2.TestCase): +class BlobTest(testlib.TestCase): klass = mitogen.core.Blob # Python 3 pickle protocol 2 does weird stuff depending on whether an empty @@ -36,7 +36,7 @@ class BlobTest(unittest2.TestCase): self.assertEquals(b(''), roundtrip(v)) -class ContextTest(testlib.RouterMixin, unittest2.TestCase): +class ContextTest(testlib.RouterMixin, testlib.TestCase): klass = mitogen.core.Context # Ensure Context can be round-tripped by regular pickle in addition to diff --git a/tests/ssh_test.py b/tests/ssh_test.py index 36359a66..661ff5ed 100644 --- a/tests/ssh_test.py +++ b/tests/ssh_test.py @@ -29,7 +29,7 @@ class StubSshMixin(testlib.RouterMixin): del os.environ['STUBSSH_MODE'] -class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): +class ConstructorTest(testlib.RouterMixin, testlib.TestCase): def test_okay(self): context = self.router.ssh( hostname='hostname', @@ -165,7 +165,7 @@ class SshTest(testlib.DockerMixin, testlib.TestCase): fp.close() -class BannerTest(testlib.DockerMixin, unittest2.TestCase): +class BannerTest(testlib.DockerMixin, testlib.TestCase): # Verify the ability to disambiguate random spam appearing in the SSHd's # login banner from a legitimate password prompt. stream_class = mitogen.ssh.Stream diff --git a/tests/testlib.py b/tests/testlib.py index 8f11337d..605254d3 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -8,9 +8,11 @@ import subprocess import sys import time +import psutil import unittest2 import mitogen.core +import mitogen.fork import mitogen.master import mitogen.utils @@ -41,6 +43,13 @@ if faulthandler is not None: faulthandler.enable() +def get_fd_count(): + """ + Return the number of FDs open by this process. + """ + return psutil.Process().num_fds() + + def data_path(suffix): path = os.path.join(DATA_DIR, suffix) if path.endswith('.key'): @@ -211,6 +220,23 @@ class LogCapturer(object): class TestCase(unittest2.TestCase): + @classmethod + def setUpClass(cls): + # This is done in setUpClass() so we have a chance to run before any + # Broker() instantiations in setUp() etc. + mitogen.fork.on_fork() + cls._fd_count_before = get_fd_count() + super(TestCase, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestCase, cls).tearDownClass() + mitogen.fork.on_fork() + assert get_fd_count() == cls._fd_count_before, \ + "%s leaked FDs. Count before: %s, after: %s" % ( + cls, cls._fd_count_before, get_fd_count(), + ) + def assertRaises(self, exc, func, *args, **kwargs): """Like regular assertRaises, except return the exception that was raised. Can't use context manager because tests must run on Python2.4""" diff --git a/tests/types_test.py b/tests/types_test.py index 4f80e076..f929c098 100644 --- a/tests/types_test.py +++ b/tests/types_test.py @@ -11,8 +11,10 @@ import unittest2 import mitogen.core from mitogen.core import b +import testlib -class BlobTest(unittest2.TestCase): + +class BlobTest(testlib.TestCase): klass = mitogen.core.Blob def make(self): @@ -43,7 +45,7 @@ class BlobTest(unittest2.TestCase): mitogen.core.BytesType(blob2)) -class SecretTest(unittest2.TestCase): +class SecretTest(testlib.TestCase): klass = mitogen.core.Secret def make(self): diff --git a/tests/unix_test.py b/tests/unix_test.py index 67265c81..f837c6f0 100644 --- a/tests/unix_test.py +++ b/tests/unix_test.py @@ -30,7 +30,7 @@ class MyService(mitogen.service.Service): } -class IsPathDeadTest(unittest2.TestCase): +class IsPathDeadTest(testlib.TestCase): func = staticmethod(mitogen.unix.is_path_dead) path = '/tmp/stale-socket' @@ -57,7 +57,7 @@ class IsPathDeadTest(unittest2.TestCase): os.unlink(self.path) -class ListenerTest(testlib.RouterMixin, unittest2.TestCase): +class ListenerTest(testlib.RouterMixin, testlib.TestCase): klass = mitogen.unix.Listener def test_constructor_basic(self): @@ -66,7 +66,7 @@ class ListenerTest(testlib.RouterMixin, unittest2.TestCase): os.unlink(listener.path) -class ClientTest(unittest2.TestCase): +class ClientTest(testlib.TestCase): klass = mitogen.unix.Listener def _try_connect(self, path): diff --git a/tests/utils_test.py b/tests/utils_test.py index b2e0aa9e..5b81289e 100644 --- a/tests/utils_test.py +++ b/tests/utils_test.py @@ -6,6 +6,8 @@ import mitogen.core import mitogen.master import mitogen.utils +import testlib + def func0(router): return router @@ -16,7 +18,7 @@ def func(router): return router -class RunWithRouterTest(unittest2.TestCase): +class RunWithRouterTest(testlib.TestCase): # test_shutdown_on_exception # test_shutdown_on_success @@ -26,7 +28,7 @@ class RunWithRouterTest(unittest2.TestCase): self.assertFalse(router.broker._thread.isAlive()) -class WithRouterTest(unittest2.TestCase): +class WithRouterTest(testlib.TestCase): def test_with_broker(self): router = func() self.assertIsInstance(router, mitogen.master.Router) @@ -40,7 +42,7 @@ class Unicode(mitogen.core.UnicodeType): pass class Bytes(mitogen.core.BytesType): pass -class CastTest(unittest2.TestCase): +class CastTest(testlib.TestCase): def test_dict(self): self.assertEqual(type(mitogen.utils.cast({})), dict) self.assertEqual(type(mitogen.utils.cast(Dict())), dict) From 9b3cb55a8bf620a90470caeb001530557e5ff26c Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Nov 2018 15:01:45 +0000 Subject: [PATCH 14/35] issue #4096: import log_fd_calls() helper. --- tests/testlib.py | 49 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 3 deletions(-) diff --git a/tests/testlib.py b/tests/testlib.py index 605254d3..7c376135 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -6,7 +6,9 @@ import re import socket import subprocess import sys +import threading import time +import traceback import psutil import unittest2 @@ -175,6 +177,46 @@ def sync_with_broker(broker, timeout=10.0): sem.get(timeout=10.0) +def log_fd_calls(): + mypid = os.getpid() + l = threading.Lock() + real_pipe = os.pipe + def pipe(): + with l: + rv = real_pipe() + if mypid == os.getpid(): + print + print rv + traceback.print_stack(limit=3) + print + return rv + + real_dup2 = os.dup2 + def dup2(*args): + with l: + real_dup2(*args) + if mypid == os.getpid(): + print + print '--', args + traceback.print_stack(limit=3) + print + + real_dup = os.dup + def dup(*args): + with l: + rc = real_dup(*args) + if mypid == os.getpid(): + print + print '--', args, '->', rv + traceback.print_stack(limit=3) + print + return rv + + os.pipe = pipe + os.dup = dup + os.dup2 = dup2 + + class CaptureStreamHandler(logging.StreamHandler): def __init__(self, *args, **kwargs): logging.StreamHandler.__init__(self, *args, **kwargs) @@ -232,10 +274,11 @@ class TestCase(unittest2.TestCase): def tearDownClass(cls): super(TestCase, cls).tearDownClass() mitogen.fork.on_fork() - assert get_fd_count() == cls._fd_count_before, \ - "%s leaked FDs. Count before: %s, after: %s" % ( + if get_fd_count() != cls._fd_count_before: + import os; os.system('lsof -p %s' % (os.getpid(),)) + assert 0, "%s leaked FDs. Count before: %s, after: %s" % ( cls, cls._fd_count_before, get_fd_count(), - ) + ) def assertRaises(self, exc, func, *args, **kwargs): """Like regular assertRaises, except return the exception that was From 70c550f50c7c0918a70c9baf080573e9d0a1b232 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Nov 2018 15:02:13 +0000 Subject: [PATCH 15/35] issue #406: close stdout pipes in parent_test --- tests/parent_test.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/parent_test.py b/tests/parent_test.py index e6a93deb..c4921c1f 100644 --- a/tests/parent_test.py +++ b/tests/parent_test.py @@ -230,6 +230,7 @@ class IterReadTest(testlib.TestCase): break finally: proc.terminate() + proc.stdout.close() def test_deadline_exceeded_before_call(self): proc = self.make_proc() @@ -244,6 +245,7 @@ class IterReadTest(testlib.TestCase): self.assertEqual(len(got), 0) finally: proc.terminate() + proc.stdout.close() def test_deadline_exceeded_during_call(self): proc = self.make_proc() @@ -261,6 +263,7 @@ class IterReadTest(testlib.TestCase): self.assertLess(len(got), 5) finally: proc.terminate() + proc.stdout.close() class WriteAllTest(testlib.TestCase): @@ -280,6 +283,7 @@ class WriteAllTest(testlib.TestCase): self.func(proc.stdin.fileno(), self.ten_ms_chunk) finally: proc.terminate() + proc.stdout.close() def test_deadline_exceeded_before_call(self): proc = self.make_proc() @@ -289,6 +293,7 @@ class WriteAllTest(testlib.TestCase): )) finally: proc.terminate() + proc.stdout.close() def test_deadline_exceeded_during_call(self): proc = self.make_proc() @@ -301,6 +306,7 @@ class WriteAllTest(testlib.TestCase): )) finally: proc.terminate() + proc.stdout.close() class DisconnectTest(testlib.RouterMixin, testlib.TestCase): From 6ff1e001da9d829db518c1cecd1b6604433254b4 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Nov 2018 15:48:31 +0000 Subject: [PATCH 16/35] issue #406: log socketpair calls too. --- tests/testlib.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/tests/testlib.py b/tests/testlib.py index 7c376135..8d2bd8d9 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -190,6 +190,19 @@ def log_fd_calls(): traceback.print_stack(limit=3) print return rv + os.pipe = pipe + + real_socketpair = socket.socketpair + def socketpair(*args): + with l: + rv = real_socketpair(*args) + if mypid == os.getpid(): + print + print '--', args, '->', rv + traceback.print_stack(limit=3) + print + return rv + socket.socketpair = socketpair real_dup2 = os.dup2 def dup2(*args): @@ -200,6 +213,7 @@ def log_fd_calls(): print '--', args traceback.print_stack(limit=3) print + os.dup2 = dup2 real_dup = os.dup def dup(*args): @@ -211,10 +225,7 @@ def log_fd_calls(): traceback.print_stack(limit=3) print return rv - - os.pipe = pipe os.dup = dup - os.dup2 = dup2 class CaptureStreamHandler(logging.StreamHandler): From 14b389cb467d4c3eb5715b0c8be98922c5a027cc Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Nov 2018 15:49:52 +0000 Subject: [PATCH 17/35] issue #406: don't leak FDs on failed child start. --- docs/changelog.rst | 3 +++ mitogen/parent.py | 16 ++++++++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index d9746375..781c42b9 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -210,6 +210,9 @@ Core Library :class:`mitogen.core.Broker` did not call :meth:`mitogen.core.Poller.close` during shutdown, leaking the underlying poller FD in masters and parents. +* `#406 `_: connections could leak + FDs when a child process failed to start. + * `#411 `_: the SSH method typed "``y``" rather than the requisite "``yes``" when `check_host_keys="accept"` was configured. This would lead to connection timeouts due to the hung diff --git a/mitogen/parent.py b/mitogen/parent.py index 780cecd7..3e30f475 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -211,7 +211,7 @@ def create_socketpair(): return parentfp, childfp -def detach_popen(*args, **kwargs): +def detach_popen(close_on_error=None, **kwargs): """ Use :class:`subprocess.Popen` to construct a child process, then hack the Popen so that it forgets the child it created, allowing it to survive a @@ -223,6 +223,8 @@ def detach_popen(*args, **kwargs): delivered to this process, causing later 'legitimate' calls to fail with ECHILD. + :param list close_on_error: + Array of integer file descriptors to close on exception. :returns: Process ID of the new child. """ @@ -230,7 +232,13 @@ def detach_popen(*args, **kwargs): # handling, without tying the surrounding code into managing a Popen # object, which isn't possible for at least :mod:`mitogen.fork`. This # should be replaced by a swappable helper class in a future version. - proc = subprocess.Popen(*args, **kwargs) + try: + proc = subprocess.Popen(**kwargs) + except Exception: + for fd in close_on_error or (): + os.close(fd) + raise + proc._child_created = False return proc.pid @@ -277,6 +285,7 @@ def create_child(args, merge_stdio=False, stderr_pipe=False, preexec_fn=None): stdout=childfp, close_fds=True, preexec_fn=preexec_fn, + close_on_error=[parentfp.fileno(), childfp.fileno()], **extra ) if stderr_pipe: @@ -344,6 +353,7 @@ def tty_create_child(args): stdout=slave_fd, stderr=slave_fd, preexec_fn=_acquire_controlling_tty, + close_on_error=[master_fd, slave_fd], close_fds=True, ) @@ -372,6 +382,7 @@ def hybrid_tty_create_child(args): mitogen.core.set_block(childfp) disable_echo(master_fd) disable_echo(slave_fd) + pid = detach_popen( args=args, stdin=childfp, @@ -379,6 +390,7 @@ def hybrid_tty_create_child(args): stderr=slave_fd, preexec_fn=_acquire_controlling_tty, close_fds=True, + close_on_error=[master_fd, slave_fd, parentfp.fileno(), childfp.fileno()], ) os.close(slave_fd) From 375182b71b7705d85290faf24d011242c427fc2a Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Nov 2018 19:48:36 +0000 Subject: [PATCH 18/35] issue #406: don't leak side FDs on bootstrap failure. --- mitogen/parent.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mitogen/parent.py b/mitogen/parent.py index 3e30f475..019ee917 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1152,10 +1152,14 @@ class Stream(mitogen.core.Stream): try: self._connect_bootstrap(extra_fd) except EofError: + self.receive_side.close() + self.transmit_side.close() e = sys.exc_info()[1] self._adorn_eof_error(e) raise except Exception: + self.receive_side.close() + self.transmit_side.close() self._reap_child() raise From b0dd628f07f0b96a2b2b29d5b5e9505cc3874d80 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Nov 2018 19:49:13 +0000 Subject: [PATCH 19/35] issue #406: parent_test fixes, NameError in log_fd_calls(). --- tests/parent_test.py | 7 ++++--- tests/testlib.py | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/parent_test.py b/tests/parent_test.py index c4921c1f..e83d6f1a 100644 --- a/tests/parent_test.py +++ b/tests/parent_test.py @@ -207,6 +207,7 @@ class TtyCreateChildTest(testlib.TestCase): self.assertEquals(pid, waited_pid) self.assertEquals(0, status) self.assertEquals(mitogen.core.b(''), tf.read()) + os.close(fd) finally: tf.close() @@ -283,7 +284,7 @@ class WriteAllTest(testlib.TestCase): self.func(proc.stdin.fileno(), self.ten_ms_chunk) finally: proc.terminate() - proc.stdout.close() + proc.stdin.close() def test_deadline_exceeded_before_call(self): proc = self.make_proc() @@ -293,7 +294,7 @@ class WriteAllTest(testlib.TestCase): )) finally: proc.terminate() - proc.stdout.close() + proc.stdin.close() def test_deadline_exceeded_during_call(self): proc = self.make_proc() @@ -306,7 +307,7 @@ class WriteAllTest(testlib.TestCase): )) finally: proc.terminate() - proc.stdout.close() + proc.stdin.close() class DisconnectTest(testlib.RouterMixin, testlib.TestCase): diff --git a/tests/testlib.py b/tests/testlib.py index 8d2bd8d9..0f9a405a 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -218,7 +218,7 @@ def log_fd_calls(): real_dup = os.dup def dup(*args): with l: - rc = real_dup(*args) + rv = real_dup(*args) if mypid == os.getpid(): print print '--', args, '->', rv From 3da4b1a4203f330942fe0fde8f60b10287f64c16 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Nov 2018 20:13:11 +0000 Subject: [PATCH 20/35] tests: verify only main/watcher threads exist at teardown --- tests/testlib.py | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/tests/testlib.py b/tests/testlib.py index 0f9a405a..3846f470 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -281,16 +281,38 @@ class TestCase(unittest2.TestCase): cls._fd_count_before = get_fd_count() super(TestCase, cls).setUpClass() + ALLOWED_THREADS = set([ + 'MainThread', + 'mitogen.master.join_thread_async' + ]) + @classmethod - def tearDownClass(cls): - super(TestCase, cls).tearDownClass() - mitogen.fork.on_fork() + def _teardown_check_threads(cls): + counts = {} + for thread in threading.enumerate(): + assert thread.name in cls.ALLOWED_THREADS, \ + 'Found thread %r still running after tests.' % (thread.name,) + counts[thread.name] = counts.get(thread.name, 0) + 1 + + for name in counts: + assert counts[name] == 1, \ + 'Found %d copies of thread %r running after tests.' % (name,) + + @classmethod + def _teardown_check_fds(cls): + mitogen.core.Latch._on_fork() if get_fd_count() != cls._fd_count_before: import os; os.system('lsof -p %s' % (os.getpid(),)) assert 0, "%s leaked FDs. Count before: %s, after: %s" % ( cls, cls._fd_count_before, get_fd_count(), ) + @classmethod + def tearDownClass(cls): + super(TestCase, cls).tearDownClass() + cls._teardown_check_threads() + cls._teardown_check_fds() + def assertRaises(self, exc, func, *args, **kwargs): """Like regular assertRaises, except return the exception that was raised. Can't use context manager because tests must run on Python2.4""" From 175fc377d267cd8308cebfbb2b6e02985bd51f8e Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Nov 2018 20:38:39 +0000 Subject: [PATCH 21/35] tests: remove hard-wired SSL paths from fork_test. --- tests/fork_test.py | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/tests/fork_test.py b/tests/fork_test.py index 5e457c97..dd214bd1 100644 --- a/tests/fork_test.py +++ b/tests/fork_test.py @@ -1,4 +1,5 @@ +import _ssl import ctypes import os import random @@ -13,21 +14,29 @@ import testlib import plain_old_module -IS_64BIT = struct.calcsize('P') == 8 -PLATFORM_TO_PATH = { - ('darwin', False): '/usr/lib/libssl.dylib', - ('darwin', True): '/usr/lib/libssl.dylib', - ('linux2', False): '/usr/lib/libssl.so', - ('linux2', True): '/usr/lib/x86_64-linux-gnu/libssl.so', - # Python 2.6 - ('linux3', False): '/usr/lib/libssl.so', - ('linux3', True): '/usr/lib/x86_64-linux-gnu/libssl.so', - # Python 3 - ('linux', False): '/usr/lib/libssl.so', - ('linux', True): '/usr/lib/x86_64-linux-gnu/libssl.so', -} - -c_ssl = ctypes.CDLL(PLATFORM_TO_PATH[sys.platform, IS_64BIT]) +def _find_ssl_linux(): + s = testlib.subprocess__check_output(['ldd', _ssl.__file__]) + for line in s.splitlines(): + bits = line.split() + if bits[0].startswith('libssl'): + return bits[2] + +def _find_ssl_darwin(): + s = testlib.subprocess__check_output(['otool', '-l', _ssl.__file__]) + for line in s.splitlines(): + bits = line.split() + if bits[0] == 'name' and 'libssl' in bits[1]: + return bits[1] + + +if sys.platform.startswith('linux'): + LIBSSL_PATH = _find_ssl_linux() +elif sys.platform == 'darwin': + LIBSSL_PATH = _find_ssl_darwin() +else: + assert 0, "Don't know how to find libssl on this platform" + +c_ssl = ctypes.CDLL(LIBSSL_PATH) c_ssl.RAND_pseudo_bytes.argtypes = [ctypes.c_char_p, ctypes.c_int] c_ssl.RAND_pseudo_bytes.restype = ctypes.c_int From 10af266678f16e7404e025b727934a975b664863 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Nov 2018 20:54:05 +0000 Subject: [PATCH 22/35] issue #406: attempt Broker cleanup in case of a crash. --- mitogen/core.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index df4e54eb..5b1d5298 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -2473,6 +2473,13 @@ class Broker(object): for (side, func) in self.poller.poll(timeout): self._call(side.stream, func) + def _broker_exit(self): + for _, (side, _) in self.poller.readers + self.poller.writers: + LOG.error('_broker_main() force disconnecting %r', side) + side.stream.on_disconnect(self) + + self.poller.close() + def _broker_shutdown(self): for _, (side, _) in self.poller.readers + self.poller.writers: self._call(side.stream, side.stream.on_shutdown) @@ -2487,12 +2494,6 @@ class Broker(object): 'more child processes still connected to ' 'our stdout/stderr pipes.', self) - for _, (side, _) in self.poller.readers + self.poller.writers: - LOG.error('_broker_main() force disconnecting %r', side) - side.stream.on_disconnect(self) - - self.poller.close() - def _broker_main(self): """ Handle events until :meth:`shutdown`. On shutdown, invoke @@ -2509,6 +2510,7 @@ class Broker(object): except Exception: LOG.exception('_broker_main() crashed') + self._broker_exit() fire(self, 'exit') def shutdown(self): From 802efa6ea656cbe9f762dc80ada37f09332862f6 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Nov 2018 20:55:30 +0000 Subject: [PATCH 23/35] issue #406: ensure broker_test waits for broker exit. --- tests/broker_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/broker_test.py b/tests/broker_test.py index c35e6161..7890b0f3 100644 --- a/tests/broker_test.py +++ b/tests/broker_test.py @@ -32,6 +32,7 @@ class DeferSyncTest(testlib.TestCase): self.assertEquals(th, broker._thread) finally: broker.shutdown() + broker.join() def test_exception(self): broker = self.klass() @@ -40,6 +41,7 @@ class DeferSyncTest(testlib.TestCase): broker.defer_sync, lambda: int('dave')) finally: broker.shutdown() + broker.join() if __name__ == '__main__': From eae1bdba4e929bca59a8cd934d3e054d44feba6f Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 4 Nov 2018 01:48:04 +0000 Subject: [PATCH 24/35] tests: make minify_test print something useful on failure --- tests/minify_test.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/minify_test.py b/tests/minify_test.py index e990fb90..d1161c90 100644 --- a/tests/minify_test.py +++ b/tests/minify_test.py @@ -95,7 +95,11 @@ class MitogenCoreTest(testlib.TestCase): def test_minify_all(self): for name in glob.glob('mitogen/*.py'): original = self.read_source(name) - minified = self.func(original) + try: + minified = self.func(original) + except Exception: + print('file was: ' + name) + raise self._test_syntax_valid(minified, name) self._test_line_counts_match(original, minified) From b3841317dd89ffb8d015929318c484a8e2ef7f41 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 4 Nov 2018 01:49:10 +0000 Subject: [PATCH 25/35] issue #406: clean up FDs on failure explicitly The previous approach was crap since it left e.g. socketpair instances lying around for GC with their underlying FD already closed, coupled with FD number reuse, led to random madness when GC finally runs. --- mitogen/parent.py | 79 ++++++++++++++++++++++++++--------------------- 1 file changed, 44 insertions(+), 35 deletions(-) diff --git a/mitogen/parent.py b/mitogen/parent.py index 019ee917..e36167d4 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -211,7 +211,7 @@ def create_socketpair(): return parentfp, childfp -def detach_popen(close_on_error=None, **kwargs): +def detach_popen(**kwargs): """ Use :class:`subprocess.Popen` to construct a child process, then hack the Popen so that it forgets the child it created, allowing it to survive a @@ -232,13 +232,7 @@ def detach_popen(close_on_error=None, **kwargs): # handling, without tying the surrounding code into managing a Popen # object, which isn't possible for at least :mod:`mitogen.fork`. This # should be replaced by a swappable helper class in a future version. - try: - proc = subprocess.Popen(**kwargs) - except Exception: - for fd in close_on_error or (): - os.close(fd) - raise - + proc = subprocess.Popen(**kwargs) proc._child_created = False return proc.pid @@ -279,15 +273,20 @@ def create_child(args, merge_stdio=False, stderr_pipe=False, preexec_fn=None): mitogen.core.set_cloexec(stderr_w) extra = {'stderr': stderr_w} - pid = detach_popen( - args=args, - stdin=childfp, - stdout=childfp, - close_fds=True, - preexec_fn=preexec_fn, - close_on_error=[parentfp.fileno(), childfp.fileno()], - **extra - ) + try: + pid = detach_popen( + args=args, + stdin=childfp, + stdout=childfp, + close_fds=True, + preexec_fn=preexec_fn, + **extra + ) + except Exception: + childfp.close() + parentfp.close() + raise + if stderr_pipe: os.close(stderr_w) childfp.close() @@ -347,15 +346,19 @@ def tty_create_child(args): disable_echo(master_fd) disable_echo(slave_fd) - pid = detach_popen( - args=args, - stdin=slave_fd, - stdout=slave_fd, - stderr=slave_fd, - preexec_fn=_acquire_controlling_tty, - close_on_error=[master_fd, slave_fd], - close_fds=True, - ) + try: + pid = detach_popen( + args=args, + stdin=slave_fd, + stdout=slave_fd, + stderr=slave_fd, + preexec_fn=_acquire_controlling_tty, + close_fds=True, + ) + except Exception: + os.close(master_fd) + os.close(slave_fd) + raise os.close(slave_fd) LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s', @@ -383,15 +386,21 @@ def hybrid_tty_create_child(args): disable_echo(master_fd) disable_echo(slave_fd) - pid = detach_popen( - args=args, - stdin=childfp, - stdout=childfp, - stderr=slave_fd, - preexec_fn=_acquire_controlling_tty, - close_fds=True, - close_on_error=[master_fd, slave_fd, parentfp.fileno(), childfp.fileno()], - ) + try: + pid = detach_popen( + args=args, + stdin=childfp, + stdout=childfp, + stderr=slave_fd, + preexec_fn=_acquire_controlling_tty, + close_fds=True, + ) + except Exception: + os.close(master_fd) + os.close(slave_fd) + parentfp.close() + childfp.close() + raise os.close(slave_fd) childfp.close() From 17631b0573092807d29bf7c4e4cdfa4c713c64cf Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 4 Nov 2018 01:50:19 +0000 Subject: [PATCH 26/35] issue #406: parent: close extra_fd on failure too. --- mitogen/parent.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mitogen/parent.py b/mitogen/parent.py index e36167d4..f2dacb09 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1169,6 +1169,8 @@ class Stream(mitogen.core.Stream): except Exception: self.receive_side.close() self.transmit_side.close() + if extra_fd is not None: + os.close(extra_fd) self._reap_child() raise From 003526ef7bd0b4ca65e35df35e0d725709f69981 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 4 Nov 2018 01:54:50 +0000 Subject: [PATCH 27/35] issue #406: fix thread leaks in unix_test too. --- tests/unix_test.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/unix_test.py b/tests/unix_test.py index f837c6f0..ee9499ba 100644 --- a/tests/unix_test.py +++ b/tests/unix_test.py @@ -87,6 +87,8 @@ class ClientTest(testlib.TestCase): resp = context.call_service(service_name=MyService, method_name='ping') self.assertEquals(mitogen.context_id, resp['src_id']) self.assertEquals(0, resp['auth_id']) + router.broker.shutdown() + router.broker.join() def _test_simple_server(self, path): router = mitogen.master.Router() @@ -102,7 +104,9 @@ class ClientTest(testlib.TestCase): time.sleep(0.1) finally: pool.shutdown() + pool.join() router.broker.shutdown() + router.broker.join() finally: os._exit(0) From dc3db49c5a24ed5b251d4a4a7a0d5e37549e8af3 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 4 Nov 2018 02:35:56 +0000 Subject: [PATCH 28/35] issue #406: more leaked FDs when create_child() fails. --- mitogen/parent.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mitogen/parent.py b/mitogen/parent.py index f2dacb09..556f38c6 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -285,6 +285,9 @@ def create_child(args, merge_stdio=False, stderr_pipe=False, preexec_fn=None): except Exception: childfp.close() parentfp.close() + if stderr_pipe: + os.close(stderr_r) + os.close(stderr_w) raise if stderr_pipe: From 411af6c1675330214b00baeef32614e366196621 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 4 Nov 2018 11:25:22 +0000 Subject: [PATCH 29/35] issue #406: unix: don't leak already-closed socket object if Side.close() closes the socket (which it does), and it gets reused, GC will cause socketobject.__del__ to later delete some random FD. --- mitogen/unix.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/mitogen/unix.py b/mitogen/unix.py index 417842bc..cc2d92ff 100644 --- a/mitogen/unix.py +++ b/mitogen/unix.py @@ -78,6 +78,11 @@ class Listener(mitogen.core.BasicStream): self.receive_side = mitogen.core.Side(self, self._sock.fileno()) router.broker.start_receive(self) + def on_shutdown(self, broker): + self._sock.close() + self.receive_side.closed = True + broker.stop_receive(self) + def _accept_client(self, sock): sock.setblocking(True) try: From 661e274556c874d5652c15ab58a4f1c6fa6e68d9 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 4 Nov 2018 11:49:24 +0000 Subject: [PATCH 30/35] issue #406: ensure is_path_dead() socket is finalized. --- mitogen/unix.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/mitogen/unix.py b/mitogen/unix.py index cc2d92ff..e691fe71 100644 --- a/mitogen/unix.py +++ b/mitogen/unix.py @@ -49,10 +49,13 @@ from mitogen.core import LOG def is_path_dead(path): s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: - s.connect(path) - except socket.error: - e = sys.exc_info()[1] - return e.args[0] in (errno.ECONNREFUSED, errno.ENOENT) + try: + s.connect(path) + except socket.error: + e = sys.exc_info()[1] + return e.args[0] in (errno.ECONNREFUSED, errno.ENOENT) + finally: + s.close() return False From 586c6aca9a0685603c8e71c42b2e3f440cd43bd8 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 4 Nov 2018 11:52:02 +0000 Subject: [PATCH 31/35] issue #406: unix: fix ordering of stop_receive/close. --- mitogen/unix.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mitogen/unix.py b/mitogen/unix.py index e691fe71..12182a28 100644 --- a/mitogen/unix.py +++ b/mitogen/unix.py @@ -82,9 +82,9 @@ class Listener(mitogen.core.BasicStream): router.broker.start_receive(self) def on_shutdown(self, broker): + broker.stop_receive(self) self._sock.close() self.receive_side.closed = True - broker.stop_receive(self) def _accept_client(self, sock): sock.setblocking(True) From e01c8f2891dc07e78d1c5d0fa8961c18cc23c598 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 4 Nov 2018 11:57:50 +0000 Subject: [PATCH 32/35] issue #406: 3.x syntax fixes. --- tests/testlib.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/tests/testlib.py b/tests/testlib.py index 3846f470..2f3c2b2e 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -10,7 +10,6 @@ import threading import time import traceback -import psutil import unittest2 import mitogen.core @@ -49,6 +48,7 @@ def get_fd_count(): """ Return the number of FDs open by this process. """ + import psutil return psutil.Process().num_fds() @@ -185,10 +185,9 @@ def log_fd_calls(): with l: rv = real_pipe() if mypid == os.getpid(): - print - print rv + sys.stdout.write('\n%s\n' % (rv,)) traceback.print_stack(limit=3) - print + sys.stdout.write('\n') return rv os.pipe = pipe @@ -197,10 +196,9 @@ def log_fd_calls(): with l: rv = real_socketpair(*args) if mypid == os.getpid(): - print - print '--', args, '->', rv + sys.stdout.write('\n%s -> %s\n' % (args, rv)) traceback.print_stack(limit=3) - print + sys.stdout.write('\n') return rv socket.socketpair = socketpair @@ -209,10 +207,9 @@ def log_fd_calls(): with l: real_dup2(*args) if mypid == os.getpid(): - print - print '--', args + sys.stdout.write('\n%s\n' % (args,)) traceback.print_stack(limit=3) - print + sys.stdout.write('\n') os.dup2 = dup2 real_dup = os.dup @@ -220,10 +217,9 @@ def log_fd_calls(): with l: rv = real_dup(*args) if mypid == os.getpid(): - print - print '--', args, '->', rv + sys.stdout.write('\n%s -> %s\n' % (args, rv)) traceback.print_stack(limit=3) - print + sys.stdout.write('\n') return rv os.dup = dup From 802de6a8d585fbc24434a993aa0e2bba02920ce1 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 4 Nov 2018 13:48:34 +0000 Subject: [PATCH 33/35] issue #406: clean up DiagLogStream handling and connect() failure. When Stream.connect() fails, have it just use on_disconnect(). Now there is a single disconnect cleanup path. Remove cutpasted DiagLogStream setup/destruction, and move it into the base class (temporarily), and only manage the lifetime of its underlying FD via Side.close(). This cures another EBADF failure. --- mitogen/doas.py | 18 ++------- mitogen/fork.py | 2 +- mitogen/parent.py | 70 ++++++++++++++++++++++------------- mitogen/ssh.py | 25 +++---------- mitogen/su.py | 5 +-- mitogen/sudo.py | 12 +++--- tests/data/stubs/stub-doas.py | 7 +++- tests/data/stubs/stub-sudo.py | 7 +++- 8 files changed, 73 insertions(+), 73 deletions(-) diff --git a/mitogen/doas.py b/mitogen/doas.py index cdcee0b0..09b2be9e 100644 --- a/mitogen/doas.py +++ b/mitogen/doas.py @@ -45,10 +45,6 @@ class Stream(mitogen.parent.Stream): create_child = staticmethod(mitogen.parent.hybrid_tty_create_child) child_is_immediate_subprocess = False - #: Once connected, points to the corresponding DiagLogStream, allowing it - #: to be disconnected at the same time this stream is being torn down. - tty_stream = None - username = 'root' password = None doas_path = 'doas' @@ -75,10 +71,6 @@ class Stream(mitogen.parent.Stream): super(Stream, self).connect() self.name = u'doas.' + mitogen.core.to_text(self.username) - def on_disconnect(self, broker): - self.tty_stream.on_disconnect(broker) - super(Stream, self).on_disconnect(broker) - def get_boot_command(self): bits = [self.doas_path, '-u', self.username, '--'] bits = bits + super(Stream, self).get_boot_command() @@ -88,15 +80,13 @@ class Stream(mitogen.parent.Stream): password_incorrect_msg = 'doas password is incorrect' password_required_msg = 'doas password is required' - def _connect_bootstrap(self, extra_fd): - self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self) - - password_sent = False + def _connect_bootstrap(self): it = mitogen.parent.iter_read( - fds=[self.receive_side.fd, extra_fd], + fds=[self.receive_side.fd, self.diag_stream.receive_side.fd], deadline=self.connect_deadline, ) + password_sent = False for buf in it: LOG.debug('%r: received %r', self, buf) if buf.endswith(self.EC0_MARKER): @@ -111,7 +101,7 @@ class Stream(mitogen.parent.Stream): if password_sent: raise PasswordError(self.password_incorrect_msg) LOG.debug('sending password') - self.tty_stream.transmit_side.write( + self.diag_stream.transmit_side.write( mitogen.core.to_text(self.password + '\n').encode('utf-8') ) password_sent = True diff --git a/mitogen/fork.py b/mitogen/fork.py index cf769788..3e3a98a9 100644 --- a/mitogen/fork.py +++ b/mitogen/fork.py @@ -188,6 +188,6 @@ class Stream(mitogen.parent.Stream): # Don't trigger atexit handlers, they were copied from the parent. os._exit(0) - def _connect_bootstrap(self, extra_fd): + def _connect_bootstrap(self): # None required. pass diff --git a/mitogen/parent.py b/mitogen/parent.py index 556f38c6..c4e6f621 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -939,6 +939,33 @@ class Stream(mitogen.core.Stream): #: ExternalContext.main(). max_message_size = None + #: If :attr:`create_child` supplied a diag_fd, references the corresponding + #: :class:`DiagLogStream`, allowing it to be disconnected when this stream + #: is disconnected. Set to :data:`None` if no `diag_fd` was present. + diag_stream = None + + #: Function with the semantics of :func:`create_child` used to create the + #: child process. + create_child = staticmethod(create_child) + + #: Dictionary of extra kwargs passed to :attr:`create_child`. + create_child_args = {} + + #: :data:`True` if the remote has indicated that it intends to detach, and + #: should not be killed on disconnect. + detached = False + + #: If :data:`True`, indicates the child should not be killed during + #: graceful detachment, as it the actual process implementing the child + #: context. In all other cases, the subprocess is SSH, sudo, or a similar + #: tool that should be reminded to quit during disconnection. + child_is_immediate_subprocess = True + + #: Prefix given to default names generated by :meth:`connect`. + name_prefix = u'local' + + _reaped = False + def __init__(self, *args, **kwargs): super(Stream, self).__init__(*args, **kwargs) self.sent_modules = set(['mitogen', 'mitogen.core']) @@ -976,15 +1003,6 @@ class Stream(mitogen.core.Stream): ) ) - #: If :data:`True`, indicates the subprocess managed by us should not be - #: killed during graceful detachment, as it the actual process implementing - #: the child context. In all other cases, the subprocess is SSH, sudo, or a - #: similar tool that should be reminded to quit during disconnection. - child_is_immediate_subprocess = True - - detached = False - _reaped = False - def _reap_child(self): """ Reap the child process during disconnection. @@ -1024,8 +1042,10 @@ class Stream(mitogen.core.Stream): raise def on_disconnect(self, broker): - self._reap_child() super(Stream, self).on_disconnect(broker) + if self.diag_stream is not None: + self.diag_stream.on_disconnect(broker) + self._reap_child() # Minimised, gzipped, base64'd and passed to 'python -c'. It forks, dups # file descriptor 0 as 100, creates a pipe, then execs a new interpreter @@ -1129,10 +1149,6 @@ class Stream(mitogen.core.Stream): ) return zlib.compress(source.encode('utf-8'), 9) - create_child = staticmethod(create_child) - create_child_args = {} - name_prefix = u'local' - def start_child(self): args = self.get_boot_command() try: @@ -1154,26 +1170,28 @@ class Stream(mitogen.core.Stream): def connect(self): LOG.debug('%r.connect()', self) - self.pid, fd, extra_fd = self.start_child() + self.pid, fd, diag_fd = self.start_child() self.name = u'%s.%s' % (self.name_prefix, self.pid) self.receive_side = mitogen.core.Side(self, fd) self.transmit_side = mitogen.core.Side(self, os.dup(fd)) - LOG.debug('%r.connect(): child process stdin/stdout=%r', - self, self.receive_side.fd) + if diag_fd is not None: + self.diag_stream = DiagLogStream(diag_fd, self) + else: + self.diag_stream = None + + LOG.debug('%r.connect(): stdin=%r, stdout=%r, diag=%r', + self, self.receive_side.fd, self.transmit_side.fd, + self.diag_stream and self.diag_stream.receive_side.fd) try: - self._connect_bootstrap(extra_fd) + self._connect_bootstrap() except EofError: - self.receive_side.close() - self.transmit_side.close() + self.on_disconnect(self._router.broker) e = sys.exc_info()[1] self._adorn_eof_error(e) raise except Exception: - self.receive_side.close() - self.transmit_side.close() - if extra_fd is not None: - os.close(extra_fd) + self.on_disconnect(self._router.broker) self._reap_child() raise @@ -1188,8 +1206,10 @@ class Stream(mitogen.core.Stream): write_all(self.transmit_side.fd, self.get_preamble()) discard_until(self.receive_side.fd, self.EC1_MARKER, self.connect_deadline) + if self.diag_stream: + self._router.broker.start_receive(self.diag_stream) - def _connect_bootstrap(self, extra_fd): + def _connect_bootstrap(self): discard_until(self.receive_side.fd, self.EC0_MARKER, self.connect_deadline) self._ec0_received() diff --git a/mitogen/ssh.py b/mitogen/ssh.py index fba6e8f2..e3891f9c 100644 --- a/mitogen/ssh.py +++ b/mitogen/ssh.py @@ -127,10 +127,6 @@ class Stream(mitogen.parent.Stream): #: Number of -v invocations to pass on command line. ssh_debug_level = 0 - #: If batch_mode=False, points to the corresponding DiagLogStream, allowing - #: it to be disconnected at the same time this stream is being torn down. - tty_stream = None - #: The path to the SSH binary. ssh_path = 'ssh' @@ -195,11 +191,6 @@ class Stream(mitogen.parent.Stream): 'stderr_pipe': True, } - def on_disconnect(self, broker): - if self.tty_stream is not None: - self.tty_stream.on_disconnect(broker) - super(Stream, self).on_disconnect(broker) - def get_boot_command(self): bits = [self.ssh_path] if self.ssh_debug_level: @@ -265,7 +256,7 @@ class Stream(mitogen.parent.Stream): def _host_key_prompt(self): if self.check_host_keys == 'accept': LOG.debug('%r: accepting host key', self) - self.tty_stream.transmit_side.write(b('yes\n')) + self.diag_stream.transmit_side.write(b('yes\n')) return # _host_key_prompt() should never be reached with ignore or enforce @@ -273,16 +264,10 @@ class Stream(mitogen.parent.Stream): # with ours. raise HostKeyError(self.hostkey_config_msg) - def _ec0_received(self): - if self.tty_stream is not None: - self._router.broker.start_receive(self.tty_stream) - return super(Stream, self)._ec0_received() - - def _connect_bootstrap(self, extra_fd): + def _connect_bootstrap(self): fds = [self.receive_side.fd] - if extra_fd is not None: - self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self) - fds.append(extra_fd) + if self.diag_stream is not None: + fds.append(self.diag_stream.receive_side.fd) it = mitogen.parent.iter_read(fds=fds, deadline=self.connect_deadline) @@ -311,7 +296,7 @@ class Stream(mitogen.parent.Stream): if self.password is None: raise PasswordError(self.password_required_msg) LOG.debug('%r: sending password', self) - self.tty_stream.transmit_side.write( + self.diag_stream.transmit_side.write( (self.password + '\n').encode() ) password_sent = True diff --git a/mitogen/su.py b/mitogen/su.py index 7e2e5f08..9b0172c8 100644 --- a/mitogen/su.py +++ b/mitogen/su.py @@ -80,9 +80,6 @@ class Stream(mitogen.parent.Stream): super(Stream, self).connect() self.name = u'su.' + mitogen.core.to_text(self.username) - def on_disconnect(self, broker): - super(Stream, self).on_disconnect(broker) - def get_boot_command(self): argv = mitogen.parent.Argv(super(Stream, self).get_boot_command()) return [self.su_path, self.username, '-c', str(argv)] @@ -90,7 +87,7 @@ class Stream(mitogen.parent.Stream): password_incorrect_msg = 'su password is incorrect' password_required_msg = 'su password is required' - def _connect_bootstrap(self, extra_fd): + def _connect_bootstrap(self): password_sent = False it = mitogen.parent.iter_read( fds=[self.receive_side.fd], diff --git a/mitogen/sudo.py b/mitogen/sudo.py index 84b81ddc..b2eaabce 100644 --- a/mitogen/sudo.py +++ b/mitogen/sudo.py @@ -150,10 +150,6 @@ class Stream(mitogen.parent.Stream): super(Stream, self).connect() self.name = u'sudo.' + mitogen.core.to_text(self.username) - def on_disconnect(self, broker): - self.tty_stream.on_disconnect(broker) - super(Stream, self).on_disconnect(broker) - def get_boot_command(self): # Note: sudo did not introduce long-format option processing until July # 2013, so even though we parse long-format options, supply short-form @@ -177,12 +173,14 @@ class Stream(mitogen.parent.Stream): password_incorrect_msg = 'sudo password is incorrect' password_required_msg = 'sudo password is required' - def _connect_bootstrap(self, extra_fd): - self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self) + def _connect_bootstrap(self): + fds = [self.receive_side.fd] + if self.diag_stream is not None: + fds.append(self.diag_stream.receive_side.fd) password_sent = False it = mitogen.parent.iter_read( - fds=[self.receive_side.fd, extra_fd], + fds=fds, deadline=self.connect_deadline, ) diff --git a/tests/data/stubs/stub-doas.py b/tests/data/stubs/stub-doas.py index 08caf044..ca929bc0 100755 --- a/tests/data/stubs/stub-doas.py +++ b/tests/data/stubs/stub-doas.py @@ -2,8 +2,13 @@ import json import os +import subprocess import sys os.environ['ORIGINAL_ARGV'] = json.dumps(sys.argv) os.environ['THIS_IS_STUB_DOAS'] = '1' -os.execv(sys.executable, sys.argv[sys.argv.index('--') + 1:]) + +# This must be a child process and not exec() since Mitogen replaces its stderr +# descriptor, causing the last user of the slave PTY to close it, resulting in +# the master side indicating EIO. +subprocess.check_call(sys.argv[sys.argv.index('--') + 1:]) diff --git a/tests/data/stubs/stub-sudo.py b/tests/data/stubs/stub-sudo.py index ff88cd8e..a7f2704f 100755 --- a/tests/data/stubs/stub-sudo.py +++ b/tests/data/stubs/stub-sudo.py @@ -2,8 +2,13 @@ import json import os +import subprocess import sys os.environ['ORIGINAL_ARGV'] = json.dumps(sys.argv) os.environ['THIS_IS_STUB_SUDO'] = '1' -os.execv(sys.executable, sys.argv[sys.argv.index('--') + 1:]) + +# This must be a child process and not exec() since Mitogen replaces its stderr +# descriptor, causing the last user of the slave PTY to close it, resulting in +# the master side indicating EIO. +subprocess.check_call(sys.argv[sys.argv.index('--') + 1:]) From 01e65d78650ffebe9ab28cef06824f1bc34f97a5 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 4 Nov 2018 13:53:43 +0000 Subject: [PATCH 34/35] docs: update Changelog; closes #406. --- docs/changelog.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/changelog.rst b/docs/changelog.rst index 781c42b9..e3885b79 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -213,6 +213,10 @@ Core Library * `#406 `_: connections could leak FDs when a child process failed to start. +* `#406 `_: connections could leave + FD wrapper objects that had not been closed lying around to be closed during + garbage collection, causing reused FD numbers to be closed at random moments. + * `#411 `_: the SSH method typed "``y``" rather than the requisite "``yes``" when `check_host_keys="accept"` was configured. This would lead to connection timeouts due to the hung From 4ac9cdce7c09b6eda5b180cc81c0a869e1cfdcd5 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 4 Nov 2018 13:54:34 +0000 Subject: [PATCH 35/35] docs: update Changelog; closes #417. --- docs/changelog.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index e3885b79..9dd75d1b 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -213,7 +213,8 @@ Core Library * `#406 `_: connections could leak FDs when a child process failed to start. -* `#406 `_: connections could leave +* `#406 `_, + `#417 `_: connections could leave FD wrapper objects that had not been closed lying around to be closed during garbage collection, causing reused FD numbers to be closed at random moments.