You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mitogen/tests/create_child_test.py

345 lines
13 KiB
Python

import fcntl
import os
import stat
import sys
import tempfile
import mitogen.core
import mitogen.parent
from mitogen.core import b
import testlib
def _osx_mode(n):
"""
fstat(2) on UNIX sockets on OSX return different mode bits depending on
which side is being inspected, so zero those bits for comparison.
"""
if sys.platform == 'darwin':
n &= ~int('0777', 8)
return n
def run_fd_check(func, fd, mode, on_start=None):
"""
Run ``tests/data/fd_check.py`` using `func`. The subprocess writes
information about the `fd` it received to a temporary file.
:param func:
Function like `create_child()` used to start child.
:param fd:
FD child should read/write from, and report information about.
:param mode:
"read" or "write", depending on whether the FD is readable or writeable
from the perspective of the child. If "read", `on_start()` should write
"TEST" to it and the child reads "TEST" from it, otherwise `on_start()`
should read "TEST" from it and the child writes "TEST" to it.
:param on_start:
Function invoked as `on_start(proc)`
:returns:
Tuple of `(proc, info, on_start_result)`, where:
* `proc`: the :class:`mitogen.parent.Process` returned by `func`.
* `info`: dict containing information returned by the child:
* `buf`: "TEST" that was read in "read" mode
* `flags`: :attr:`fcntl.F_GETFL` flags for `fd`
* `st_mode`: st_mode field from :func:`os.fstat`
* `st_dev`: st_dev field from :func:`os.fstat`
* `st_ino`: st_ino field from :func:`os.fstat`
* `ttyname`: :func:`os.ttyname` invoked on `fd`.
* `controlling_tty`: :func:os.ttyname` invoked on ``/dev/tty``
from within the child.
"""
tf = tempfile.NamedTemporaryFile()
args = [
sys.executable,
testlib.data_path('fd_check.py'),
tf.name,
str(fd),
mode,
]
proc = func(args=args)
os = None
if on_start:
os = on_start(proc)
proc.proc.wait()
try:
return proc, eval(tf.read()), os
finally:
tf.close()
def close_proc(proc):
proc.stdin.close()
proc.stdout.close()
if proc.stderr:
proc.stderr.close()
def wait_read(fp, n):
poller = mitogen.core.Poller()
try:
poller.start_receive(fp.fileno())
for _ in poller.poll():
return os.read(fp.fileno(), n)
assert False
finally:
poller.close()
class StdinSockMixin(object):
def test_stdin(self):
proc, info, _ = run_fd_check(self.func, 0, 'read',
lambda proc: proc.stdin.send(b('TEST')))
st = os.fstat(proc.stdin.fileno())
self.assertTrue(stat.S_ISSOCK(st.st_mode))
self.assertEqual(st.st_dev, info['st_dev'])
self.assertEqual(st.st_mode, _osx_mode(info['st_mode']))
flags = fcntl.fcntl(proc.stdin.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(info['buf'], 'TEST')
self.assertTrue(info['flags'] & os.O_RDWR)
close_proc(proc)
class StdoutSockMixin(object):
def test_stdout(self):
proc, info, buf = run_fd_check(self.func, 1, 'write',
lambda proc: wait_read(proc.stdout, 4))
st = os.fstat(proc.stdout.fileno())
self.assertTrue(stat.S_ISSOCK(st.st_mode))
self.assertEqual(st.st_dev, info['st_dev'])
self.assertEqual(st.st_mode, _osx_mode(info['st_mode']))
flags = fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(buf, 'TEST')
self.assertTrue(info['flags'] & os.O_RDWR)
close_proc(proc)
class CreateChildTest(StdinSockMixin, StdoutSockMixin, testlib.TestCase):
func = staticmethod(mitogen.parent.create_child)
def test_stderr(self):
proc, info, _ = run_fd_check(self.func, 2, 'write')
st = os.fstat(sys.stderr.fileno())
self.assertEqual(st.st_dev, info['st_dev'])
self.assertEqual(st.st_mode, info['st_mode'])
self.assertEqual(st.st_ino, info['st_ino'])
close_proc(proc)
class CreateChildMergedTest(StdinSockMixin, StdoutSockMixin,
testlib.TestCase):
def func(self, *args, **kwargs):
kwargs['merge_stdio'] = True
return mitogen.parent.create_child(*args, **kwargs)
def test_stderr(self):
proc, info, buf = run_fd_check(self.func, 2, 'write',
lambda proc: wait_read(proc.stdout, 4))
self.assertEqual(None, proc.stderr)
st = os.fstat(proc.stdout.fileno())
self.assertTrue(stat.S_ISSOCK(st.st_mode))
flags = fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(buf, 'TEST')
self.assertTrue(info['flags'] & os.O_RDWR)
close_proc(proc)
class CreateChildStderrPipeTest(StdinSockMixin, StdoutSockMixin,
testlib.TestCase):
def func(self, *args, **kwargs):
kwargs['stderr_pipe'] = True
return mitogen.parent.create_child(*args, **kwargs)
def test_stderr(self):
proc, info, buf = run_fd_check(self.func, 2, 'write',
lambda proc: wait_read(proc.stderr, 4))
st = os.fstat(proc.stderr.fileno())
self.assertTrue(stat.S_ISFIFO(st.st_mode))
self.assertEqual(st.st_dev, info['st_dev'])
self.assertEqual(st.st_mode, info['st_mode'])
flags = fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL)
self.assertFalse(flags & os.O_WRONLY)
self.assertFalse(flags & os.O_RDWR)
self.assertTrue(buf, 'TEST')
self.assertTrue(info['flags'] & os.O_WRONLY)
close_proc(proc)
class TtyCreateChildTest(testlib.TestCase):
func = staticmethod(mitogen.parent.tty_create_child)
def test_dev_tty_open_succeeds(self):
# In the early days of UNIX, a process that lacked a controlling TTY
# would acquire one simply by opening an existing TTY. Linux and OS X
# continue to follow this behaviour, however at least FreeBSD moved to
# requiring an explicit ioctl(). Linux supports it, but we don't yet
# use it there and anyway the behaviour will never change, so no point
# in fixing things that aren't broken. Below we test that
# getpass-loving apps like sudo and ssh get our slave PTY when they
# attempt to open /dev/tty, which is what they both do on attempting to
# read a password.
tf = tempfile.NamedTemporaryFile()
try:
proc = self.func([
'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
])
mitogen.core.set_block(proc.stdin.fileno())
# read(3) below due to https://bugs.python.org/issue37696
self.assertEqual(mitogen.core.b('hi\n'), proc.stdin.read(3))
waited_pid, status = os.waitpid(proc.pid, 0)
self.assertEqual(proc.pid, waited_pid)
self.assertEqual(0, status)
self.assertEqual(mitogen.core.b(''), tf.read())
finally:
tf.close()
close_proc(proc)
def test_stdin(self):
proc, info, _ = run_fd_check(self.func, 0, 'read',
lambda proc: proc.stdin.write(b('TEST')))
st = os.fstat(proc.stdin.fileno())
self.assertTrue(stat.S_ISCHR(st.st_mode))
self.assertTrue(stat.S_ISCHR(info['st_mode']))
self.assertIsInstance(info['ttyname'], mitogen.core.UnicodeType)
self.assertTrue(os.isatty(proc.stdin.fileno()))
flags = fcntl.fcntl(proc.stdin.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(info['flags'] & os.O_RDWR)
self.assertTrue(info['buf'], 'TEST')
close_proc(proc)
def test_stdout(self):
proc, info, buf = run_fd_check(self.func, 1, 'write',
lambda proc: wait_read(proc.stdout, 4))
st = os.fstat(proc.stdout.fileno())
self.assertTrue(stat.S_ISCHR(st.st_mode))
self.assertTrue(stat.S_ISCHR(info['st_mode']))
self.assertIsInstance(info['ttyname'], mitogen.core.UnicodeType)
self.assertTrue(os.isatty(proc.stdout.fileno()))
flags = fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(info['flags'] & os.O_RDWR)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(buf, 'TEST')
close_proc(proc)
def test_stderr(self):
# proc.stderr is None in the parent since there is no separate stderr
# stream. In the child, FD 2/stderr is connected to the TTY.
proc, info, buf = run_fd_check(self.func, 2, 'write',
lambda proc: wait_read(proc.stdout, 4))
st = os.fstat(proc.stdout.fileno())
self.assertTrue(stat.S_ISCHR(st.st_mode))
self.assertTrue(stat.S_ISCHR(info['st_mode']))
self.assertIsInstance(info['ttyname'], mitogen.core.UnicodeType)
self.assertTrue(os.isatty(proc.stdout.fileno()))
flags = fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(info['flags'] & os.O_RDWR)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(buf, 'TEST')
close_proc(proc)
def test_dev_tty_open_succeeds(self):
# In the early days of UNIX, a process that lacked a controlling TTY
# would acquire one simply by opening an existing TTY. Linux and OS X
# continue to follow this behaviour, however at least FreeBSD moved to
# requiring an explicit ioctl(). Linux supports it, but we don't yet
# use it there and anyway the behaviour will never change, so no point
# in fixing things that aren't broken. Below we test that
# getpass-loving apps like sudo and ssh get our slave PTY when they
# attempt to open /dev/tty, which is what they both do on attempting to
# read a password.
tf = tempfile.NamedTemporaryFile()
try:
proc = self.func([
'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
])
self.assertEqual(mitogen.core.b('hi\n'), wait_read(proc.stdout, 3))
waited_pid, status = os.waitpid(proc.pid, 0)
self.assertEqual(proc.pid, waited_pid)
self.assertEqual(0, status)
self.assertEqual(mitogen.core.b(''), tf.read())
proc.stdout.close()
finally:
tf.close()
close_proc(proc)
class StderrDiagTtyMixin(object):
def test_stderr(self):
# proc.stderr is the PTY master, FD 2 in the child is the PTY slave
proc, info, buf = run_fd_check(self.func, 2, 'write',
lambda proc: wait_read(proc.stderr, 4))
st = os.fstat(proc.stderr.fileno())
self.assertTrue(stat.S_ISCHR(st.st_mode))
self.assertTrue(stat.S_ISCHR(info['st_mode']))
self.assertIsInstance(info['ttyname'], mitogen.core.UnicodeType)
self.assertTrue(os.isatty(proc.stderr.fileno()))
flags = fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(info['flags'] & os.O_RDWR)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(buf, 'TEST')
close_proc(proc)
class HybridTtyCreateChildTest(StdinSockMixin, StdoutSockMixin,
StderrDiagTtyMixin, testlib.TestCase):
func = staticmethod(mitogen.parent.hybrid_tty_create_child)
if 0:
# issue #410
class SelinuxHybridTtyCreateChildTest(StderrDiagTtyMixin, testlib.TestCase):
func = staticmethod(mitogen.parent.selinux_hybrid_tty_create_child)
def test_stdin(self):
proc, info, buf = run_fd_check(self.func, 0, 'read',
lambda proc: proc.transmit_side.write('TEST'))
st = os.fstat(proc.transmit_side.fd)
self.assertTrue(stat.S_ISFIFO(st.st_mode))
self.assertEqual(st.st_dev, info['st_dev'])
self.assertEqual(st.st_mode, info['st_mode'])
flags = fcntl.fcntl(proc.transmit_side.fd, fcntl.F_GETFL)
self.assertTrue(flags & os.O_WRONLY)
self.assertTrue(buf, 'TEST')
self.assertFalse(info['flags'] & os.O_WRONLY)
self.assertFalse(info['flags'] & os.O_RDWR)
close_proc(proc)
def test_stdout(self):
proc, info, buf = run_fd_check(self.func, 1, 'write',
lambda proc: wait_read(proc.receive_side, 4))
st = os.fstat(proc.receive_side.fd)
self.assertTrue(stat.S_ISFIFO(st.st_mode))
self.assertEqual(st.st_dev, info['st_dev'])
self.assertEqual(st.st_mode, info['st_mode'])
flags = fcntl.fcntl(proc.receive_side.fd, fcntl.F_GETFL)
self.assertFalse(flags & os.O_WRONLY)
self.assertFalse(flags & os.O_RDWR)
self.assertTrue(info['flags'] & os.O_WRONLY)
self.assertTrue(buf, 'TEST')
close_proc(proc)