Merge commit '5908936' into release-v0.3.26

pull/1321/head
Alex Willmer 4 months ago
commit fd1d45568a

@ -25,23 +25,17 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
include: include:
- name: Ans_27_210 - tox_env: py27-m_ans-ans2.10
tox_env: py27-mode_ansible-ansible2.10 - tox_env: py27-m_ans-ans4
- name: Ans_27_4
tox_env: py27-mode_ansible-ansible4
- name: Ans_36_210 - tox_env: py36-m_ans-ans2.10
python_version: '3.6' python_version: '3.6'
tox_env: py36-mode_ansible-ansible2.10 - tox_env: py36-m_ans-ans4
- name: Ans_36_4
python_version: '3.6' python_version: '3.6'
tox_env: py36-mode_ansible-ansible4
- name: Mito_27 - tox_env: py27-m_mtg
tox_env: py27-mode_mitogen - tox_env: py36-m_mtg
- name: Mito_36
python_version: '3.6' python_version: '3.6'
tox_env: py36-mode_mitogen
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
@ -98,7 +92,7 @@ jobs:
run: | run: |
set -o errexit -o nounset -o pipefail set -o errexit -o nounset -o pipefail
# Tox environment name (e.g. py312-mode_mitogen) -> Python executable name (e.g. python3.12) # Tox environment name (e.g. py312-m_mtg) -> Python executable name (e.g. python3.12)
PYTHON=$(python -c 'import re; print(re.sub(r"^py([23])([0-9]{1,2}).*", r"python\1.\2", "${{ matrix.tox_env }}"))') PYTHON=$(python -c 'import re; print(re.sub(r"^py([23])([0-9]{1,2}).*", r"python\1.\2", "${{ matrix.tox_env }}"))')
if [[ -z $PYTHON ]]; then if [[ -z $PYTHON ]]; then
@ -123,7 +117,7 @@ jobs:
run: | run: |
set -o errexit -o nounset -o pipefail set -o errexit -o nounset -o pipefail
# Tox environment name (e.g. py312-mode_mitogen) -> Python executable name (e.g. python3.12) # Tox environment name (e.g. py312-m_mtg) -> Python executable name (e.g. python3.12)
PYTHON=$(python -c 'import re; print(re.sub(r"^py([23])([0-9]{1,2}).*", r"python\1.\2", "${{ matrix.tox_env }}"))') PYTHON=$(python -c 'import re; print(re.sub(r"^py([23])([0-9]{1,2}).*", r"python\1.\2", "${{ matrix.tox_env }}"))')
if [[ -z $PYTHON ]]; then if [[ -z $PYTHON ]]; then
@ -147,50 +141,36 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
include: include:
- name: Ans_311_210 - tox_env: py311-m_ans-ans2.10
python_version: '3.11' python_version: '3.11'
tox_env: py311-mode_ansible-ansible2.10 - tox_env: py311-m_ans-ans3
- name: Ans_311_3
python_version: '3.11' python_version: '3.11'
tox_env: py311-mode_ansible-ansible3 - tox_env: py311-m_ans-ans4
- name: Ans_311_4
python_version: '3.11' python_version: '3.11'
tox_env: py311-mode_ansible-ansible4 - tox_env: py311-m_ans-ans5
- name: Ans_311_5
python_version: '3.11' python_version: '3.11'
tox_env: py311-mode_ansible-ansible5 - tox_env: py313-m_ans-ans6
- name: Ans_313_6
python_version: '3.13' python_version: '3.13'
tox_env: py313-mode_ansible-ansible6 - tox_env: py313-m_ans-ans7
- name: Ans_313_7
python_version: '3.13' python_version: '3.13'
tox_env: py313-mode_ansible-ansible7 - tox_env: py313-m_ans-ans8
- name: Ans_313_8
python_version: '3.13' python_version: '3.13'
tox_env: py313-mode_ansible-ansible8 - tox_env: py313-m_ans-ans9
- name: Ans_313_9
python_version: '3.13' python_version: '3.13'
tox_env: py313-mode_ansible-ansible9 - tox_env: py313-m_ans-ans10
- name: Ans_313_10
python_version: '3.13' python_version: '3.13'
tox_env: py313-mode_ansible-ansible10 - tox_env: py313-m_ans-ans11
- name: Ans_313_11
python_version: '3.13' python_version: '3.13'
tox_env: py313-mode_ansible-ansible11 - tox_env: py313-m_ans-ans12
- name: Ans_313_12
python_version: '3.13' python_version: '3.13'
tox_env: py313-mode_ansible-ansible12
- name: Van_313_11 - tox_env: py313-m_ans-ans11-s_lin
python_version: '3.13' python_version: '3.13'
tox_env: py313-mode_ansible-ansible11-strategy_linear - tox_env: py313-m_ans-ans12-s_lin
- name: Van_313_12
python_version: '3.13' python_version: '3.13'
tox_env: py313-mode_ansible-ansible12-strategy_linear
- name: Mito_313 - tox_env: py313-m_mtg
python_version: '3.13' python_version: '3.13'
tox_env: py313-mode_mitogen
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
@ -234,7 +214,7 @@ jobs:
run: | run: |
set -o errexit -o nounset -o pipefail set -o errexit -o nounset -o pipefail
# Tox environment name (e.g. py312-mode_mitogen) -> Python executable name (e.g. python3.12) # Tox environment name (e.g. py312-m_mtg) -> Python executable name (e.g. python3.12)
PYTHON=$(python -c 'import re; print(re.sub(r"^py([23])([0-9]{1,2}).*", r"python\1.\2", "${{ matrix.tox_env }}"))') PYTHON=$(python -c 'import re; print(re.sub(r"^py([23])([0-9]{1,2}).*", r"python\1.\2", "${{ matrix.tox_env }}"))')
if [[ -z $PYTHON ]]; then if [[ -z $PYTHON ]]; then
@ -250,7 +230,7 @@ jobs:
run: | run: |
set -o errexit -o nounset -o pipefail set -o errexit -o nounset -o pipefail
# Tox environment name (e.g. py312-mode_mitogen) -> Python executable name (e.g. python3.12) # Tox environment name (e.g. py312-m_mtg) -> Python executable name (e.g. python3.12)
PYTHON=$(python -c 'import re; print(re.sub(r"^py([23])([0-9]{1,2}).*", r"python\1.\2", "${{ matrix.tox_env }}"))') PYTHON=$(python -c 'import re; print(re.sub(r"^py([23])([0-9]{1,2}).*", r"python\1.\2", "${{ matrix.tox_env }}"))')
if [[ -z $PYTHON ]]; then if [[ -z $PYTHON ]]; then
@ -270,22 +250,14 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
include: include:
- name: Mito_313 - tox_env: py313-m_lcl-ans11
tox_env: py313-mode_mitogen
- name: Loc_313_11
sshpass_version: "1.10" sshpass_version: "1.10"
tox_env: py313-mode_localhost-ansible11 - tox_env: py313-m_lcl-ans11-s_lin
- name: Van_313_11
sshpass_version: "1.10" sshpass_version: "1.10"
tox_env: py313-mode_localhost-ansible11-strategy_linear - tox_env: py313-m_lcl-ans12
- tox_env: py313-m_lcl-ans12-s_lin
- name: Loc_313_12
tox_env: py313-mode_localhost-ansible12
- name: Van_313_12 - tox_env: py313-m_mtg
tox_env: py313-mode_localhost-ansible12-strategy_linear
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
@ -325,7 +297,7 @@ jobs:
run: | run: |
set -o errexit -o nounset -o pipefail set -o errexit -o nounset -o pipefail
# Tox environment name (e.g. py312-mode_mitogen) -> Python executable name (e.g. python3.12) # Tox environment name (e.g. py312-m_mtg) -> Python executable name (e.g. python3.12)
PYTHON=$(python -c 'import re; print(re.sub(r"^py([23])([0-9]{1,2}).*", r"python\1.\2", "${{ matrix.tox_env }}"))') PYTHON=$(python -c 'import re; print(re.sub(r"^py([23])([0-9]{1,2}).*", r"python\1.\2", "${{ matrix.tox_env }}"))')
if [[ -z $PYTHON ]]; then if [[ -z $PYTHON ]]; then
@ -341,7 +313,7 @@ jobs:
run: | run: |
set -o errexit -o nounset -o pipefail set -o errexit -o nounset -o pipefail
# Tox environment name (e.g. py312-mode_mitogen) -> Python executable name (e.g. python3.12) # Tox environment name (e.g. py312-m_mtg) -> Python executable name (e.g. python3.12)
PYTHON=$(python -c 'import re; print(re.sub(r"^py([23])([0-9]{1,2}).*", r"python\1.\2", "${{ matrix.tox_env }}"))') PYTHON=$(python -c 'import re; print(re.sub(r"^py([23])([0-9]{1,2}).*", r"python\1.\2", "${{ matrix.tox_env }}"))')
if [[ -z $PYTHON ]]; then if [[ -z $PYTHON ]]; then

