diff --git a/docs/api.rst b/docs/api.rst index 844bb900..57b9a655 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -98,479 +98,441 @@ Router Class .. currentmodule:: mitogen.master -.. class:: Router (broker=None) +.. autoclass:: Router (broker=None) + :members: - Extend :class:`mitogen.core.Router` with functionality useful to - masters, and child contexts who later become masters. Currently when this - class is required, the target context's router is upgraded at runtime. - .. note:: +.. _context-factories: + +Connection Methods +================== - You may construct as many routers as desired, and use the same broker - for multiple routers, however usually only one broker and router need - exist. Multiple routers may be useful when dealing with separate trust - domains, for example, manipulating infrastructure belonging to separate - customers or projects. +.. method:: mitogen.parent.Router.fork (on_fork=None, on_start=None, debug=False, profiling=False, via=None) - :param mitogen.master.Broker broker: - :class:`Broker` instance to use. If not specified, a private - :class:`Broker` is created. + Construct a context on the local machine by forking the current + process. The forked child receives a new identity, sets up a new broker + and router, and responds to function calls identically to children + created using other methods. - .. attribute:: profiling + For long-lived processes, :meth:`local` is always better as it + guarantees a pristine interpreter state that inherited little from the + parent. Forking should only be used in performance-sensitive scenarios + where short-lived children must be spawned to isolate potentially buggy + code, and only after accounting for all the bad things possible as a + result of, at a minimum: - When :data:`True`, cause the broker thread and any subsequent broker - and main threads existing in any child to write - ``/tmp/mitogen.stats...log`` containing a - :mod:`cProfile` dump on graceful exit. Must be set prior to - construction of any :class:`Broker`, e.g. via: + * Files open in the parent remaining open in the child, + causing the lifetime of the underlying object to be extended + indefinitely. - .. code:: + * From the perspective of external components, this is observable + in the form of pipes and sockets that are never closed, which may + break anything relying on closure to signal protocol termination. - mitogen.master.Router.profiling = True + * Descriptors that reference temporary files will not have their disk + space reclaimed until the child exits. - .. method:: enable_debug + * Third party package state, such as urllib3's HTTP connection pool, + attempting to write to file descriptors shared with the parent, + causing random failures in both parent and child. - Cause this context and any descendant child contexts to write debug - logs to /tmp/mitogen..log. + * UNIX signal handlers installed in the parent process remaining active + in the child, despite associated resources, such as service threads, + child processes, resource usage counters or process timers becoming + absent or reset in the child. - .. method:: allocate_id + * Library code that makes assumptions about the process ID remaining + unchanged, for example to implement inter-process locking, or to + generate file names. - Arrange for a unique context ID to be allocated and associated with a - route leading to the active context. In masters, the ID is generated - directly, in children it is forwarded to the master via an - ``ALLOCATE_ID`` message that causes the master to emit matching - ``ADD_ROUTE`` messages prior to replying. + * Anonymous ``MAP_PRIVATE`` memory mappings whose storage requirement + doubles as either parent or child dirties their pages. + + * File-backed memory mappings that cannot have their space freed on + disk due to the mapping living on in the child. + + * Difficult to diagnose memory usage and latency spikes due to object + graphs becoming unreferenced in either parent or child, causing + immediate copy-on-write to large portions of the process heap. + + * Locks held in the parent causing random deadlocks in the child, such + as when another thread emits a log entry via the :mod:`logging` + package concurrent to another thread calling :meth:`fork`. + + * Objects existing in Thread-Local Storage of every non-:meth:`fork` + thread becoming permanently inaccessible, and never having their + object destructors called, including TLS usage by native extension + code, triggering many new variants of all the issues above. - .. _context-factories: + * Pseudo-Random Number Generator state that is easily observable by + network peers to be duplicate, violating requirements of + cryptographic protocols through one-time state reuse. In the worst + case, children continually reuse the same state due to repeatedly + forking from a static parent. - **Context Factories** - - .. method:: fork (on_fork=None, on_start=None, debug=False, profiling=False, via=None) - - Construct a context on the local machine by forking the current - process. The forked child receives a new identity, sets up a new broker - and router, and responds to function calls identically to children - created using other methods. - - For long-lived processes, :meth:`local` is always better as it - guarantees a pristine interpreter state that inherited little from the - parent. Forking should only be used in performance-sensitive scenarios - where short-lived children must be spawned to isolate potentially buggy - code, and only after accounting for all the bad things possible as a - result of, at a minimum: - - * Files open in the parent remaining open in the child, - causing the lifetime of the underlying object to be extended - indefinitely. - - * From the perspective of external components, this is observable - in the form of pipes and sockets that are never closed, which may - break anything relying on closure to signal protocol termination. - - * Descriptors that reference temporary files will not have their disk - space reclaimed until the child exits. - - * Third party package state, such as urllib3's HTTP connection pool, - attempting to write to file descriptors shared with the parent, - causing random failures in both parent and child. - - * UNIX signal handlers installed in the parent process remaining active - in the child, despite associated resources, such as service threads, - child processes, resource usage counters or process timers becoming - absent or reset in the child. - - * Library code that makes assumptions about the process ID remaining - unchanged, for example to implement inter-process locking, or to - generate file names. - - * Anonymous ``MAP_PRIVATE`` memory mappings whose storage requirement - doubles as either parent or child dirties their pages. - - * File-backed memory mappings that cannot have their space freed on - disk due to the mapping living on in the child. - - * Difficult to diagnose memory usage and latency spikes due to object - graphs becoming unreferenced in either parent or child, causing - immediate copy-on-write to large portions of the process heap. - - * Locks held in the parent causing random deadlocks in the child, such - as when another thread emits a log entry via the :mod:`logging` - package concurrent to another thread calling :meth:`fork`. - - * Objects existing in Thread-Local Storage of every non-:meth:`fork` - thread becoming permanently inaccessible, and never having their - object destructors called, including TLS usage by native extension - code, triggering many new variants of all the issues above. - - * Pseudo-Random Number Generator state that is easily observable by - network peers to be duplicate, violating requirements of - cryptographic protocols through one-time state reuse. In the worst - case, children continually reuse the same state due to repeatedly - forking from a static parent. - - :meth:`fork` cleans up Mitogen-internal objects, in addition to - locks held by the :mod:`logging` package, reseeds - :func:`random.random`, and the OpenSSL PRNG via - :func:`ssl.RAND_add`, but only if the :mod:`ssl` module is - already loaded. You must arrange for your program's state, including - any third party packages in use, to be cleaned up by specifying an - `on_fork` function. - - The associated stream implementation is - :class:`mitogen.fork.Stream`. - - :param function on_fork: - Function invoked as `on_fork()` from within the child process. This - permits supplying a program-specific cleanup function to break - locks and close file descriptors belonging to the parent from - within the child. - - :param function on_start: - Invoked as `on_start(econtext)` from within the child process after - it has been set up, but before the function dispatch loop starts. - This permits supplying a custom child main function that inherits - rich data structures that cannot normally be passed via a - serialization. - - :param mitogen.core.Context via: - Same as the `via` parameter for :meth:`local`. - - :param bool debug: - Same as the `debug` parameter for :meth:`local`. - - :param bool profiling: - Same as the `profiling` parameter for :meth:`local`. - - .. method:: local (remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, via=None) - - Construct a context on the local machine as a subprocess of the current - process. The associated stream implementation is - :class:`mitogen.master.Stream`. - - :param str remote_name: - The ``argv[0]`` suffix for the new process. If `remote_name` is - ``test``, the new process ``argv[0]`` will be ``mitogen:test``. - - If unspecified, defaults to ``@:``. - - This variable cannot contain slash characters, as the resulting - ``argv[0]`` must be presented in such a way as to allow Python to - determine its installation prefix. This is required to support - virtualenv. - - :param str|list python_path: - String or list path to the Python interpreter to use for bootstrap. - Defaults to :data:`sys.executable` for local connections, and - ``python`` for remote connections. - - It is possible to pass a list to invoke Python wrapped using - another tool, such as ``["/usr/bin/env", "python"]``. - - :param bool debug: - If :data:`True`, arrange for debug logging (:meth:`enable_debug`) to - be enabled in the new context. Automatically :data:`True` when - :meth:`enable_debug` has been called, but may be used - selectively otherwise. - - :param bool unidirectional: - If :data:`True`, arrange for the child's router to be constructed - with :attr:`unidirectional routing - ` enabled. Automatically - :data:`True` when it was enabled for this router, but may still be - explicitly set to :data:`False`. - - :param float connect_timeout: - Fractional seconds to wait for the subprocess to indicate it is - healthy. Defaults to 30 seconds. - - :param bool profiling: - If :data:`True`, arrange for profiling (:data:`profiling`) to be - enabled in the new context. Automatically :data:`True` when - :data:`profiling` is :data:`True`, but may be used selectively - otherwise. - - :param mitogen.core.Context via: - If not :data:`None`, arrange for construction to occur via RPCs - made to the context `via`, and for :data:`ADD_ROUTE - ` messages to be generated as appropriate. - - .. code-block:: python - - # SSH to the remote machine. - remote_machine = router.ssh(hostname='mybox.com') - - # Use the SSH connection to create a sudo connection. - remote_root = router.sudo(username='root', via=remote_machine) - - .. method:: doas (username=None, password=None, doas_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs) - - Construct a context on the local machine over a ``doas`` invocation. - The ``doas`` process is started in a newly allocated pseudo-terminal, - and supports typing interactive passwords. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str username: - Username to use, defaults to ``root``. - :param str password: - The account password to use if requested. - :param str doas_path: - Filename or complete path to the ``doas`` binary. ``PATH`` will be - searched if given as a filename. Defaults to ``doas``. - :param bytes password_prompt: - A string that indicates ``doas`` is requesting a password. Defaults - to ``Password:``. - :param list incorrect_prompts: - List of bytestrings indicating the password is incorrect. Defaults - to `(b"doas: authentication failed")`. - :raises mitogen.doas.PasswordError: - A password was requested but none was provided, the supplied - password was incorrect, or the target account did not exist. - - .. method:: docker (container=None, image=None, docker_path=None, \**kwargs) - - Construct a context on the local machine within an existing or - temporary new Docker container using the ``docker`` program. One of - `container` or `image` must be specified. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str container: - Existing container to connect to. Defaults to :data:`None`. - :param str username: - Username within the container to :func:`setuid` to. Defaults to - :data:`None`, which Docker interprets as ``root``. - :param str image: - Image tag to use to construct a temporary container. Defaults to - :data:`None`. - :param str docker_path: - Filename or complete path to the Docker binary. ``PATH`` will be - searched if given as a filename. Defaults to ``docker``. - - .. method:: jail (container, jexec_path=None, \**kwargs) - - Construct a context on the local machine within a FreeBSD jail using - the ``jexec`` program. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str container: - Existing container to connect to. Defaults to :data:`None`. - :param str username: - Username within the container to :func:`setuid` to. Defaults to - :data:`None`, which ``jexec`` interprets as ``root``. - :param str jexec_path: - Filename or complete path to the ``jexec`` binary. ``PATH`` will be - searched if given as a filename. Defaults to ``/usr/sbin/jexec``. - - .. method:: kubectl (pod, kubectl_path=None, kubectl_args=None, \**kwargs) - - Construct a context in a container via the Kubernetes ``kubectl`` - program. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str pod: - Kubernetes pod to connect to. - :param str kubectl_path: - Filename or complete path to the ``kubectl`` binary. ``PATH`` will - be searched if given as a filename. Defaults to ``kubectl``. - :param list kubectl_args: - Additional arguments to pass to the ``kubectl`` command. - - .. method:: lxc (container, lxc_attach_path=None, \**kwargs) - - Construct a context on the local machine within an LXC classic - container using the ``lxc-attach`` program. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str container: - Existing container to connect to. Defaults to :data:`None`. - :param str lxc_attach_path: - Filename or complete path to the ``lxc-attach`` binary. ``PATH`` - will be searched if given as a filename. Defaults to - ``lxc-attach``. - - .. method:: lxc (container, lxc_attach_path=None, \**kwargs) - - Construct a context on the local machine within a LXD container using - the ``lxc`` program. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str container: - Existing container to connect to. Defaults to :data:`None`. - :param str lxc_path: - Filename or complete path to the ``lxc`` binary. ``PATH`` will be - searched if given as a filename. Defaults to ``lxc``. - - .. method:: setns (container, kind, username=None, docker_path=None, lxc_info_path=None, machinectl_path=None, \**kwargs) - - Construct a context in the style of :meth:`local`, but change the - active Linux process namespaces via calls to `setns(1)` before - executing Python. - - The namespaces to use, and the active root file system are taken from - the root PID of a running Docker, LXC, LXD, or systemd-nspawn - container. - - A program is required only to find the root PID, after which management - of the child Python interpreter is handled directly. - - :param str container: - Container to connect to. - :param str kind: - One of ``docker``, ``lxc``, ``lxd`` or ``machinectl``. - :param str username: - Username within the container to :func:`setuid` to. Defaults to - ``root``. - :param str docker_path: - Filename or complete path to the Docker binary. ``PATH`` will be - searched if given as a filename. Defaults to ``docker``. - :param str lxc_path: - Filename or complete path to the LXD ``lxc`` binary. ``PATH`` will - be searched if given as a filename. Defaults to ``lxc``. - :param str lxc_info_path: - Filename or complete path to the LXC ``lxc-info`` binary. ``PATH`` - will be searched if given as a filename. Defaults to ``lxc-info``. - :param str machinectl_path: - Filename or complete path to the ``machinectl`` binary. ``PATH`` - will be searched if given as a filename. Defaults to - ``machinectl``. - - .. method:: su (username=None, password=None, su_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs) - - Construct a context on the local machine over a ``su`` invocation. The - ``su`` process is started in a newly allocated pseudo-terminal, and - supports typing interactive passwords. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str username: - Username to pass to ``su``, defaults to ``root``. - :param str password: - The account password to use if requested. - :param str su_path: - Filename or complete path to the ``su`` binary. ``PATH`` will be - searched if given as a filename. Defaults to ``su``. - :param bytes password_prompt: - The string that indicates ``su`` is requesting a password. Defaults - to ``Password:``. - :param str incorrect_prompts: - Strings that signal the password is incorrect. Defaults to `("su: - sorry", "su: authentication failure")`. - - :raises mitogen.su.PasswordError: - A password was requested but none was provided, the supplied - password was incorrect, or (on BSD) the target account did not - exist. - - .. method:: sudo (username=None, sudo_path=None, password=None, \**kwargs) - - Construct a context on the local machine over a ``sudo`` invocation. - The ``sudo`` process is started in a newly allocated pseudo-terminal, - and supports typing interactive passwords. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str username: - Username to pass to sudo as the ``-u`` parameter, defaults to - ``root``. - :param str sudo_path: - Filename or complete path to the sudo binary. ``PATH`` will be - searched if given as a filename. Defaults to ``sudo``. - :param str password: - The password to use if/when sudo requests it. Depending on the sudo - configuration, this is either the current account password or the - target account password. :class:`mitogen.sudo.PasswordError` - will be raised if sudo requests a password but none is provided. - :param bool set_home: - If :data:`True`, request ``sudo`` set the ``HOME`` environment - variable to match the target UNIX account. - :param bool preserve_env: - If :data:`True`, request ``sudo`` to preserve the environment of - the parent process. - :param str selinux_type: - If not :data:`None`, the SELinux security context to use. - :param str selinux_role: - If not :data:`None`, the SELinux role to use. - :param list sudo_args: - Arguments in the style of :data:`sys.argv` that would normally - be passed to ``sudo``. The arguments are parsed in-process to set - equivalent parameters. Re-parsing ensures unsupported options cause - :class:`mitogen.core.StreamError` to be raised, and that - attributes of the stream match the actual behaviour of ``sudo``. - - .. method:: ssh (hostname, username=None, ssh_path=None, ssh_args=None, port=None, check_host_keys='enforce', password=None, identity_file=None, identities_only=True, compression=True, \**kwargs) - - Construct a remote context over an OpenSSH ``ssh`` invocation. - - The ``ssh`` process is started in a newly allocated pseudo-terminal to - support typing interactive passwords and responding to prompts, if a - password is specified, or `check_host_keys=accept`. In other scenarios, - ``BatchMode`` is enabled and no PTY is allocated. For many-target - configurations, both options should be avoided as most systems have a - conservative limit on the number of pseudo-terminals that may exist. - - Accepts all parameters accepted by :meth:`local`, in addition to: - - :param str username: - The SSH username; default is unspecified, which causes SSH to pick - the username to use. - :param str ssh_path: - Absolute or relative path to ``ssh``. Defaults to ``ssh``. - :param list ssh_args: - Additional arguments to pass to the SSH command. - :param int port: - Port number to connect to; default is unspecified, which causes SSH - to pick the port number. - :param str check_host_keys: - Specifies the SSH host key checking mode. Defaults to ``enforce``. - - * ``ignore``: no host key checking is performed. Connections never - fail due to an unknown or changed host key. - * ``accept``: known hosts keys are checked to ensure they match, - new host keys are automatically accepted and verified in future - connections. - * ``enforce``: known host keys are checked to ensure they match, - unknown hosts cause a connection failure. - :param str password: - Password to type if/when ``ssh`` requests it. If not specified and - a password is requested, :class:`mitogen.ssh.PasswordError` is - raised. - :param str identity_file: - Path to an SSH private key file to use for authentication. Default - is unspecified, which causes SSH to pick the identity file. - - When this option is specified, only `identity_file` will be used by - the SSH client to perform authenticaion; agent authentication is - automatically disabled, as is reading the default private key from - ``~/.ssh/id_rsa``, or ``~/.ssh/id_dsa``. - :param bool identities_only: - If :data:`True` and a password or explicit identity file is - specified, instruct the SSH client to disable any authentication - identities inherited from the surrounding environment, such as - those loaded in any running ``ssh-agent``, or default key files - present in ``~/.ssh``. This ensures authentication attempts only - occur using the supplied password or SSH key. - :param bool compression: - If :data:`True`, enable ``ssh`` compression support. Compression - has a minimal effect on the size of modules transmitted, as they - are already compressed, however it has a large effect on every - remaining message in the otherwise uncompressed stream protocol, - such as function call arguments and return values. - :param int ssh_debug_level: - Optional integer `0..3` indicating the SSH client debug level. - :raises mitogen.ssh.PasswordError: - A password was requested but none was specified, or the specified - password was incorrect. - - :raises mitogen.ssh.HostKeyError: - When `check_host_keys` is set to either ``accept``, indicates a - previously recorded key no longer matches the remote machine. When - set to ``enforce``, as above, but additionally indicates no - previously recorded key exists for the remote machine. + :meth:`fork` cleans up Mitogen-internal objects, in addition to + locks held by the :mod:`logging` package, reseeds + :func:`random.random`, and the OpenSSL PRNG via + :func:`ssl.RAND_add`, but only if the :mod:`ssl` module is + already loaded. You must arrange for your program's state, including + any third party packages in use, to be cleaned up by specifying an + `on_fork` function. + + The associated stream implementation is + :class:`mitogen.fork.Stream`. + + :param function on_fork: + Function invoked as `on_fork()` from within the child process. This + permits supplying a program-specific cleanup function to break + locks and close file descriptors belonging to the parent from + within the child. + + :param function on_start: + Invoked as `on_start(econtext)` from within the child process after + it has been set up, but before the function dispatch loop starts. + This permits supplying a custom child main function that inherits + rich data structures that cannot normally be passed via a + serialization. + + :param mitogen.core.Context via: + Same as the `via` parameter for :meth:`local`. + + :param bool debug: + Same as the `debug` parameter for :meth:`local`. + + :param bool profiling: + Same as the `profiling` parameter for :meth:`local`. + +.. method:: mitogen.parent.Router.local (remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, via=None) + + Construct a context on the local machine as a subprocess of the current + process. The associated stream implementation is + :class:`mitogen.master.Stream`. + + :param str remote_name: + The ``argv[0]`` suffix for the new process. If `remote_name` is + ``test``, the new process ``argv[0]`` will be ``mitogen:test``. + + If unspecified, defaults to ``@:``. + + This variable cannot contain slash characters, as the resulting + ``argv[0]`` must be presented in such a way as to allow Python to + determine its installation prefix. This is required to support + virtualenv. + + :param str|list python_path: + String or list path to the Python interpreter to use for bootstrap. + Defaults to :data:`sys.executable` for local connections, and + ``python`` for remote connections. + + It is possible to pass a list to invoke Python wrapped using + another tool, such as ``["/usr/bin/env", "python"]``. + + :param bool debug: + If :data:`True`, arrange for debug logging (:meth:`enable_debug`) to + be enabled in the new context. Automatically :data:`True` when + :meth:`enable_debug` has been called, but may be used + selectively otherwise. + + :param bool unidirectional: + If :data:`True`, arrange for the child's router to be constructed + with :attr:`unidirectional routing + ` enabled. Automatically + :data:`True` when it was enabled for this router, but may still be + explicitly set to :data:`False`. + + :param float connect_timeout: + Fractional seconds to wait for the subprocess to indicate it is + healthy. Defaults to 30 seconds. + + :param bool profiling: + If :data:`True`, arrange for profiling (:data:`profiling`) to be + enabled in the new context. Automatically :data:`True` when + :data:`profiling` is :data:`True`, but may be used selectively + otherwise. + + :param mitogen.core.Context via: + If not :data:`None`, arrange for construction to occur via RPCs + made to the context `via`, and for :data:`ADD_ROUTE + ` messages to be generated as appropriate. + + .. code-block:: python + + # SSH to the remote machine. + remote_machine = router.ssh(hostname='mybox.com') + + # Use the SSH connection to create a sudo connection. + remote_root = router.sudo(username='root', via=remote_machine) + +.. method:: mitogen.parent.Router.doas (username=None, password=None, doas_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs) + + Construct a context on the local machine over a ``doas`` invocation. + The ``doas`` process is started in a newly allocated pseudo-terminal, + and supports typing interactive passwords. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str username: + Username to use, defaults to ``root``. + :param str password: + The account password to use if requested. + :param str doas_path: + Filename or complete path to the ``doas`` binary. ``PATH`` will be + searched if given as a filename. Defaults to ``doas``. + :param bytes password_prompt: + A string that indicates ``doas`` is requesting a password. Defaults + to ``Password:``. + :param list incorrect_prompts: + List of bytestrings indicating the password is incorrect. Defaults + to `(b"doas: authentication failed")`. + :raises mitogen.doas.PasswordError: + A password was requested but none was provided, the supplied + password was incorrect, or the target account did not exist. + +.. method:: mitogen.parent.Router.docker (container=None, image=None, docker_path=None, \**kwargs) + + Construct a context on the local machine within an existing or + temporary new Docker container using the ``docker`` program. One of + `container` or `image` must be specified. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str container: + Existing container to connect to. Defaults to :data:`None`. + :param str username: + Username within the container to :func:`setuid` to. Defaults to + :data:`None`, which Docker interprets as ``root``. + :param str image: + Image tag to use to construct a temporary container. Defaults to + :data:`None`. + :param str docker_path: + Filename or complete path to the Docker binary. ``PATH`` will be + searched if given as a filename. Defaults to ``docker``. + +.. method:: mitogen.parent.Router.jail (container, jexec_path=None, \**kwargs) + + Construct a context on the local machine within a FreeBSD jail using + the ``jexec`` program. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str container: + Existing container to connect to. Defaults to :data:`None`. + :param str username: + Username within the container to :func:`setuid` to. Defaults to + :data:`None`, which ``jexec`` interprets as ``root``. + :param str jexec_path: + Filename or complete path to the ``jexec`` binary. ``PATH`` will be + searched if given as a filename. Defaults to ``/usr/sbin/jexec``. + +.. method:: mitogen.parent.Router.kubectl (pod, kubectl_path=None, kubectl_args=None, \**kwargs) + + Construct a context in a container via the Kubernetes ``kubectl`` + program. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str pod: + Kubernetes pod to connect to. + :param str kubectl_path: + Filename or complete path to the ``kubectl`` binary. ``PATH`` will + be searched if given as a filename. Defaults to ``kubectl``. + :param list kubectl_args: + Additional arguments to pass to the ``kubectl`` command. + +.. method:: mitogen.parent.Router.lxc (container, lxc_attach_path=None, \**kwargs) + + Construct a context on the local machine within an LXC classic + container using the ``lxc-attach`` program. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str container: + Existing container to connect to. Defaults to :data:`None`. + :param str lxc_attach_path: + Filename or complete path to the ``lxc-attach`` binary. ``PATH`` + will be searched if given as a filename. Defaults to + ``lxc-attach``. + +.. method:: mitogen.parent.Router.lxc (container, lxc_attach_path=None, \**kwargs) + + Construct a context on the local machine within a LXD container using + the ``lxc`` program. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str container: + Existing container to connect to. Defaults to :data:`None`. + :param str lxc_path: + Filename or complete path to the ``lxc`` binary. ``PATH`` will be + searched if given as a filename. Defaults to ``lxc``. + +.. method:: mitogen.parent.Router.setns (container, kind, username=None, docker_path=None, lxc_info_path=None, machinectl_path=None, \**kwargs) + + Construct a context in the style of :meth:`local`, but change the + active Linux process namespaces via calls to `setns(1)` before + executing Python. + + The namespaces to use, and the active root file system are taken from + the root PID of a running Docker, LXC, LXD, or systemd-nspawn + container. + + A program is required only to find the root PID, after which management + of the child Python interpreter is handled directly. + + :param str container: + Container to connect to. + :param str kind: + One of ``docker``, ``lxc``, ``lxd`` or ``machinectl``. + :param str username: + Username within the container to :func:`setuid` to. Defaults to + ``root``. + :param str docker_path: + Filename or complete path to the Docker binary. ``PATH`` will be + searched if given as a filename. Defaults to ``docker``. + :param str lxc_path: + Filename or complete path to the LXD ``lxc`` binary. ``PATH`` will + be searched if given as a filename. Defaults to ``lxc``. + :param str lxc_info_path: + Filename or complete path to the LXC ``lxc-info`` binary. ``PATH`` + will be searched if given as a filename. Defaults to ``lxc-info``. + :param str machinectl_path: + Filename or complete path to the ``machinectl`` binary. ``PATH`` + will be searched if given as a filename. Defaults to + ``machinectl``. + +.. method:: mitogen.parent.Router.su (username=None, password=None, su_path=None, password_prompt=None, incorrect_prompts=None, \**kwargs) + + Construct a context on the local machine over a ``su`` invocation. The + ``su`` process is started in a newly allocated pseudo-terminal, and + supports typing interactive passwords. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str username: + Username to pass to ``su``, defaults to ``root``. + :param str password: + The account password to use if requested. + :param str su_path: + Filename or complete path to the ``su`` binary. ``PATH`` will be + searched if given as a filename. Defaults to ``su``. + :param bytes password_prompt: + The string that indicates ``su`` is requesting a password. Defaults + to ``Password:``. + :param str incorrect_prompts: + Strings that signal the password is incorrect. Defaults to `("su: + sorry", "su: authentication failure")`. + + :raises mitogen.su.PasswordError: + A password was requested but none was provided, the supplied + password was incorrect, or (on BSD) the target account did not + exist. + +.. method:: mitogen.parent.Router.sudo (username=None, sudo_path=None, password=None, \**kwargs) + + Construct a context on the local machine over a ``sudo`` invocation. + The ``sudo`` process is started in a newly allocated pseudo-terminal, + and supports typing interactive passwords. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str username: + Username to pass to sudo as the ``-u`` parameter, defaults to + ``root``. + :param str sudo_path: + Filename or complete path to the sudo binary. ``PATH`` will be + searched if given as a filename. Defaults to ``sudo``. + :param str password: + The password to use if/when sudo requests it. Depending on the sudo + configuration, this is either the current account password or the + target account password. :class:`mitogen.sudo.PasswordError` + will be raised if sudo requests a password but none is provided. + :param bool set_home: + If :data:`True`, request ``sudo`` set the ``HOME`` environment + variable to match the target UNIX account. + :param bool preserve_env: + If :data:`True`, request ``sudo`` to preserve the environment of + the parent process. + :param str selinux_type: + If not :data:`None`, the SELinux security context to use. + :param str selinux_role: + If not :data:`None`, the SELinux role to use. + :param list sudo_args: + Arguments in the style of :data:`sys.argv` that would normally + be passed to ``sudo``. The arguments are parsed in-process to set + equivalent parameters. Re-parsing ensures unsupported options cause + :class:`mitogen.core.StreamError` to be raised, and that + attributes of the stream match the actual behaviour of ``sudo``. + +.. method:: mitogen.parent.Router.ssh (hostname, username=None, ssh_path=None, ssh_args=None, port=None, check_host_keys='enforce', password=None, identity_file=None, identities_only=True, compression=True, \**kwargs) + + Construct a remote context over an OpenSSH ``ssh`` invocation. + + The ``ssh`` process is started in a newly allocated pseudo-terminal to + support typing interactive passwords and responding to prompts, if a + password is specified, or `check_host_keys=accept`. In other scenarios, + ``BatchMode`` is enabled and no PTY is allocated. For many-target + configurations, both options should be avoided as most systems have a + conservative limit on the number of pseudo-terminals that may exist. + + Accepts all parameters accepted by :meth:`local`, in addition to: + + :param str username: + The SSH username; default is unspecified, which causes SSH to pick + the username to use. + :param str ssh_path: + Absolute or relative path to ``ssh``. Defaults to ``ssh``. + :param list ssh_args: + Additional arguments to pass to the SSH command. + :param int port: + Port number to connect to; default is unspecified, which causes SSH + to pick the port number. + :param str check_host_keys: + Specifies the SSH host key checking mode. Defaults to ``enforce``. + + * ``ignore``: no host key checking is performed. Connections never + fail due to an unknown or changed host key. + * ``accept``: known hosts keys are checked to ensure they match, + new host keys are automatically accepted and verified in future + connections. + * ``enforce``: known host keys are checked to ensure they match, + unknown hosts cause a connection failure. + :param str password: + Password to type if/when ``ssh`` requests it. If not specified and + a password is requested, :class:`mitogen.ssh.PasswordError` is + raised. + :param str identity_file: + Path to an SSH private key file to use for authentication. Default + is unspecified, which causes SSH to pick the identity file. + + When this option is specified, only `identity_file` will be used by + the SSH client to perform authenticaion; agent authentication is + automatically disabled, as is reading the default private key from + ``~/.ssh/id_rsa``, or ``~/.ssh/id_dsa``. + :param bool identities_only: + If :data:`True` and a password or explicit identity file is + specified, instruct the SSH client to disable any authentication + identities inherited from the surrounding environment, such as + those loaded in any running ``ssh-agent``, or default key files + present in ``~/.ssh``. This ensures authentication attempts only + occur using the supplied password or SSH key. + :param bool compression: + If :data:`True`, enable ``ssh`` compression support. Compression + has a minimal effect on the size of modules transmitted, as they + are already compressed, however it has a large effect on every + remaining message in the otherwise uncompressed stream protocol, + such as function call arguments and return values. + :param int ssh_debug_level: + Optional integer `0..3` indicating the SSH client debug level. + :raises mitogen.ssh.PasswordError: + A password was requested but none was specified, or the specified + password was incorrect. + + :raises mitogen.ssh.HostKeyError: + When `check_host_keys` is set to either ``accept``, indicates a + previously recorded key no longer matches the remote machine. When + set to ``enforce``, as above, but additionally indicates no + previously recorded key exists for the remote machine. Context Class @@ -619,126 +581,22 @@ Channel Class ============= .. currentmodule:: mitogen.core +.. autoclass:: Channel + :members: -.. class:: Channel (router, context, dst_handle, handle=None) - - A channel inherits from :class:`mitogen.core.Sender` and - `mitogen.core.Receiver` to provide bidirectional functionality. - - Since all handles aren't known until after both ends are constructed, for - both ends to communicate through a channel, it is necessary for one end to - retrieve the handle allocated to the other and reconfigure its own channel - to match. Currently this is a manual task. Broker Class ============ .. currentmodule:: mitogen.core -.. class:: Broker - - Responsible for handling I/O multiplexing in a private thread. - - **Note:** This is the somewhat limited core version of the Broker class - used by child contexts. The master subclass is documented below. - - .. attribute:: shutdown_timeout = 3.0 - - Seconds grace to allow :class:`streams ` to shutdown - gracefully before force-disconnecting them during :meth:`shutdown`. - - .. method:: defer (func, \*args, \*kwargs) - - Arrange for `func(\*args, \**kwargs)` to be executed on the broker - thread, or immediately if the current thread is the broker thread. Safe - to call from any thread. - - .. method:: defer_sync (func) - - Arrange for `func()` to execute on the broker thread, blocking the - current thread until a result or exception is available. - - :returns: - Call result. - - .. method:: start_receive (stream) - - Mark the :attr:`receive_side ` on `stream` as - ready for reading. Safe to call from any thread. When the associated - file descriptor becomes ready for reading, - :meth:`BasicStream.on_receive` will be called. - - .. method:: stop_receive (stream) - - Mark the :attr:`receive_side ` on `stream` as - not ready for reading. Safe to call from any thread. - - .. method:: _start_transmit (stream) - - Mark the :attr:`transmit_side ` on `stream` as - ready for writing. Must only be called from the Broker thread. When the - associated file descriptor becomes ready for writing, - :meth:`BasicStream.on_transmit` will be called. - - .. method:: stop_receive (stream) - - Mark the :attr:`transmit_side ` on `stream` as - not ready for writing. Safe to call from any thread. - - .. method:: shutdown - - Request broker gracefully disconnect streams and stop. - - .. method:: join - - Wait for the broker to stop, expected to be called after - :meth:`shutdown`. - - .. method:: keep_alive - - Return :data:`True` if any reader's :attr:`Side.keep_alive` - attribute is :data:`True`, or any - :class:`Context ` is still - registered that is not the master. Used to delay shutdown while some - important work is in progress (e.g. log draining). - - **Internal Methods** - - .. method:: _broker_main - - Handle events until :meth:`shutdown`. On shutdown, invoke - :meth:`Stream.on_shutdown` for every active stream, then allow up to - :attr:`shutdown_timeout` seconds for the streams to unregister - themselves before forcefully calling - :meth:`Stream.on_disconnect`. +.. autoclass:: Broker + :members: .. currentmodule:: mitogen.master -.. class:: Broker (install_watcher=True) - - .. note:: - - You may construct as many brokers as desired, and use the same broker - for multiple routers, however usually only one broker need exist. - Multiple brokers may be useful when dealing with sets of children with - differing lifetimes. For example, a subscription service where - non-payment results in termination for one customer. - - :param bool install_watcher: - If :data:`True`, an additional thread is started to monitor the - lifetime of the main thread, triggering :meth:`shutdown` - automatically in case the user forgets to call it, or their code - crashed. - - You should not rely on this functionality in your program, it is only - intended as a fail-safe and to simplify the API for new users. In - particular, alternative Python implementations may not be able to - support watching the main thread. - - .. attribute:: shutdown_timeout = 5.0 - - Seconds grace to allow :class:`streams ` to shutdown - gracefully before force-disconnecting them during :meth:`shutdown`. +.. autoclass:: Broker + :members: Utility Functions diff --git a/docs/changelog.rst b/docs/changelog.rst index a7109a8d..6c095dfb 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -126,7 +126,7 @@ Core Library v0.2.4 (2018-??-??) ------------------- +------------------- Mitogen for Ansible ~~~~~~~~~~~~~~~~~~~ diff --git a/docs/internals.rst b/docs/internals.rst index 9c533952..fc9d57ac 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -35,162 +35,33 @@ Side Class ========== .. currentmodule:: mitogen.core - -.. class:: Side (stream, fd, keep_alive=True) - - Represent a single side of a :py:class:`BasicStream`. This exists to allow - streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional - (e.g. UNIX socket) file descriptors to operate identically. - - :param mitogen.core.Stream stream: - The stream this side is associated with. - - :param int fd: - Underlying file descriptor. - - :param bool keep_alive: - Value for :py:attr:`keep_alive` - - During construction, the file descriptor has its :py:data:`os.O_NONBLOCK` - flag enabled using :py:func:`fcntl.fcntl`. - - .. attribute:: stream - - The :py:class:`Stream` for which this is a read or write side. - - .. attribute:: fd - - Integer file descriptor to perform IO on, or :data:`None` if - :py:meth:`close` has been called. - - .. attribute:: keep_alive - - If :data:`True`, causes presence of this side in :py:class:`Broker`'s - active reader set to defer shutdown until the side is disconnected. - - .. method:: fileno - - Return :py:attr:`fd` if it is not :data:`None`, otherwise raise - :py:class:`StreamError`. This method is implemented so that - :py:class:`Side` can be used directly by :py:func:`select.select`. - - .. method:: close - - Call :py:func:`os.close` on :py:attr:`fd` if it is not :data:`None`, - then set it to :data:`None`. - - .. method:: read (n=CHUNK_SIZE) - - Read up to `n` bytes from the file descriptor, wrapping the underlying - :py:func:`os.read` call with :py:func:`io_op` to trap common - disconnection conditions. - - :py:meth:`read` always behaves as if it is reading from a regular UNIX - file; socket, pipe, and TTY disconnection errors are masked and result - in a 0-sized read just like a regular file. - - :returns: - Bytes read, or the empty to string to indicate disconnection was - detected. - - .. method:: write (s) - - Write as much of the bytes from `s` as possible to the file descriptor, - wrapping the underlying :py:func:`os.write` call with :py:func:`io_op` - to trap common disconnection connditions. - - :returns: - Number of bytes written, or :data:`None` if disconnection was - detected. +.. autoclass:: Side + :members: Stream Classes ============== .. currentmodule:: mitogen.core - -.. class:: BasicStream - - .. attribute:: receive_side - - A :py:class:`Side` representing the stream's receive file descriptor. - - .. attribute:: transmit_side - - A :py:class:`Side` representing the stream's transmit file descriptor. - - .. method:: on_disconnect (broker) - - Called by :py:class:`Broker` to force disconnect the stream. The base - implementation simply closes :py:attr:`receive_side` and - :py:attr:`transmit_side` and unregisters the stream from the broker. - - .. method:: on_receive (broker) - - Called by :py:class:`Broker` when the stream's :py:attr:`receive_side` has - been marked readable using :py:meth:`Broker.start_receive` and the - broker has detected the associated file descriptor is ready for - reading. - - Subclasses must implement this method if - :py:meth:`Broker.start_receive` is ever called on them, and the method - must call :py:meth:`on_disconect` if reading produces an empty string. - - .. method:: on_transmit (broker) - - Called by :py:class:`Broker` when the stream's :py:attr:`transmit_side` - has been marked writeable using :py:meth:`Broker._start_transmit` and - the broker has detected the associated file descriptor is ready for - writing. - - Subclasses must implement this method if - :py:meth:`Broker._start_transmit` is ever called on them. - - .. method:: on_shutdown (broker) - - Called by :py:meth:`Broker.shutdown` to allow the stream time to - gracefully shutdown. The base implementation simply called - :py:meth:`on_disconnect`. +.. autoclass:: BasicStream + :members: .. autoclass:: Stream :members: - .. method:: pending_bytes () - - Returns the number of bytes queued for transmission on this stream. - This can be used to limit the amount of data buffered in RAM by an - otherwise unlimited consumer. - - For an accurate result, this method should be called from the Broker - thread, using a wrapper like: - - :: - - def get_pending_bytes(self, stream): - latch = mitogen.core.Latch() - self.broker.defer( - lambda: latch.put(stream.pending_bytes()) - ) - return latch.get() - - .. currentmodule:: mitogen.fork - .. autoclass:: Stream :members: .. currentmodule:: mitogen.parent - .. autoclass:: Stream :members: .. currentmodule:: mitogen.ssh - .. autoclass:: Stream :members: .. currentmodule:: mitogen.sudo - .. autoclass:: Stream :members: @@ -212,6 +83,7 @@ Poller Class .. currentmodule:: mitogen.core .. autoclass:: Poller + :members: .. currentmodule:: mitogen.parent .. autoclass:: KqueuePoller @@ -256,64 +128,16 @@ ExternalContext Class ===================== .. currentmodule:: mitogen.core +.. autoclass:: ExternalContext + :members: -.. class:: ExternalContext - - External context implementation. - - .. attribute:: broker - - The :py:class:`mitogen.core.Broker` instance. - - .. attribute:: context - - The :py:class:`mitogen.core.Context` instance. - - .. attribute:: channel - - The :py:class:`mitogen.core.Channel` over which - :py:data:`CALL_FUNCTION` requests are received. - - .. attribute:: stdout_log - - The :py:class:`mitogen.core.IoLogger` connected to ``stdout``. - - .. attribute:: importer - - The :py:class:`mitogen.core.Importer` instance. - - .. attribute:: stdout_log - - The :py:class:`IoLogger` connected to ``stdout``. - - .. attribute:: stderr_log - - The :py:class:`IoLogger` connected to ``stderr``. - - .. method:: _dispatch_calls - - Implementation for the main thread in every child context. mitogen.master ============== -.. currentmodule:: mitogen.master - -.. class:: ProcessMonitor - - Install a :py:data:`signal.SIGCHLD` handler that generates callbacks when a - specific child process has exitted. - - .. method:: add (pid, callback) - - Add a callback function to be notified of the exit status of a process. - - :param int pid: - Process ID to be notified of. - - :param callback: - Function invoked as `callback(status)`, where `status` is the raw - exit status of the child process. +.. currentmodule:: mitogen.parent +.. autoclass:: ProcessMonitor + :members: Blocking I/O Functions diff --git a/mitogen/core.py b/mitogen/core.py index 6cb311d3..2b1771a5 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -100,7 +100,7 @@ CALL_SERVICE = 110 #: * a remote receiver is disconnected or explicitly closed. #: * a related message could not be delivered due to no route existing for it. #: * a router is being torn down, as a sentinel value to notify -#: :py:meth:`mitogen.core.Router.add_handler` callbacks to clean up. +#: :meth:`mitogen.core.Router.add_handler` callbacks to clean up. IS_DEAD = 999 try: @@ -187,7 +187,7 @@ class Error(Exception): class LatchError(Error): - """Raised when an attempt is made to use a :py:class:`mitogen.core.Latch` + """Raised when an attempt is made to use a :class:`mitogen.core.Latch` that has been marked closed.""" pass @@ -239,7 +239,7 @@ class Kwargs(dict): class CallError(Error): """Serializable :class:`Error` subclass raised when - :py:meth:`Context.call() ` fails. A copy of + :meth:`Context.call() ` fails. A copy of the traceback from the external context is appended to the exception message.""" def __init__(self, fmt=None, *args): @@ -872,6 +872,15 @@ class Receiver(object): class Channel(Sender, Receiver): + """ + A channel inherits from :class:`mitogen.core.Sender` and + `mitogen.core.Receiver` to provide bidirectional functionality. + + Since all handles aren't known until after both ends are constructed, for + both ends to communicate through a channel, it is necessary for one end to + retrieve the handle allocated to the other and reconfigure its own channel + to match. Currently this is a manual task. + """ def __init__(self, router, context, dst_handle, handle=None): Sender.__init__(self, context, dst_handle) Receiver.__init__(self, router, handle) @@ -1160,12 +1169,35 @@ class LogHandler(logging.Handler): class Side(object): + """ + Represent a single side of a :class:`BasicStream`. This exists to allow + streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional + (e.g. UNIX socket) file descriptors to operate identically. + + :param mitogen.core.Stream stream: + The stream this side is associated with. + + :param int fd: + Underlying file descriptor. + + :param bool keep_alive: + Value for :attr:`keep_alive` + + During construction, the file descriptor has its :data:`os.O_NONBLOCK` flag + enabled using :func:`fcntl.fcntl`. + """ _fork_refs = weakref.WeakValueDictionary() def __init__(self, stream, fd, cloexec=True, keep_alive=True, blocking=False): + #: The :class:`Stream` for which this is a read or write side. self.stream = stream + #: Integer file descriptor to perform IO on, or :data:`None` if + #: :meth:`close` has been called. self.fd = fd self.closed = False + #: If :data:`True`, causes presence of this side in + #: :class:`Broker`'s active reader set to defer shutdown until the + #: side is disconnected. self.keep_alive = keep_alive self._fork_refs[id(self)] = self if cloexec: @@ -1182,12 +1214,29 @@ class Side(object): side.close() def close(self): + """ + Call :func:`os.close` on :attr:`fd` if it is not :data:`None`, + then set it to :data:`None`. + """ if not self.closed: _vv and IOLOG.debug('%r.close()', self) self.closed = True os.close(self.fd) def read(self, n=CHUNK_SIZE): + """ + Read up to `n` bytes from the file descriptor, wrapping the underlying + :func:`os.read` call with :func:`io_op` to trap common disconnection + conditions. + + :meth:`read` always behaves as if it is reading from a regular UNIX + file; socket, pipe, and TTY disconnection errors are masked and result + in a 0-sized read like a regular file. + + :returns: + Bytes read, or the empty to string to indicate disconnection was + detected. + """ if self.closed: # Refuse to touch the handle after closed, it may have been reused # by another thread. TODO: synchronize read()/write()/close(). @@ -1198,6 +1247,15 @@ class Side(object): return s def write(self, s): + """ + Write as much of the bytes from `s` as possible to the file descriptor, + wrapping the underlying :func:`os.write` call with :func:`io_op` to + trap common disconnection connditions. + + :returns: + Number of bytes written, or :data:`None` if disconnection was + detected. + """ if self.closed or self.fd is None: # Refuse to touch the handle after closed, it may have been reused # by another thread. @@ -1210,10 +1268,50 @@ class Side(object): class BasicStream(object): + #: A :class:`Side` representing the stream's receive file descriptor. receive_side = None + + #: A :class:`Side` representing the stream's transmit file descriptor. transmit_side = None + def on_receive(self, broker): + """ + Called by :class:`Broker` when the stream's :attr:`receive_side` has + been marked readable using :meth:`Broker.start_receive` and the broker + has detected the associated file descriptor is ready for reading. + + Subclasses must implement this if :meth:`Broker.start_receive` is ever + called on them, and the method must call :meth:`on_disconect` if + reading produces an empty string. + """ + + def on_transmit(self, broker): + """ + Called by :class:`Broker` when the stream's :attr:`transmit_side` + has been marked writeable using :meth:`Broker._start_transmit` and + the broker has detected the associated file descriptor is ready for + writing. + + Subclasses must implement this if :meth:`Broker._start_transmit` is + ever called on them. + """ + + def on_shutdown(self, broker): + """ + Called by :meth:`Broker.shutdown` to allow the stream time to + gracefully shutdown. The base implementation simply called + :meth:`on_disconnect`. + """ + _v and LOG.debug('%r.on_shutdown()', self) + fire(self, 'shutdown') + self.on_disconnect(broker) + def on_disconnect(self, broker): + """ + Called by :class:`Broker` to force disconnect the stream. The base + implementation simply closes :attr:`receive_side` and + :attr:`transmit_side` and unregisters the stream from the broker. + """ LOG.debug('%r.on_disconnect()', self) if self.receive_side: broker.stop_receive(self) @@ -1223,19 +1321,14 @@ class BasicStream(object): self.transmit_side.close() fire(self, 'disconnect') - def on_shutdown(self, broker): - _v and LOG.debug('%r.on_shutdown()', self) - fire(self, 'shutdown') - self.on_disconnect(broker) - class Stream(BasicStream): """ - :py:class:`BasicStream` subclass implementing mitogen's :ref:`stream + :class:`BasicStream` subclass implementing mitogen's :ref:`stream protocol `. """ - #: If not :data:`None`, :py:class:`Router` stamps this into - #: :py:attr:`Message.auth_id` of every message received on this stream. + #: If not :data:`None`, :class:`Router` stamps this into + #: :attr:`Message.auth_id` of every message received on this stream. auth_id = None #: If not :data:`False`, indicates the stream has :attr:`auth_id` set and @@ -1272,7 +1365,7 @@ class Stream(BasicStream): def on_receive(self, broker): """Handle the next complete message on the stream. Raise - :py:class:`StreamError` on failure.""" + :class:`StreamError` on failure.""" _vv and IOLOG.debug('%r.on_receive()', self) buf = self.receive_side.read() @@ -1329,6 +1422,14 @@ class Stream(BasicStream): return True def pending_bytes(self): + """ + Return the number of bytes queued for transmission on this stream. This + can be used to limit the amount of data buffered in RAM by an otherwise + unlimited consumer. + + For an accurate result, this method should be called from the Broker + thread, for example by using :meth:`Broker.defer_sync`. + """ return self._output_buf_len def on_transmit(self, broker): @@ -1572,15 +1673,15 @@ class Poller(object): class Latch(object): """ - A latch is a :py:class:`Queue.Queue`-like object that supports mutation and - waiting from multiple threads, however unlike :py:class:`Queue.Queue`, + A latch is a :class:`Queue.Queue`-like object that supports mutation and + waiting from multiple threads, however unlike :class:`Queue.Queue`, waiting threads always remain interruptible, so CTRL+C always succeeds, and waits where a timeout is set experience no wake up latency. These properties are not possible in combination using the built-in threading primitives available in Python 2.x. Latches implement queues using the UNIX self-pipe trick, and a per-thread - :py:func:`socket.socketpair` that is lazily created the first time any + :func:`socket.socketpair` that is lazily created the first time any latch attempts to sleep on a thread, and dynamically associated with the waiting Latch only for duration of the wait. @@ -1626,7 +1727,7 @@ class Latch(object): def close(self): """ Mark the latch as closed, and cause every sleeping thread to be woken, - with :py:class:`mitogen.core.LatchError` raised in each thread. + with :class:`mitogen.core.LatchError` raised in each thread. """ self._lock.acquire() try: @@ -1640,17 +1741,17 @@ class Latch(object): def empty(self): """ - Return :py:data:`True` if calling :py:meth:`get` would block. + Return :data:`True` if calling :meth:`get` would block. - As with :py:class:`Queue.Queue`, :py:data:`True` may be returned even - though a subsequent call to :py:meth:`get` will succeed, since a - message may be posted at any moment between :py:meth:`empty` and - :py:meth:`get`. + As with :class:`Queue.Queue`, :data:`True` may be returned even + though a subsequent call to :meth:`get` will succeed, since a + message may be posted at any moment between :meth:`empty` and + :meth:`get`. - As with :py:class:`Queue.Queue`, :py:data:`False` may be returned even - though a subsequent call to :py:meth:`get` will block, since another - waiting thread may be woken at any moment between :py:meth:`empty` and - :py:meth:`get`. + As with :class:`Queue.Queue`, :data:`False` may be returned even + though a subsequent call to :meth:`get` will block, since another + waiting thread may be woken at any moment between :meth:`empty` and + :meth:`get`. """ return len(self._queue) == 0 @@ -1683,14 +1784,14 @@ class Latch(object): Return the next enqueued object, or sleep waiting for one. :param float timeout: - If not :py:data:`None`, specifies a timeout in seconds. + If not :data:`None`, specifies a timeout in seconds. :param bool block: - If :py:data:`False`, immediately raise - :py:class:`mitogen.core.TimeoutError` if the latch is empty. + If :data:`False`, immediately raise + :class:`mitogen.core.TimeoutError` if the latch is empty. :raises mitogen.core.LatchError: - :py:meth:`close` has been called, and the object is no longer valid. + :meth:`close` has been called, and the object is no longer valid. :raises mitogen.core.TimeoutError: Timeout was reached. @@ -1771,7 +1872,7 @@ class Latch(object): exists. :raises mitogen.core.LatchError: - :py:meth:`close` has been called, and the object is no longer valid. + :meth:`close` has been called, and the object is no longer valid. """ _vv and IOLOG.debug('%r.put(%r)', self, obj) self._lock.acquire() @@ -1807,7 +1908,7 @@ class Latch(object): class Waker(BasicStream): """ - :py:class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_. + :class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_. Used to wake the multiplexer when another thread needs to modify its state (via a cross-thread function call). @@ -1893,8 +1994,8 @@ class Waker(BasicStream): class IoLogger(BasicStream): """ - :py:class:`BasicStream` subclass that sets up redirection of a standard - UNIX file descriptor back into the Python :py:mod:`logging` package. + :class:`BasicStream` subclass that sets up redirection of a standard + UNIX file descriptor back into the Python :mod:`logging` package. """ _buf = '' @@ -2126,8 +2227,8 @@ class Router(object): return handle def on_shutdown(self, broker): - """Called during :py:meth:`Broker.shutdown`, informs callbacks - registered with :py:meth:`add_handle_cb` the connection is dead.""" + """Called during :meth:`Broker.shutdown`, informs callbacks registered + with :meth:`add_handle_cb` the connection is dead.""" _v and LOG.debug('%r.on_shutdown(%r)', self, broker) fire(self, 'shutdown') for handle, (persist, fn) in self._handle_map.iteritems(): @@ -2249,14 +2350,26 @@ class Router(object): class Broker(object): + """ + Responsible for handling I/O multiplexing in a private thread. + + **Note:** This is the somewhat limited core version of the Broker class + used by child contexts. The master subclass is documented below. + """ poller_class = Poller _waker = None _thread = None + + #: Seconds grace to allow :class:`streams ` to shutdown gracefully + #: before force-disconnecting them during :meth:`shutdown`. shutdown_timeout = 3.0 def __init__(self, poller_class=None): self._alive = True self._waker = Waker(self) + #: Arrange for `func(\*args, \**kwargs)` to be executed on the broker + #: thread, or immediately if the current thread is the broker thread. + #: Safe to call from any thread. self.defer = self._waker.defer self.poller = self.poller_class() self.poller.start_receive( @@ -2272,6 +2385,12 @@ class Broker(object): self._waker.broker_ident = self._thread.ident def start_receive(self, stream): + """ + Mark the :attr:`receive_side ` on `stream` as + ready for reading. Safe to call from any thread. When the associated + file descriptor becomes ready for reading, + :meth:`BasicStream.on_receive` will be called. + """ _vv and IOLOG.debug('%r.start_receive(%r)', self, stream) side = stream.receive_side assert side and side.fd is not None @@ -2279,26 +2398,47 @@ class Broker(object): side.fd, (side, stream.on_receive)) def stop_receive(self, stream): + """ + Mark the :attr:`receive_side ` on `stream` as not + ready for reading. Safe to call from any thread. + """ _vv and IOLOG.debug('%r.stop_receive(%r)', self, stream) self.defer(self.poller.stop_receive, stream.receive_side.fd) def _start_transmit(self, stream): + """ + Mark the :attr:`transmit_side ` on `stream` as + ready for writing. Must only be called from the Broker thread. When the + associated file descriptor becomes ready for writing, + :meth:`BasicStream.on_transmit` will be called. + """ _vv and IOLOG.debug('%r._start_transmit(%r)', self, stream) side = stream.transmit_side assert side and side.fd is not None self.poller.start_transmit(side.fd, (side, stream.on_transmit)) def _stop_transmit(self, stream): + """ + Mark the :attr:`transmit_side ` on `stream` as not + ready for writing. + """ _vv and IOLOG.debug('%r._stop_transmit(%r)', self, stream) self.poller.stop_transmit(stream.transmit_side.fd) def keep_alive(self): + """ + Return :data:`True` if any reader's :attr:`Side.keep_alive` attribute + is :data:`True`, or any :class:`Context` is still registered that is + not the master. Used to delay shutdown while some important work is in + progress (e.g. log draining). + """ it = (side.keep_alive for (_, (side, _)) in self.poller.readers) return sum(it, 0) def defer_sync(self, func): """ - Block the calling thread while `func` runs on a broker thread. + Arrange for `func()` to execute on the broker thread, blocking the + current thread until a result or exception is available. :returns: Return value of `func()`. @@ -2349,6 +2489,12 @@ class Broker(object): side.stream.on_disconnect(self) def _broker_main(self): + """ + Handle events until :meth:`shutdown`. On shutdown, invoke + :meth:`Stream.on_shutdown` for every active stream, then allow up to + :attr:`shutdown_timeout` seconds for the streams to unregister + themselves before forcefully calling :meth:`Stream.on_disconnect`. + """ try: while self._alive: self._loop_once() @@ -2361,12 +2507,20 @@ class Broker(object): fire(self, 'exit') def shutdown(self): + """ + Request broker gracefully disconnect streams and stop. Safe to call + from any thread. + """ _v and LOG.debug('%r.shutdown()', self) def _shutdown(): self._alive = False self.defer(_shutdown) def join(self): + """ + Wait for the broker to stop, expected to be called after + :meth:`shutdown`. + """ self._thread.join() def __repr__(self): @@ -2438,6 +2592,34 @@ class Dispatcher(object): class ExternalContext(object): + """ + External context implementation. + + .. attribute:: broker + The :class:`mitogen.core.Broker` instance. + + .. attribute:: context + The :class:`mitogen.core.Context` instance. + + .. attribute:: channel + The :class:`mitogen.core.Channel` over which :data:`CALL_FUNCTION` + requests are received. + + .. attribute:: stdout_log + The :class:`mitogen.core.IoLogger` connected to ``stdout``. + + .. attribute:: importer + The :class:`mitogen.core.Importer` instance. + + .. attribute:: stdout_log + The :class:`IoLogger` connected to ``stdout``. + + .. attribute:: stderr_log + The :class:`IoLogger` connected to ``stderr``. + + .. method:: _dispatch_calls + Implementation for the main thread in every child context. + """ detached = False def __init__(self, config): diff --git a/mitogen/master.py b/mitogen/master.py index 73302910..4b1b164d 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -754,6 +754,26 @@ class ModuleResponder(object): class Broker(mitogen.core.Broker): + """ + .. note:: + + You may construct as many brokers as desired, and use the same broker + for multiple routers, however usually only one broker need exist. + Multiple brokers may be useful when dealing with sets of children with + differing lifetimes. For example, a subscription service where + non-payment results in termination for one customer. + + :param bool install_watcher: + If :data:`True`, an additional thread is started to monitor the + lifetime of the main thread, triggering :meth:`shutdown` + automatically in case the user forgets to call it, or their code + crashed. + + You should not rely on this functionality in your program, it is only + intended as a fail-safe and to simplify the API for new users. In + particular, alternative Python implementations may not be able to + support watching the main thread. + """ shutdown_timeout = 5.0 _watcher = None poller_class = mitogen.parent.PREFERRED_POLLER @@ -773,7 +793,32 @@ class Broker(mitogen.core.Broker): class Router(mitogen.parent.Router): + """ + Extend :class:`mitogen.core.Router` with functionality useful to masters, + and child contexts who later become masters. Currently when this class is + required, the target context's router is upgraded at runtime. + + .. note:: + + You may construct as many routers as desired, and use the same broker + for multiple routers, however usually only one broker and router need + exist. Multiple routers may be useful when dealing with separate trust + domains, for example, manipulating infrastructure belonging to separate + customers or projects. + + :param mitogen.master.Broker broker: + Broker to use. If not specified, a private :class:`Broker` is created. + """ broker_class = Broker + + #: When :data:`True`, cause the broker thread and any subsequent broker and + #: main threads existing in any child to write + #: ``/tmp/mitogen.stats...log`` containing a + #: :mod:`cProfile` dump on graceful exit. Must be set prior to construction + #: of any :class:`Broker`, e.g. via: + #: + #: .. code:: + #: mitogen.master.Router.profiling = True profiling = False def __init__(self, broker=None, max_message_size=None): @@ -796,6 +841,10 @@ class Router(mitogen.parent.Router): ) def enable_debug(self): + """ + Cause this context and any descendant child contexts to write debug + logs to ``/tmp/mitogen..log``. + """ mitogen.core.enable_debug_logging() self.debug = True @@ -830,6 +879,12 @@ class IdAllocator(object): BLOCK_SIZE = 1000 def allocate(self): + """ + Arrange for a unique context ID to be allocated and associated with a + route leading to the active context. In masters, the ID is generated + directly, in children it is forwarded to the master via a + :data:`mitogen.core.ALLOCATE_ID` message. + """ self.lock.acquire() try: id_ = self.next_id diff --git a/mitogen/parent.py b/mitogen/parent.py index 0fffdd67..780cecd7 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1822,6 +1822,10 @@ class Router(mitogen.core.Router): class ProcessMonitor(object): + """ + Install a :data:`signal.SIGCHLD` handler that generates callbacks when a + specific child process has exitted. This class is obsolete, do not use. + """ def __init__(self): # pid -> callback() self.callback_by_pid = {} @@ -1835,6 +1839,16 @@ class ProcessMonitor(object): del self.callback_by_pid[pid] def add(self, pid, callback): + """ + Add a callback function to be notified of the exit status of a process. + + :param int pid: + Process ID to be notified of. + + :param callback: + Function invoked as `callback(status)`, where `status` is the raw + exit status of the child process. + """ self.callback_by_pid[pid] = callback _instance = None