Merge pull request #1300 from moreati/issue712-stdout-non-blocking

stdio EAGAIN investigation
pull/1320/head
Alex Willmer 4 months ago committed by GitHub
commit 64feda250e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -426,7 +426,7 @@ class ClassicWorkerModel(WorkerModel):
common_setup(_init_logging=_init_logging)
self.parent_sock, self.child_sock = socket.socketpair()
self.parent_sock, self.child_sock = mitogen.core.socketpair()
mitogen.core.set_cloexec(self.parent_sock.fileno())
mitogen.core.set_cloexec(self.child_sock.fileno())

@ -23,6 +23,8 @@ In progress (unreleased)
* :gh:issue:`1318` CI: Abbreviate Github Actions job names
* :gh:issue:`1309` :mod:`ansible_mitogen`: Fix ``become_method: doas``
* :gh:issue:`712` :mod:`mitogen`: Fix :exc:`BlockingIOError` & ``EAGAIN``
errors in subprocesses that write to stdio
v0.3.25 (2025-07-29)

@ -87,6 +87,17 @@ import warnings
import weakref
import zlib
if sys.version_info > (3,5):
from os import get_blocking, set_blocking
else:
def get_blocking(fd):
return not fcntl.fcntl(fd, fcntl.F_GETFL) & os.O_NONBLOCK
def set_blocking(fd, blocking):
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
if blocking: fcntl.fcntl(fd, fcntl.F_SETFL, fl & ~os.O_NONBLOCK)
else: fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
# Python >= 3.4, PEP 451 ModuleSpec API
import importlib.machinery
@ -559,26 +570,6 @@ def set_cloexec(fd):
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
def set_nonblock(fd):
"""
Set the file descriptor `fd` to non-blocking mode. For most underlying file
types, this causes :func:`os.read` or :func:`os.write` to raise
:class:`OSError` with :data:`errno.EAGAIN` rather than block the thread
when the underlying kernel buffer is exhausted.
"""
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def set_block(fd):
"""
Inverse of :func:`set_nonblock`, i.e. cause `fd` to block the thread when
the underlying kernel buffer is exhausted.
"""
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
def io_op(func, *args):
"""
Wrap `func(*args)` that may raise :class:`select.error`, :class:`IOError`,
@ -720,7 +711,7 @@ def import_module(modname):
return __import__(modname, None, None, [''])
def pipe():
def pipe(blocking=None):
"""
Create a UNIX pipe pair using :func:`os.pipe`, wrapping the returned
descriptors in Python file objects in order to manage their lifetime and
@ -728,12 +719,22 @@ def pipe():
not been closed explicitly.
"""
rfd, wfd = os.pipe()
for fd in rfd, wfd:
if blocking is not None: set_blocking(fd, blocking) # noqa: E701
return (
os.fdopen(rfd, 'rb', 0),
os.fdopen(wfd, 'wb', 0)
)
def socketpair(blocking=None):
fp1, fp2 = socket.socketpair()
for fp in fp1, fp2:
fd = fp.fileno()
if blocking is not None: set_blocking(fd, blocking) # noqa: E701
return fp1, fp2
def iter_split(buf, delim, func):
"""
Invoke `func(s)` for each `delim`-delimited chunk in the potentially large
@ -1879,8 +1880,7 @@ class Stream(object):
"""
Attach a pair of file objects to :attr:`receive_side` and
:attr:`transmit_side`, after wrapping them in :class:`Side` instances.
:class:`Side` will call :func:`set_nonblock` and :func:`set_cloexec`
on the underlying file descriptors during construction.
:class:`Side` will call :func:`set_cloexec` on them.
The same file object may be used for both sides. The default
:meth:`on_disconnect` is handles the possibility that only one
@ -2155,14 +2155,11 @@ class Side(object):
:param bool keep_alive:
If :data:`True`, the continued existence of this side will extend the
shutdown grace period until it has been unregistered from the broker.
:param bool blocking:
If :data:`False`, the descriptor has its :data:`os.O_NONBLOCK` flag
enabled using :func:`fcntl.fcntl`.
"""
_fork_refs = weakref.WeakValueDictionary()
closed = False
def __init__(self, stream, fp, cloexec=True, keep_alive=True, blocking=False):
def __init__(self, stream, fp, cloexec=True, keep_alive=True):
#: The :class:`Stream` for which this is a read or write side.
self.stream = stream
# File or socket object responsible for the lifetime of its underlying
@ -2180,8 +2177,6 @@ class Side(object):
self._fork_refs[id(self)] = self
if cloexec:
set_cloexec(self.fd)
if not blocking:
set_nonblock(self.fd)
def __repr__(self):
return '<Side of %s fd %s>' % (
@ -2785,7 +2780,7 @@ class Latch(object):
try:
return self._cls_idle_socketpairs.pop() # pop() must be atomic
except IndexError:
rsock, wsock = socket.socketpair()
rsock, wsock = socketpair()
rsock.setblocking(False)
set_cloexec(rsock.fileno())
set_cloexec(wsock.fileno())
@ -2958,7 +2953,8 @@ class Waker(Protocol):
@classmethod
def build_stream(cls, broker):
stream = super(Waker, cls).build_stream(broker)
stream.accept(*pipe())
rfp, wfp = pipe(blocking=False)
stream.accept(rfp, wfp)
return stream
def __init__(self, broker):
@ -3056,7 +3052,8 @@ class IoLoggerProtocol(DelimitedProtocol):
prevent break :meth:`on_shutdown` from calling :meth:`shutdown()
<socket.socket.shutdown>` on it.
"""
rsock, wsock = socket.socketpair()
# Leave wsock & dest_fd blocking, so the subprocess will have sane stdio
rsock, wsock = socketpair()
os.dup2(wsock.fileno(), dest_fd)
stream = super(IoLoggerProtocol, cls).build_stream(name)
stream.name = name
@ -4038,6 +4035,9 @@ class ExternalContext(object):
local_id=self.config['context_id'],
parent_ids=self.config['parent_ids']
)
for f in in_fp, out_fp:
fd = f.fileno()
set_blocking(fd, False)
self.stream.accept(in_fp, out_fp)
self.stream.name = 'parent'
self.stream.receive_side.keep_alive = False

@ -179,6 +179,9 @@ class Process(object):
self.control_handle = router.add_handler(self._on_control)
self.stdin_handle = router.add_handler(self._on_stdin)
self.pump = IoPump.build_stream(router.broker)
for fp in stdin, stdout:
fd = fp.fileno()
mitogen.core.set_blocking(fd, False)
self.pump.accept(stdin, stdout)
self.stdin = None
self.control = None
@ -419,10 +422,11 @@ def run(dest, router, args, deadline=None, econtext=None):
fakessh = mitogen.parent.Context(router, context_id)
fakessh.name = u'fakessh.%d' % (context_id,)
sock1, sock2 = socket.socketpair()
sock1, sock2 = mitogen.core.socketpair()
stream = mitogen.core.Stream(router, context_id)
stream.name = u'fakessh'
mitogen.core.set_blocking(sock1.fileno(), False)
stream.accept(sock1, sock1)
router.register(fakessh, stream)

@ -211,7 +211,7 @@ class Connection(mitogen.parent.Connection):
on_fork()
if self.options.on_fork:
self.options.on_fork()
mitogen.core.set_block(childfp.fileno())
mitogen.core.set_blocking(childfp.fileno(), True)
childfp.send(b('MITO002\n'))

@ -38,6 +38,7 @@ import sys
import weakref
import mitogen.core
import mitogen.parent
# List of weakrefs. On Python 2.4, mitogen.core registers its Broker on this
@ -131,9 +132,9 @@ class Corker(object):
`obj` to be written to by one of its threads.
"""
rsock, wsock = mitogen.parent.create_socketpair(size=4096)
mitogen.core.set_blocking(wsock.fileno(), True) # gevent
mitogen.core.set_cloexec(rsock.fileno())
mitogen.core.set_cloexec(wsock.fileno())
mitogen.core.set_block(wsock) # gevent
self._rsocks.append(rsock)
obj.defer(self._do_cork, s, wsock)

@ -265,7 +265,7 @@ def disable_echo(fd):
termios.tcsetattr(fd, flags, new)
def create_socketpair(size=None):
def create_socketpair(size=None, blocking=None):
"""
Create a :func:`socket.socketpair` for use as a child's UNIX stdio
channels. As socketpairs are bidirectional, they are economical on file
@ -276,14 +276,14 @@ def create_socketpair(size=None):
if size is None:
size = mitogen.core.CHUNK_SIZE
parentfp, childfp = socket.socketpair()
parentfp, childfp = mitogen.core.socketpair(blocking)
for fp in parentfp, childfp:
fp.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
return parentfp, childfp
def create_best_pipe(escalates_privilege=False):
def create_best_pipe(escalates_privilege=False, blocking=None):
"""
By default we prefer to communicate with children over a UNIX socket, as a
single file descriptor can represent bidirectional communication, and a
@ -301,16 +301,19 @@ def create_best_pipe(escalates_privilege=False):
:param bool escalates_privilege:
If :data:`True`, the target program may escalate privileges, causing
SELinux to disconnect AF_UNIX sockets, so avoid those.
:param None|bool blocking:
If :data:`False` or :data:`True`, set non-blocking or blocking mode.
If :data:`None` (default), use default.
:returns:
`(parent_rfp, child_wfp, child_rfp, parent_wfp)`
"""
if (not escalates_privilege) or (not SELINUX_ENABLED):
parentfp, childfp = create_socketpair()
parentfp, childfp = create_socketpair(blocking=blocking)
return parentfp, childfp, childfp, parentfp
parent_rfp, child_wfp = mitogen.core.pipe()
parent_rfp, child_wfp = mitogen.core.pipe(blocking)
try:
child_rfp, parent_wfp = mitogen.core.pipe()
child_rfp, parent_wfp = mitogen.core.pipe(blocking)
return parent_rfp, child_wfp, child_rfp, parent_wfp
except:
parent_rfp.close()
@ -481,7 +484,7 @@ def openpty():
if not IS_SOLARIS:
disable_echo(master_fd)
disable_echo(slave_fd)
mitogen.core.set_block(slave_fd)
mitogen.core.set_blocking(slave_fd, True)
return master_fp, slave_fp
@ -547,8 +550,8 @@ def hybrid_tty_create_child(args, escalates_privilege=False):
escalates_privilege=escalates_privilege,
)
try:
mitogen.core.set_block(child_rfp)
mitogen.core.set_block(child_wfp)
mitogen.core.set_blocking(child_rfp.fileno(), True)
mitogen.core.set_blocking(child_wfp.fileno(), True)
proc = popen(
args=args,
stdin=child_rfp,
@ -1643,6 +1646,9 @@ class Connection(object):
stream = self.stream_factory()
stream.conn = self
stream.name = self.options.name or self._get_name()
for fp in self.proc.stdout, self.proc.stdin:
fd = fp.fileno()
mitogen.core.set_blocking(fd, False)
stream.accept(self.proc.stdout, self.proc.stdin)
mitogen.core.listen(stream, 'disconnect', self.on_stdio_disconnect)
@ -1653,6 +1659,8 @@ class Connection(object):
stream = self.stderr_stream_factory()
stream.conn = self
stream.name = self.options.name or self._get_name()
fd = self.proc.stderr.fileno()
mitogen.core.set_blocking(fd, False)
stream.accept(self.proc.stderr, self.proc.stderr)
mitogen.core.listen(stream, 'disconnect', self.on_stderr_disconnect)

@ -111,6 +111,7 @@ class Listener(mitogen.core.Protocol):
sock.listen(backlog)
stream = super(Listener, cls).build_stream(router, path)
mitogen.core.set_blocking(sock.fileno(), False)
stream.accept(sock, sock)
router.broker.start_receive(stream)
return stream
@ -169,6 +170,7 @@ class Listener(mitogen.core.Protocol):
auth_id=mitogen.context_id,
)
stream.name = u'unix_client.%d' % (pid,)
mitogen.core.set_blocking(sock.fileno(), False)
stream.accept(sock, sock)
LOG.debug('listener: accepted connection from PID %d: %s',
pid, stream.name)

@ -0,0 +1,32 @@
import os
import tempfile
import mitogen.core
import testlib
class BlockingIOTest(testlib.TestCase):
def setUp(self):
super(BlockingIOTest, self).setUp()
self.fp = tempfile.TemporaryFile()
self.fd = self.fp.fileno()
def tearDown(self):
self.fp.close()
super(BlockingIOTest, self).tearDown()
def test_get_blocking(self):
if hasattr(os, 'get_blocking'):
self.assertEqual(
os.get_blocking(self.fd), mitogen.core.get_blocking(self.fd),
)
self.assertTrue(mitogen.core.get_blocking(self.fd) is True)
def test_set_blocking(self):
mitogen.core.set_blocking(self.fd, False)
if hasattr(os, 'get_blocking'):
self.assertEqual(
os.get_blocking(self.fd), mitogen.core.get_blocking(self.fd),
)
self.assertTrue(mitogen.core.get_blocking(self.fd) is False)

@ -190,7 +190,7 @@ class TtyCreateChildTest(testlib.TestCase):
proc = self.func([
'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
])
mitogen.core.set_block(proc.stdin.fileno())
mitogen.core.set_blocking(proc.stdin.fileno(), True)
# read(3) below due to https://bugs.python.org/issue37696
self.assertEqual(mitogen.core.b('hi\n'), proc.stdin.read(3))
waited_pid, status = os.waitpid(proc.pid, 0)

@ -0,0 +1,16 @@
import fcntl
import os
import sys
def shout_stdout(size):
sys.stdout.write('A' * size)
return 'success'
def file_is_blocking(fobj):
return not (fcntl.fcntl(fobj.fileno(), fcntl.F_GETFL) & os.O_NONBLOCK)
def stdio_is_blocking():
return [file_is_blocking(f) for f in [sys.stdin, sys.stdout, sys.stderr]]

@ -0,0 +1,35 @@
import os
import mitogen.core
import testlib
class PipeTest(testlib.TestCase):
def test_pipe_blocking_unspecified(self):
"Test that unspecified blocking arg (None) behaves same as os.pipe()"
os_rfd, os_wfd = os.pipe()
mi_rfp, mi_wfp = mitogen.core.pipe()
self.assertEqual(mitogen.core.get_blocking(os_rfd),
mitogen.core.get_blocking(mi_rfp.fileno()))
self.assertEqual(mitogen.core.get_blocking(os_wfd),
mitogen.core.get_blocking(mi_wfp.fileno()))
mi_rfp.close()
mi_wfp.close()
os.close(os_rfd)
os.close(os_wfd)
def test_pipe_blocking_true(self):
mi_rfp, mi_wfp = mitogen.core.pipe(blocking=True)
self.assertTrue(mitogen.core.get_blocking(mi_rfp.fileno()))
self.assertTrue(mitogen.core.get_blocking(mi_wfp.fileno()))
mi_rfp.close()
mi_wfp.close()
def test_pipe_blocking_false(self):
mi_rfp, mi_wfp = mitogen.core.pipe(blocking=False)
self.assertFalse(mitogen.core.get_blocking(mi_rfp.fileno()))
self.assertFalse(mitogen.core.get_blocking(mi_wfp.fileno()))
mi_rfp.close()
mi_wfp.close()

@ -28,15 +28,13 @@ class SockMixin(object):
# buffers on both sides (bidirectional IO), making it easier to test
# combinations of readability/writeability on the one side of a single
# file object.
self.l1_sock, self.r1_sock = socket.socketpair()
self.l1_sock, self.r1_sock = mitogen.core.socketpair(blocking=False)
self.l1 = self.l1_sock.fileno()
self.r1 = self.r1_sock.fileno()
self.l2_sock, self.r2_sock = socket.socketpair()
self.l2_sock, self.r2_sock = mitogen.core.socketpair(blocking=False)
self.l2 = self.l2_sock.fileno()
self.r2 = self.r2_sock.fileno()
for fp in self.l1, self.r1, self.l2, self.r2:
mitogen.core.set_nonblock(fp)
def fill(self, fd):
"""Make `fd` unwriteable."""

@ -0,0 +1,35 @@
import socket
import mitogen.core
import testlib
class SocketPairTest(testlib.TestCase):
def test_socketpair_blocking_unspecified(self):
"Test that unspecified blocking arg (None) batches socket.socketpair()"
sk_fp1, sk_fp2 = socket.socketpair()
mi_fp1, mi_fp2 = mitogen.core.socketpair()
self.assertEqual(mitogen.core.get_blocking(sk_fp1.fileno()),
mitogen.core.get_blocking(mi_fp1.fileno()))
self.assertEqual(mitogen.core.get_blocking(sk_fp2.fileno()),
mitogen.core.get_blocking(mi_fp2.fileno()))
mi_fp1.close()
mi_fp2.close()
sk_fp1.close()
sk_fp2.close()
def test_socketpair_blocking_true(self):
mi_fp1, mi_fp2 = mitogen.core.socketpair(blocking=True)
self.assertTrue(mitogen.core.get_blocking(mi_fp1.fileno()))
self.assertTrue(mitogen.core.get_blocking(mi_fp2.fileno()))
mi_fp1.close()
mi_fp2.close()
def test_socketpair_blocking_false(self):
mi_fp1, mi_fp2 = mitogen.core.socketpair(blocking=False)
self.assertFalse(mitogen.core.get_blocking(mi_fp1.fileno()))
self.assertFalse(mitogen.core.get_blocking(mi_fp2.fileno()))
mi_fp1.close()
mi_fp2.close()

@ -0,0 +1,28 @@
import testlib
import stdio_checks
class StdIOTest(testlib.RouterMixin, testlib.TestCase):
"""
Test that stdin, stdout, and stderr conform to common expectations,
such as blocking IO.
"""
def test_can_write_stdout_1_mib(self):
"""
Writing to stdout should not raise EAGAIN. Regression test for
https://github.com/mitogen-hq/mitogen/issues/712.
"""
size = 1 * 2**20
context = self.router.local()
result = context.call(stdio_checks.shout_stdout, size)
self.assertEqual('success', result)
def test_stdio_is_blocking(self):
context = self.router.local()
stdin_blocking, stdout_blocking, stderr_blocking = context.call(
stdio_checks.stdio_is_blocking,
)
self.assertTrue(stdin_blocking)
self.assertTrue(stdout_blocking)
self.assertTrue(stderr_blocking)
Loading…
Cancel
Save