@ -409,6 +409,7 @@ def _connect_mitogen_doas(spec):
#: generating ContextService keyword arguments matching a connection #: generating ContextService keyword arguments matching a connection
#: specification. #: specification.
CONNECTION_METHOD = { CONNECTION_METHOD = {
# Ansible connection plugins
'buildah': _connect_buildah, 'buildah': _connect_buildah,
'docker': _connect_docker, 'docker': _connect_docker,
'kubectl': _connect_kubectl, 'kubectl': _connect_kubectl,
@ -421,9 +422,14 @@ CONNECTION_METHOD = {
'setns': _connect_setns, 'setns': _connect_setns,
'ssh': _connect_ssh, 'ssh': _connect_ssh,
'smart': _connect_ssh, # issue #548. 'smart': _connect_ssh, # issue #548.
# Ansible become plugins
'community.general.doas': _connect_doas,
'su': _connect_su, 'su': _connect_su,
'sudo': _connect_sudo, 'sudo': _connect_sudo,
'doas': _connect_doas, 'doas': _connect_doas,
# Mitogen specific methods
'mitogen_su': _connect_mitogen_su, 'mitogen_su': _connect_mitogen_su,
'mitogen_sudo': _connect_mitogen_sudo, 'mitogen_sudo': _connect_mitogen_sudo,
'mitogen_doas': _connect_mitogen_doas, 'mitogen_doas': _connect_mitogen_doas,

@ -426,7 +426,7 @@ class ClassicWorkerModel(WorkerModel):
common_setup(_init_logging=_init_logging) common_setup(_init_logging=_init_logging)
self.parent_sock, self.child_sock = socket.socketpair() self.parent_sock, self.child_sock = mitogen.core.socketpair()
mitogen.core.set_cloexec(self.parent_sock.fileno()) mitogen.core.set_cloexec(self.parent_sock.fileno())
mitogen.core.set_cloexec(self.child_sock.fileno()) mitogen.core.set_cloexec(self.child_sock.fileno())

@ -18,6 +18,15 @@ To avail of fixes in an unreleased version, please download a ZIP file
`directly from GitHub <https://github.com/mitogen-hq/mitogen/>`_. `directly from GitHub <https://github.com/mitogen-hq/mitogen/>`_.
v0.3.26 (2025-08-04)
--------------------
* :gh:issue:`1318` CI: Abbreviate Github Actions job names
* :gh:issue:`1309` :mod:`ansible_mitogen`: Fix ``become_method: doas``
* :gh:issue:`712` :mod:`mitogen`: Fix :exc:`BlockingIOError` & ``EAGAIN``
errors in subprocesses that write to stdio
v0.3.25 (2025-07-29) v0.3.25 (2025-07-29)
-------------------- --------------------

@ -35,7 +35,7 @@ be expected. On the slave, it is built dynamically during startup.
#: Library version as a tuple. #: Library version as a tuple.
__version__ = (0, 3, 25) __version__ = (0, 3, 26)
#: This is :data:`False` in slave contexts. Previously it was used to prevent #: This is :data:`False` in slave contexts. Previously it was used to prevent

@ -87,6 +87,17 @@ import warnings
import weakref import weakref
import zlib import zlib
if sys.version_info > (3,5):
from os import get_blocking, set_blocking
else:
def get_blocking(fd):
return not fcntl.fcntl(fd, fcntl.F_GETFL) & os.O_NONBLOCK
def set_blocking(fd, blocking):
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
if blocking: fcntl.fcntl(fd, fcntl.F_SETFL, fl & ~os.O_NONBLOCK)
else: fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try: try:
# Python >= 3.4, PEP 451 ModuleSpec API # Python >= 3.4, PEP 451 ModuleSpec API
import importlib.machinery import importlib.machinery
@ -559,26 +570,6 @@ def set_cloexec(fd):
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
def set_nonblock(fd):
"""
Set the file descriptor `fd` to non-blocking mode. For most underlying file
types, this causes :func:`os.read` or :func:`os.write` to raise
:class:`OSError` with :data:`errno.EAGAIN` rather than block the thread
when the underlying kernel buffer is exhausted.
"""
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def set_block(fd):
"""
Inverse of :func:`set_nonblock`, i.e. cause `fd` to block the thread when
the underlying kernel buffer is exhausted.
"""
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
def io_op(func, *args): def io_op(func, *args):
""" """
Wrap `func(*args)` that may raise :class:`select.error`, :class:`IOError`, Wrap `func(*args)` that may raise :class:`select.error`, :class:`IOError`,
@ -720,7 +711,7 @@ def import_module(modname):
return __import__(modname, None, None, ['']) return __import__(modname, None, None, [''])
def pipe(): def pipe(blocking=None):
""" """
Create a UNIX pipe pair using :func:`os.pipe`, wrapping the returned Create a UNIX pipe pair using :func:`os.pipe`, wrapping the returned
descriptors in Python file objects in order to manage their lifetime and descriptors in Python file objects in order to manage their lifetime and
@ -728,12 +719,22 @@ def pipe():
not been closed explicitly. not been closed explicitly.
""" """
rfd, wfd = os.pipe() rfd, wfd = os.pipe()
for fd in rfd, wfd:
if blocking is not None: set_blocking(fd, blocking) # noqa: E701
return ( return (
os.fdopen(rfd, 'rb', 0), os.fdopen(rfd, 'rb', 0),
os.fdopen(wfd, 'wb', 0) os.fdopen(wfd, 'wb', 0)
) )
def socketpair(blocking=None):
fp1, fp2 = socket.socketpair()
for fp in fp1, fp2:
fd = fp.fileno()
if blocking is not None: set_blocking(fd, blocking) # noqa: E701
return fp1, fp2
def iter_split(buf, delim, func): def iter_split(buf, delim, func):
""" """
Invoke `func(s)` for each `delim`-delimited chunk in the potentially large Invoke `func(s)` for each `delim`-delimited chunk in the potentially large
@ -1879,8 +1880,7 @@ class Stream(object):
""" """
Attach a pair of file objects to :attr:`receive_side` and Attach a pair of file objects to :attr:`receive_side` and
:attr:`transmit_side`, after wrapping them in :class:`Side` instances. :attr:`transmit_side`, after wrapping them in :class:`Side` instances.
:class:`Side` will call :func:`set_nonblock` and :func:`set_cloexec` :class:`Side` will call :func:`set_cloexec` on them.
on the underlying file descriptors during construction.
The same file object may be used for both sides. The default The same file object may be used for both sides. The default
:meth:`on_disconnect` is handles the possibility that only one :meth:`on_disconnect` is handles the possibility that only one
@ -2155,14 +2155,11 @@ class Side(object):
:param bool keep_alive: :param bool keep_alive:
If :data:`True`, the continued existence of this side will extend the If :data:`True`, the continued existence of this side will extend the
shutdown grace period until it has been unregistered from the broker. shutdown grace period until it has been unregistered from the broker.
:param bool blocking:
If :data:`False`, the descriptor has its :data:`os.O_NONBLOCK` flag
enabled using :func:`fcntl.fcntl`.
""" """
_fork_refs = weakref.WeakValueDictionary() _fork_refs = weakref.WeakValueDictionary()
closed = False closed = False
def __init__(self, stream, fp, cloexec=True, keep_alive=True, blocking=False): def __init__(self, stream, fp, cloexec=True, keep_alive=True):
#: The :class:`Stream` for which this is a read or write side. #: The :class:`Stream` for which this is a read or write side.
self.stream = stream self.stream = stream
# File or socket object responsible for the lifetime of its underlying # File or socket object responsible for the lifetime of its underlying
@ -2180,8 +2177,6 @@ class Side(object):
self._fork_refs[id(self)] = self self._fork_refs[id(self)] = self
if cloexec: if cloexec:
set_cloexec(self.fd) set_cloexec(self.fd)
if not blocking:
set_nonblock(self.fd)
def __repr__(self): def __repr__(self):
return '<Side of %s fd %s>' % ( return '<Side of %s fd %s>' % (
@ -2785,7 +2780,7 @@ class Latch(object):
try: try:
return self._cls_idle_socketpairs.pop() # pop() must be atomic return self._cls_idle_socketpairs.pop() # pop() must be atomic
except IndexError: except IndexError:
rsock, wsock = socket.socketpair() rsock, wsock = socketpair()
rsock.setblocking(False) rsock.setblocking(False)
set_cloexec(rsock.fileno()) set_cloexec(rsock.fileno())
set_cloexec(wsock.fileno()) set_cloexec(wsock.fileno())
@ -2958,7 +2953,8 @@ class Waker(Protocol):
@classmethod @classmethod
def build_stream(cls, broker): def build_stream(cls, broker):
stream = super(Waker, cls).build_stream(broker) stream = super(Waker, cls).build_stream(broker)
stream.accept(*pipe()) rfp, wfp = pipe(blocking=False)
stream.accept(rfp, wfp)
return stream return stream
def __init__(self, broker): def __init__(self, broker):
@ -3056,7 +3052,8 @@ class IoLoggerProtocol(DelimitedProtocol):
prevent break :meth:`on_shutdown` from calling :meth:`shutdown() prevent break :meth:`on_shutdown` from calling :meth:`shutdown()
<socket.socket.shutdown>` on it. <socket.socket.shutdown>` on it.
""" """
rsock, wsock = socket.socketpair() # Leave wsock & dest_fd blocking, so the subprocess will have sane stdio
rsock, wsock = socketpair()
os.dup2(wsock.fileno(), dest_fd) os.dup2(wsock.fileno(), dest_fd)
stream = super(IoLoggerProtocol, cls).build_stream(name) stream = super(IoLoggerProtocol, cls).build_stream(name)
stream.name = name stream.name = name
@ -4038,6 +4035,9 @@ class ExternalContext(object):
local_id=self.config['context_id'], local_id=self.config['context_id'],
parent_ids=self.config['parent_ids'] parent_ids=self.config['parent_ids']
) )
for f in in_fp, out_fp:
fd = f.fileno()
set_blocking(fd, False)
self.stream.accept(in_fp, out_fp) self.stream.accept(in_fp, out_fp)
self.stream.name = 'parent' self.stream.name = 'parent'
self.stream.receive_side.keep_alive = False self.stream.receive_side.keep_alive = False

@ -179,6 +179,9 @@ class Process(object):
self.control_handle = router.add_handler(self._on_control) self.control_handle = router.add_handler(self._on_control)
self.stdin_handle = router.add_handler(self._on_stdin) self.stdin_handle = router.add_handler(self._on_stdin)
self.pump = IoPump.build_stream(router.broker) self.pump = IoPump.build_stream(router.broker)
for fp in stdin, stdout:
fd = fp.fileno()
mitogen.core.set_blocking(fd, False)
self.pump.accept(stdin, stdout) self.pump.accept(stdin, stdout)
self.stdin = None self.stdin = None
self.control = None self.control = None
@ -419,10 +422,11 @@ def run(dest, router, args, deadline=None, econtext=None):
fakessh = mitogen.parent.Context(router, context_id) fakessh = mitogen.parent.Context(router, context_id)
fakessh.name = u'fakessh.%d' % (context_id,) fakessh.name = u'fakessh.%d' % (context_id,)
sock1, sock2 = socket.socketpair() sock1, sock2 = mitogen.core.socketpair()
stream = mitogen.core.Stream(router, context_id) stream = mitogen.core.Stream(router, context_id)
stream.name = u'fakessh' stream.name = u'fakessh'
mitogen.core.set_blocking(sock1.fileno(), False)
stream.accept(sock1, sock1) stream.accept(sock1, sock1)
router.register(fakessh, stream) router.register(fakessh, stream)

@ -211,7 +211,7 @@ class Connection(mitogen.parent.Connection):
on_fork() on_fork()
if self.options.on_fork: if self.options.on_fork:
self.options.on_fork() self.options.on_fork()
mitogen.core.set_block(childfp.fileno()) mitogen.core.set_blocking(childfp.fileno(), True)
childfp.send(b('MITO002\n')) childfp.send(b('MITO002\n'))

@ -38,6 +38,7 @@ import sys
import weakref import weakref
import mitogen.core import mitogen.core
import mitogen.parent
# List of weakrefs. On Python 2.4, mitogen.core registers its Broker on this # List of weakrefs. On Python 2.4, mitogen.core registers its Broker on this
@ -131,9 +132,9 @@ class Corker(object):
`obj` to be written to by one of its threads. `obj` to be written to by one of its threads.
""" """
rsock, wsock = mitogen.parent.create_socketpair(size=4096) rsock, wsock = mitogen.parent.create_socketpair(size=4096)
mitogen.core.set_blocking(wsock.fileno(), True) # gevent
mitogen.core.set_cloexec(rsock.fileno()) mitogen.core.set_cloexec(rsock.fileno())
mitogen.core.set_cloexec(wsock.fileno()) mitogen.core.set_cloexec(wsock.fileno())
mitogen.core.set_block(wsock) # gevent
self._rsocks.append(rsock) self._rsocks.append(rsock)
obj.defer(self._do_cork, s, wsock) obj.defer(self._do_cork, s, wsock)

@ -265,7 +265,7 @@ def disable_echo(fd):
termios.tcsetattr(fd, flags, new) termios.tcsetattr(fd, flags, new)
def create_socketpair(size=None): def create_socketpair(size=None, blocking=None):
""" """
Create a :func:`socket.socketpair` for use as a child's UNIX stdio Create a :func:`socket.socketpair` for use as a child's UNIX stdio
channels. As socketpairs are bidirectional, they are economical on file channels. As socketpairs are bidirectional, they are economical on file
@ -276,14 +276,14 @@ def create_socketpair(size=None):
if size is None: if size is None:
size = mitogen.core.CHUNK_SIZE size = mitogen.core.CHUNK_SIZE
parentfp, childfp = socket.socketpair() parentfp, childfp = mitogen.core.socketpair(blocking)
for fp in parentfp, childfp: for fp in parentfp, childfp:
fp.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size) fp.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
return parentfp, childfp return parentfp, childfp
def create_best_pipe(escalates_privilege=False): def create_best_pipe(escalates_privilege=False, blocking=None):
""" """
By default we prefer to communicate with children over a UNIX socket, as a By default we prefer to communicate with children over a UNIX socket, as a
single file descriptor can represent bidirectional communication, and a single file descriptor can represent bidirectional communication, and a
@ -301,16 +301,19 @@ def create_best_pipe(escalates_privilege=False):
:param bool escalates_privilege: :param bool escalates_privilege:
If :data:`True`, the target program may escalate privileges, causing If :data:`True`, the target program may escalate privileges, causing
SELinux to disconnect AF_UNIX sockets, so avoid those. SELinux to disconnect AF_UNIX sockets, so avoid those.
:param None|bool blocking:
If :data:`False` or :data:`True`, set non-blocking or blocking mode.
If :data:`None` (default), use default.
:returns: :returns:
`(parent_rfp, child_wfp, child_rfp, parent_wfp)` `(parent_rfp, child_wfp, child_rfp, parent_wfp)`
""" """
if (not escalates_privilege) or (not SELINUX_ENABLED): if (not escalates_privilege) or (not SELINUX_ENABLED):
parentfp, childfp = create_socketpair() parentfp, childfp = create_socketpair(blocking=blocking)
return parentfp, childfp, childfp, parentfp return parentfp, childfp, childfp, parentfp
parent_rfp, child_wfp = mitogen.core.pipe() parent_rfp, child_wfp = mitogen.core.pipe(blocking)
try: try:
child_rfp, parent_wfp = mitogen.core.pipe() child_rfp, parent_wfp = mitogen.core.pipe(blocking)
return parent_rfp, child_wfp, child_rfp, parent_wfp return parent_rfp, child_wfp, child_rfp, parent_wfp
except: except:
parent_rfp.close() parent_rfp.close()
@ -481,7 +484,7 @@ def openpty():
if not IS_SOLARIS: if not IS_SOLARIS:
disable_echo(master_fd) disable_echo(master_fd)
disable_echo(slave_fd) disable_echo(slave_fd)
mitogen.core.set_block(slave_fd) mitogen.core.set_blocking(slave_fd, True)
return master_fp, slave_fp return master_fp, slave_fp
@ -547,8 +550,8 @@ def hybrid_tty_create_child(args, escalates_privilege=False):
escalates_privilege=escalates_privilege, escalates_privilege=escalates_privilege,
) )
try: try:
mitogen.core.set_block(child_rfp) mitogen.core.set_blocking(child_rfp.fileno(), True)
mitogen.core.set_block(child_wfp) mitogen.core.set_blocking(child_wfp.fileno(), True)
proc = popen( proc = popen(
args=args, args=args,
stdin=child_rfp, stdin=child_rfp,
@ -1643,6 +1646,9 @@ class Connection(object):
stream = self.stream_factory() stream = self.stream_factory()
stream.conn = self stream.conn = self
stream.name = self.options.name or self._get_name() stream.name = self.options.name or self._get_name()
for fp in self.proc.stdout, self.proc.stdin:
fd = fp.fileno()
mitogen.core.set_blocking(fd, False)
stream.accept(self.proc.stdout, self.proc.stdin) stream.accept(self.proc.stdout, self.proc.stdin)
mitogen.core.listen(stream, 'disconnect', self.on_stdio_disconnect) mitogen.core.listen(stream, 'disconnect', self.on_stdio_disconnect)
@ -1653,6 +1659,8 @@ class Connection(object):
stream = self.stderr_stream_factory() stream = self.stderr_stream_factory()
stream.conn = self stream.conn = self
stream.name = self.options.name or self._get_name() stream.name = self.options.name or self._get_name()
fd = self.proc.stderr.fileno()
mitogen.core.set_blocking(fd, False)
stream.accept(self.proc.stderr, self.proc.stderr) stream.accept(self.proc.stderr, self.proc.stderr)
mitogen.core.listen(stream, 'disconnect', self.on_stderr_disconnect) mitogen.core.listen(stream, 'disconnect', self.on_stderr_disconnect)
@ -2555,9 +2563,8 @@ class Reaper(object):
relatively conservative retries. relatively conservative retries.
""" """
delay = 0.05 delay = 0.05
for _ in xrange(count): factor = 1.72
delay *= 1.72 return delay * factor ** count
return delay
def _on_broker_shutdown(self): def _on_broker_shutdown(self):
""" """

@ -111,6 +111,7 @@ class Listener(mitogen.core.Protocol):
sock.listen(backlog) sock.listen(backlog)
stream = super(Listener, cls).build_stream(router, path) stream = super(Listener, cls).build_stream(router, path)
mitogen.core.set_blocking(sock.fileno(), False)
stream.accept(sock, sock) stream.accept(sock, sock)
router.broker.start_receive(stream) router.broker.start_receive(stream)
return stream return stream
@ -169,6 +170,7 @@ class Listener(mitogen.core.Protocol):
auth_id=mitogen.context_id, auth_id=mitogen.context_id,
) )
stream.name = u'unix_client.%d' % (pid,) stream.name = u'unix_client.%d' % (pid,)
mitogen.core.set_blocking(sock.fileno(), False)
stream.accept(sock, sock) stream.accept(sock, sock)
LOG.debug('listener: accepted connection from PID %d: %s', LOG.debug('listener: accepted connection from PID %d: %s',
pid, stream.name) pid, stream.name)

@ -1,4 +1,5 @@
- import_playbook: doas.yml
- import_playbook: su_password.yml - import_playbook: su_password.yml
- import_playbook: sudo_flags_failure.yml - import_playbook: sudo_flags_failure.yml
- import_playbook: sudo_nonexistent.yml - import_playbook: sudo_nonexistent.yml

@ -0,0 +1,91 @@
- name: integration/become/doas.yml - unqualified
hosts: test-targets:&linux_containers
gather_facts: false
become_method: doas # noqa: schema[playbook]
vars:
ansible_become_password: has_sudo_nopw_password
tasks:
# Vanilla Ansible doas requires pipelining=false
# https://github.com/ansible-collections/community.general/issues/9977
- include_tasks: ../_mitogen_only.yml
- name: Test doas -> default target user
become: true
command: whoami
changed_when: false
check_mode: false
register: doas_default_user
- assert:
that:
- doas_default_user.stdout == 'root'
fail_msg:
doas_default_user={{ doas_default_user }}
- name: Test doas -> mitogen__user1
become: true
become_user: mitogen__user1
command: whoami
changed_when: false
check_mode: false
register: doas_mitogen__user1
when:
- become_unpriv_available
- assert:
that:
- doas_mitogen__user1.stdout == 'mitogen__user1'
fail_msg:
doas_mitogen__user1={{ doas_mitogen__user1 }}
when:
- become_unpriv_available
tags:
- doas
- issue_1309
- mitogen_only
- name: integration/become/doas.yml - FQCN
hosts: test-targets:&linux_containers
gather_facts: false
become_method: community.general.doas
vars:
ansible_become_password: has_sudo_nopw_password
tasks:
# Vanilla Ansible doas requires pipelining=false
# https://github.com/ansible-collections/community.general/issues/9977
- include_tasks: ../_mitogen_only.yml
- name: Test community.general.doas -> default target user
become: true
command: whoami
changed_when: false
check_mode: false
register: fq_doas_default_user
- assert:
that:
- fq_doas_default_user.stdout == 'root'
fail_msg:
fq_doas_default_user={{ fq_doas_default_user }}
- name: Test community.general.doas -> mitogen__user1
become: true
become_user: mitogen__user1
command: whoami
changed_when: false
check_mode: false
register: fq_doas_mitogen__user1
when:
- become_unpriv_available
- assert:
that:
- fq_doas_mitogen__user1.stdout == 'mitogen__user1'
fail_msg:
fq_doas_mitogen__user1={{ fq_doas_mitogen__user1 }}
when:
- become_unpriv_available
tags:
- doas
- issue_1309
- mitogen_only

@ -0,0 +1,32 @@
import os
import tempfile
import mitogen.core
import testlib
class BlockingIOTest(testlib.TestCase):
def setUp(self):
super(BlockingIOTest, self).setUp()
self.fp = tempfile.TemporaryFile()
self.fd = self.fp.fileno()
def tearDown(self):
self.fp.close()
super(BlockingIOTest, self).tearDown()
def test_get_blocking(self):
if hasattr(os, 'get_blocking'):
self.assertEqual(
os.get_blocking(self.fd), mitogen.core.get_blocking(self.fd),
)
self.assertTrue(mitogen.core.get_blocking(self.fd) is True)
def test_set_blocking(self):
mitogen.core.set_blocking(self.fd, False)
if hasattr(os, 'get_blocking'):
self.assertEqual(
os.get_blocking(self.fd), mitogen.core.get_blocking(self.fd),
)
self.assertTrue(mitogen.core.get_blocking(self.fd) is False)

@ -190,7 +190,7 @@ class TtyCreateChildTest(testlib.TestCase):
proc = self.func([ proc = self.func([
'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,) 'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
]) ])
mitogen.core.set_block(proc.stdin.fileno()) mitogen.core.set_blocking(proc.stdin.fileno(), True)
# read(3) below due to https://bugs.python.org/issue37696 # read(3) below due to https://bugs.python.org/issue37696
self.assertEqual(mitogen.core.b('hi\n'), proc.stdin.read(3)) self.assertEqual(mitogen.core.b('hi\n'), proc.stdin.read(3))
waited_pid, status = os.waitpid(proc.pid, 0) waited_pid, status = os.waitpid(proc.pid, 0)

@ -0,0 +1,16 @@
import fcntl
import os
import sys
def shout_stdout(size):
sys.stdout.write('A' * size)
return 'success'
def file_is_blocking(fobj):
return not (fcntl.fcntl(fobj.fileno(), fcntl.F_GETFL) & os.O_NONBLOCK)
def stdio_is_blocking():
return [file_is_blocking(f) for f in [sys.stdin, sys.stdout, sys.stderr]]

@ -0,0 +1,35 @@
import os
import mitogen.core
import testlib
class PipeTest(testlib.TestCase):
def test_pipe_blocking_unspecified(self):
"Test that unspecified blocking arg (None) behaves same as os.pipe()"
os_rfd, os_wfd = os.pipe()
mi_rfp, mi_wfp = mitogen.core.pipe()
self.assertEqual(mitogen.core.get_blocking(os_rfd),
mitogen.core.get_blocking(mi_rfp.fileno()))
self.assertEqual(mitogen.core.get_blocking(os_wfd),
mitogen.core.get_blocking(mi_wfp.fileno()))
mi_rfp.close()
mi_wfp.close()
os.close(os_rfd)
os.close(os_wfd)
def test_pipe_blocking_true(self):
mi_rfp, mi_wfp = mitogen.core.pipe(blocking=True)
self.assertTrue(mitogen.core.get_blocking(mi_rfp.fileno()))
self.assertTrue(mitogen.core.get_blocking(mi_wfp.fileno()))
mi_rfp.close()
mi_wfp.close()
def test_pipe_blocking_false(self):
mi_rfp, mi_wfp = mitogen.core.pipe(blocking=False)
self.assertFalse(mitogen.core.get_blocking(mi_rfp.fileno()))
self.assertFalse(mitogen.core.get_blocking(mi_wfp.fileno()))
mi_rfp.close()
mi_wfp.close()

@ -28,15 +28,13 @@ class SockMixin(object):
# buffers on both sides (bidirectional IO), making it easier to test # buffers on both sides (bidirectional IO), making it easier to test
# combinations of readability/writeability on the one side of a single # combinations of readability/writeability on the one side of a single
# file object. # file object.
self.l1_sock, self.r1_sock = socket.socketpair() self.l1_sock, self.r1_sock = mitogen.core.socketpair(blocking=False)
self.l1 = self.l1_sock.fileno() self.l1 = self.l1_sock.fileno()
self.r1 = self.r1_sock.fileno() self.r1 = self.r1_sock.fileno()
self.l2_sock, self.r2_sock = socket.socketpair() self.l2_sock, self.r2_sock = mitogen.core.socketpair(blocking=False)
self.l2 = self.l2_sock.fileno() self.l2 = self.l2_sock.fileno()
self.r2 = self.r2_sock.fileno() self.r2 = self.r2_sock.fileno()
for fp in self.l1, self.r1, self.l2, self.r2:
mitogen.core.set_nonblock(fp)
def fill(self, fd): def fill(self, fd):
"""Make `fd` unwriteable.""" """Make `fd` unwriteable."""

@ -0,0 +1,35 @@
import socket
import mitogen.core
import testlib
class SocketPairTest(testlib.TestCase):
def test_socketpair_blocking_unspecified(self):
"Test that unspecified blocking arg (None) batches socket.socketpair()"
sk_fp1, sk_fp2 = socket.socketpair()
mi_fp1, mi_fp2 = mitogen.core.socketpair()
self.assertEqual(mitogen.core.get_blocking(sk_fp1.fileno()),
mitogen.core.get_blocking(mi_fp1.fileno()))
self.assertEqual(mitogen.core.get_blocking(sk_fp2.fileno()),
mitogen.core.get_blocking(mi_fp2.fileno()))
mi_fp1.close()
mi_fp2.close()
sk_fp1.close()
sk_fp2.close()
def test_socketpair_blocking_true(self):
mi_fp1, mi_fp2 = mitogen.core.socketpair(blocking=True)
self.assertTrue(mitogen.core.get_blocking(mi_fp1.fileno()))
self.assertTrue(mitogen.core.get_blocking(mi_fp2.fileno()))
mi_fp1.close()
mi_fp2.close()
def test_socketpair_blocking_false(self):
mi_fp1, mi_fp2 = mitogen.core.socketpair(blocking=False)
self.assertFalse(mitogen.core.get_blocking(mi_fp1.fileno()))
self.assertFalse(mitogen.core.get_blocking(mi_fp2.fileno()))
mi_fp1.close()
mi_fp2.close()

@ -0,0 +1,28 @@
import testlib
import stdio_checks
class StdIOTest(testlib.RouterMixin, testlib.TestCase):
"""
Test that stdin, stdout, and stderr conform to common expectations,
such as blocking IO.
"""
def test_can_write_stdout_1_mib(self):
"""
Writing to stdout should not raise EAGAIN. Regression test for
https://github.com/mitogen-hq/mitogen/issues/712.
"""
size = 1 * 2**20
context = self.router.local()
result = context.call(stdio_checks.shout_stdout, size)
self.assertEqual('success', result)
def test_stdio_is_blocking(self):
context = self.router.local()
stdin_blocking, stdout_blocking, stderr_blocking = context.call(
stdio_checks.stdio_is_blocking,
)
self.assertTrue(stdin_blocking)
self.assertTrue(stdout_blocking)
self.assertTrue(stderr_blocking)

@ -55,10 +55,10 @@
[tox] [tox]
envlist = envlist =
init, init,
py{27,36}-mode_ansible-ansible{2.10,3,4}, py{27,36}-m_ans-ans{2.10,3,4}
py{311}-mode_ansible-ansible{2.10,3,4,5}, py{311}-m_ans-ans{2.10,3-5}
py{313}-mode_ansible-ansible{6,7,8,9,10,11,12}, py{313}-m_ans-ans{6-12}
py{27,36,313}-mode_mitogen, py{27,36,313}-m_mtg
report, report,
[testenv] [testenv]
@ -76,28 +76,28 @@ basepython =
py313: python3.13 py313: python3.13
deps = deps =
-r{toxinidir}/tests/requirements.txt -r{toxinidir}/tests/requirements.txt
mode_ansible: -r{toxinidir}/tests/ansible/requirements.txt m_ans: -r{toxinidir}/tests/ansible/requirements.txt
ansible2.10: ansible~=2.10.0 ans2.10: ansible~=2.10.0
ansible3: ansible~=3.0 ans3: ansible~=3.0
ansible4: ansible~=4.0 ans4: ansible~=4.0
ansible5: ansible~=5.0 ans5: ansible~=5.0
# From Ansible 6 PyPI distributions include a wheel # From Ansible 6 PyPI distributions include a wheel
ansible6: ansible~=6.0 ans6: ansible~=6.0
ansible7: ansible~=7.0 ans7: ansible~=7.0
ansible8: ansible~=8.0 ans8: ansible~=8.0
ansible9: ansible~=9.0 ans9: ansible~=9.0
ansible10: ansible~=10.0 ans10: ansible~=10.0
ansible11: ansible~=11.0 ans11: ansible~=11.0
ansible12: ansible>=12.0.0b2 ans12: ansible>=12.0.0b2
install_command = install_command =
python -m pip --no-python-version-warning --disable-pip-version-check install {opts} {packages} python -m pip --no-python-version-warning --disable-pip-version-check install {opts} {packages}
commands_pre = commands_pre =
mode_debops_common: {toxinidir}/.ci/debops_common_install.py mode_debops_common: {toxinidir}/.ci/debops_common_install.py
commands = commands =
mode_ansible: {toxinidir}/.ci/ansible_tests.py m_ans: {toxinidir}/.ci/ansible_tests.py
mode_debops_common: {toxinidir}/.ci/debops_common_tests.py mode_debops_common: {toxinidir}/.ci/debops_common_tests.py
mode_localhost: {toxinidir}/.ci/localhost_ansible_tests.py m_lcl: {toxinidir}/.ci/localhost_ansible_tests.py
mode_mitogen: {toxinidir}/.ci/mitogen_tests.py m_mtg: {toxinidir}/.ci/mitogen_tests.py
passenv = passenv =
ANSIBLE_* ANSIBLE_*
HOME HOME
@ -111,20 +111,18 @@ setenv =
PIP_CONSTRAINT={toxinidir}/tests/constraints.txt PIP_CONSTRAINT={toxinidir}/tests/constraints.txt
# Superceded in Ansible >= 6 (ansible-core >= 2.13) by result_format=yaml # Superceded in Ansible >= 6 (ansible-core >= 2.13) by result_format=yaml
# Deprecated in Ansible 12 (ansible-core 2.19) # Deprecated in Ansible 12 (ansible-core 2.19)
ansible{2.10,3-5}: DEFAULT_STDOUT_CALLBACK=yaml ans{2.10,3,4,5}: ANSIBLE_STDOUT_CALLBACK=yaml
# Print warning on the first occurence at each module:linenno in Mitogen. Available Python 2.7, 3.2+. # Print warning on the first occurence at each module:linenno in Mitogen. Available Python 2.7, 3.2+.
PYTHONWARNINGS=default:::ansible_mitogen,default:::mitogen PYTHONWARNINGS=default:::ansible_mitogen,default:::mitogen
# Ansible 6 - 8 (ansible-core 2.13 - 2.15) require Python 2.7 or >= 3.5 on targets # Ansible 6 - 8 (ansible-core 2.13 - 2.15) require Python 2.7 or >= 3.5 on targets
ansible6: MITOGEN_TEST_DISTRO_SPECS=centos7 centos8 debian9 debian10 debian11 ubuntu1604 ubuntu1804 ubuntu2004 ans{6,7,8}: MITOGEN_TEST_DISTRO_SPECS=centos7 centos8 debian9 debian10 debian11 ubuntu1604 ubuntu1804 ubuntu2004
ansible7: MITOGEN_TEST_DISTRO_SPECS=centos7 centos8 debian9 debian10 debian11 ubuntu1604 ubuntu1804 ubuntu2004
ansible8: MITOGEN_TEST_DISTRO_SPECS=centos7 centos8 debian9 debian10 debian11 ubuntu1604 ubuntu1804 ubuntu2004
# Ansible 9 (ansible-core 2.16) requires Python 2.7 or >= 3.6 on targets # Ansible 9 (ansible-core 2.16) requires Python 2.7 or >= 3.6 on targets
ansible9: MITOGEN_TEST_DISTRO_SPECS=centos7 centos8 debian9 debian10 debian11 ubuntu1804 ubuntu2004 ans9: MITOGEN_TEST_DISTRO_SPECS=centos7 centos8 debian9 debian10 debian11 ubuntu1804 ubuntu2004
# Ansible 10 (ansible-core 2.17) requires Python >= 3.7 on targets # Ansible 10 (ansible-core 2.17) requires Python >= 3.7 on targets
ansible10: MITOGEN_TEST_DISTRO_SPECS=debian10-py3 debian11-py3 ubuntu2004-py3 ans10: MITOGEN_TEST_DISTRO_SPECS=debian10-py3 debian11-py3 ubuntu2004-py3
# Ansible 11 (ansible-core 2.18) requires Python >= 3.8 on targets # Ansible 11 (ansible-core 2.18) requires Python >= 3.8 on targets
ansible11: MITOGEN_TEST_DISTRO_SPECS=debian11-py3 ubuntu2004-py3 ans11: MITOGEN_TEST_DISTRO_SPECS=debian11-py3 ubuntu2004-py3
ansible12: MITOGEN_TEST_DISTRO_SPECS=debian11-py3 ubuntu2004-py3 ans12: MITOGEN_TEST_DISTRO_SPECS=debian11-py3 ubuntu2004-py3
distros_centos: MITOGEN_TEST_DISTRO_SPECS=centos6 centos7 centos8 distros_centos: MITOGEN_TEST_DISTRO_SPECS=centos6 centos7 centos8
distros_centos5: MITOGEN_TEST_DISTRO_SPECS=centos5 distros_centos5: MITOGEN_TEST_DISTRO_SPECS=centos5
distros_centos6: MITOGEN_TEST_DISTRO_SPECS=centos6 distros_centos6: MITOGEN_TEST_DISTRO_SPECS=centos6
@ -138,14 +136,14 @@ setenv =
distros_ubuntu1604: MITOGEN_TEST_DISTRO_SPECS=ubuntu1604 distros_ubuntu1604: MITOGEN_TEST_DISTRO_SPECS=ubuntu1604
distros_ubuntu1804: MITOGEN_TEST_DISTRO_SPECS=ubuntu1804 distros_ubuntu1804: MITOGEN_TEST_DISTRO_SPECS=ubuntu1804
distros_ubuntu2004: MITOGEN_TEST_DISTRO_SPECS=ubuntu2004 distros_ubuntu2004: MITOGEN_TEST_DISTRO_SPECS=ubuntu2004
mode_ansible: MODE=ansible m_ans: MODE=ansible
mode_ansible: ANSIBLE_SKIP_TAGS=resource_intensive m_ans: ANSIBLE_SKIP_TAGS=resource_intensive
mode_ansible: ANSIBLE_CALLBACK_WHITELIST=profile_tasks m_ans: ANSIBLE_CALLBACK_WHITELIST=profile_tasks
mode_ansible: ANSIBLE_CALLBACKS_ENABLED=profile_tasks m_ans: ANSIBLE_CALLBACKS_ENABLED=profile_tasks
mode_debops_common: MODE=debops_common mode_debops_common: MODE=debops_common
mode_localhost: ANSIBLE_SKIP_TAGS=issue_776,resource_intensive m_lcl: ANSIBLE_SKIP_TAGS=issue_776,resource_intensive
mode_mitogen: MODE=mitogen m_mtg: MODE=mitogen
strategy_linear: ANSIBLE_STRATEGY=linear s_lin: ANSIBLE_STRATEGY=linear
allowlist_externals = allowlist_externals =
# Added: Tox 3.18: Tox 4.0+ # Added: Tox 3.18: Tox 4.0+
*_install.py *_install.py

Loading…
Cancel
Save