[stream-refactor] rename Process attrs, fix up more create_child_test

pull/607/head
David Wilson 5 years ago
parent cfe337b3c0
commit f039c81bb0

@ -133,8 +133,8 @@ class Connection(mitogen.parent.Connection):
def _get_name(self):
return u'doas.' + self.options.username
def diag_stream_factory(self):
stream = super(Connection, self).diag_stream_factory()
def stderr_stream_factory(self):
stream = super(Connection, self).stderr_stream_factory()
stream.protocol.setup_patterns(self)
return stream

@ -171,15 +171,15 @@ class Process(object):
Manages the lifetime and pipe connections of the SSH command running in the
slave.
"""
def __init__(self, router, stdin_fp, stdout_fp, proc=None):
def __init__(self, router, stdin, stdout, proc=None):
self.router = router
self.stdin_fp = stdin_fp
self.stdout_fp = stdout_fp
self.stdin = stdin
self.stdout = stdout
self.proc = proc
self.control_handle = router.add_handler(self._on_control)
self.stdin_handle = router.add_handler(self._on_stdin)
self.pump = IoPump.build_stream(router.broker)
self.pump.accept(stdin_fp, stdout_fp)
self.pump.accept(stdin, stdout)
self.stdin = None
self.control = None
self.wake_event = threading.Event()
@ -192,7 +192,7 @@ class Process(object):
pmon.add(proc.pid, self._on_proc_exit)
def __repr__(self):
return 'Process(%r, %r)' % (self.stdin_fp, self.stdout_fp)
return 'Process(%r, %r)' % (self.stdin, self.stdout)
def _on_proc_exit(self, status):
LOG.debug('%r._on_proc_exit(%r)', self, status)
@ -355,8 +355,8 @@ def _fakessh_main(dest_context_id, econtext):
control_handle, stdin_handle)
process = Process(econtext.router,
stdin_fp=os.fdopen(1, 'w+b', 0),
stdout_fp=os.fdopen(0, 'r+b', 0))
stdin=os.fdopen(1, 'w+b', 0),
stdout=os.fdopen(0, 'r+b', 0))
process.start_master(
stdin=mitogen.core.Sender(dest, stdin_handle),
control=mitogen.core.Sender(dest, control_handle),

@ -341,7 +341,7 @@ def create_child(args, merge_stdio=False, stderr_pipe=False, preexec_fn=None):
LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s',
proc.pid, parentfp.fileno(), os.getpid(), Argv(args))
return PopenProcess(proc, stdio_fp=parentfp, stderr_fp=stderr_r)
return PopenProcess(proc, stdin=parentfp, stdout=parentfp, stderr=stderr_r)
def _acquire_controlling_tty():
@ -455,7 +455,7 @@ def tty_create_child(args):
slave_fp.close()
LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s',
proc.pid, master_fp.fileno(), os.getpid(), Argv(args))
return PopenProcess(proc, stdio_fp=master_fp)
return PopenProcess(proc, stdin=master_fp, stdout=master_fp)
def hybrid_tty_create_child(args):
@ -495,7 +495,7 @@ def hybrid_tty_create_child(args):
childfp.close()
LOG.debug('hybrid_tty_create_child() pid=%d stdio=%d, tty=%d, cmd: %s',
proc.pid, parentfp.fileno(), master_fp.fileno(), Argv(args))
return PopenProcess(proc, stdio_fp=parentfp, stderr_fp=master_fp)
return PopenProcess(proc, stdin=parentfp, stdout=parentfp, stderr=master_fp)
class Timer(object):
@ -1207,11 +1207,11 @@ class Connection(object):
#: :class:`mitogen.core.Stream`
stream = None
#: If :attr:`create_child` provides a stderr_fp, referencing either a plain
#: pipe or the controlling TTY, this references the corresponding
#: If `proc.stderr` is set, referencing either a plain pipe or the
#: controlling TTY, this references the corresponding
#: :class:`LogProtocol`'s stream, allowing it to be disconnected when this
#: stream is disconnected.
diag_stream = None
stderr_stream = None
#: Function with the semantics of :func:`create_child` used to create the
#: child process.
@ -1400,7 +1400,7 @@ class Connection(object):
if self.exception is None:
self._adorn_eof_error(exc)
self.exception = exc
for stream in self.stream, self.diag_stream:
for stream in self.stream, self.stderr_stream:
if stream and not stream.receive_side.closed:
stream.on_disconnect(self._router.broker)
self._complete_connection()
@ -1419,8 +1419,8 @@ class Connection(object):
eof_error_msg = 'EOF on stream; last 100 lines received:\n'
def on_stream_disconnect(self):
if self.diag_stream is not None:
self.diag_stream.on_disconnect(self._router.broker)
if self.stderr_stream is not None:
self.stderr_stream.on_disconnect(self._router.broker)
if not self.timer.cancelled:
self.timer.cancel()
self._fail_connection(EofError(
@ -1447,14 +1447,14 @@ class Connection(object):
broker=self._router.broker,
)
def diag_stream_factory(self):
def stderr_stream_factory(self):
return self.diag_protocol_class.build_stream()
def _setup_stream(self):
self.stream = self.stream_factory()
self.stream.conn = self
self.stream.name = self.options.name or self._get_name()
self.stream.accept(self.proc.stdio_fp, self.proc.stdio_fp)
self.stream.accept(self.proc.stdout, self.proc.stdin)
mitogen.core.listen(self.stream, 'shutdown',
self.on_stream_shutdown)
@ -1462,12 +1462,12 @@ class Connection(object):
self.on_stream_disconnect)
self._router.broker.start_receive(self.stream)
def _setup_diag_stream(self):
self.diag_stream = self.diag_stream_factory()
self.diag_stream.conn = self
self.diag_stream.name = self.options.name or self._get_name()
self.diag_stream.accept(self.proc.stderr_fp, self.proc.stderr_fp)
self._router.broker.start_receive(self.diag_stream)
def _setup_stderr_stream(self):
self.stderr_stream = self.stderr_stream_factory()
self.stderr_stream.conn = self
self.stderr_stream.name = self.options.name or self._get_name()
self.stderr_stream.accept(self.proc.stderr, self.proc.stderr)
self._router.broker.start_receive(self.stderr_stream)
def _async_connect(self):
self._start_timer()
@ -1475,16 +1475,18 @@ class Connection(object):
if self.context.name is None:
self.context.name = self.stream.name
self.proc.name = self.stream.name
if self.proc.stderr_fp:
self._setup_diag_stream()
if self.proc.stderr:
self._setup_stderr_stream()
def connect(self, context):
LOG.debug('%r.connect()', self)
self.context = context
self.proc = self.start_child()
LOG.debug('%r.connect(): pid:%r stdio:%r diag:%r',
self, self.proc.pid, self.proc.stdio_fp.fileno(),
self.proc.stderr_fp and self.proc.stderr_fp.fileno())
LOG.debug('%r.connect(): pid:%r stdin:%r stdout:%r diag:%r',
self, self.proc.pid,
self.proc.stdin.fileno(),
self.proc.stdout.fileno(),
self.proc.stderr.fileno())
self.latch = mitogen.core.Latch()
self._router.broker.defer(self._async_connect)
@ -2231,17 +2233,14 @@ class Router(mitogen.core.Router):
class Process(object):
"""
Install a :data:`signal.SIGCHLD` handler that generates callbacks when a
specific child process has exitted. This class is obsolete, do not use.
"""
_delays = [0.05, 0.15, 0.3, 1.0, 5.0, 10.0]
name = None
def __init__(self, pid, stdio_fp, stderr_fp=None):
def __init__(self, pid, stdin, stdout, stderr=None):
self.pid = pid
self.stdio_fp = stdio_fp
self.stderr_fp = stderr_fp
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self._returncode = None
self._reap_count = 0
@ -2307,8 +2306,8 @@ class Process(object):
class PopenProcess(Process):
def __init__(self, proc, stdio_fp, stderr_fp=None):
super(PopenProcess, self).__init__(proc.pid, stdio_fp, stderr_fp)
def __init__(self, proc, stdin, stdout, stderr=None):
super(PopenProcess, self).__init__(proc.pid, stdin, stdout, stderr)
self.proc = proc
def poll(self):

@ -10,6 +10,7 @@ import mock
import unittest2
import mitogen.parent
from mitogen.core import b
import testlib
@ -36,20 +37,18 @@ def run_fd_check(func, fd, mode, on_start=None):
def close_proc(proc):
proc.receive_side.close()
proc.transmit_side.close()
if proc.diag_receive_side:
proc.diag_receive_side.close()
if proc.diag_transmit_side:
proc.diag_transmit_side.close()
proc.stdin.close()
proc.stdout.close()
if proc.stderr:
prco.stderr.close()
def wait_read(side, n):
def wait_read(fp, n):
poller = mitogen.core.Poller()
try:
poller.start_receive(side.fd)
poller.start_receive(fp.fileno())
for _ in poller.poll():
return side.read(n)
return os.read(fp.fileno(), n)
assert False
finally:
poller.close()
@ -58,12 +57,12 @@ def wait_read(side, n):
class StdinSockMixin(object):
def test_stdin(self):
proc, info, _ = run_fd_check(self.func, 0, 'read',
lambda proc: proc.transmit_side.write('TEST'))
st = os.fstat(proc.transmit_side.fd)
lambda proc: proc.stdin.send(b('TEST')))
st = os.fstat(proc.stdin.fileno())
self.assertTrue(stat.S_ISSOCK(st.st_mode))
self.assertEquals(st.st_dev, info['st_dev'])
self.assertEquals(st.st_mode, info['st_mode'])
flags = fcntl.fcntl(proc.transmit_side.fd, fcntl.F_GETFL)
flags = fcntl.fcntl(proc.stdin.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(info['buf'], 'TEST')
self.assertTrue(info['flags'] & os.O_RDWR)
@ -72,12 +71,12 @@ class StdinSockMixin(object):
class StdoutSockMixin(object):
def test_stdout(self):
proc, info, buf = run_fd_check(self.func, 1, 'write',
lambda proc: wait_read(proc.receive_side, 4))
st = os.fstat(proc.transmit_side.fd)
lambda proc: wait_read(proc.stdout, 4))
st = os.fstat(proc.stdout.fileno())
self.assertTrue(stat.S_ISSOCK(st.st_mode))
self.assertEquals(st.st_dev, info['st_dev'])
self.assertEquals(st.st_mode, info['st_mode'])
flags = fcntl.fcntl(proc.receive_side.fd, fcntl.F_GETFL)
flags = fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(buf, 'TEST')
self.assertTrue(info['flags'] & os.O_RDWR)
@ -94,35 +93,42 @@ class CreateChildTest(StdinSockMixin, StdoutSockMixin, testlib.TestCase):
self.assertEquals(st.st_ino, info['st_ino'])
class MergedCreateChildTest(StdinSockMixin, StdoutSockMixin,
class CreateChildMergedTest(StdinSockMixin, StdoutSockMixin,
testlib.TestCase):
func = staticmethod(mitogen.parent.merged_create_child)
def func(self, *args, **kwargs):
return mitogen.parent.create_child(
*args, merge_stdio=True, **kwargs
)
def test_stderr(self):
proc, info, buf = run_fd_check(self.func, 2, 'write',
lambda proc: wait_read(proc.receive_side, 4))
st = os.fstat(proc.transmit_side.fd)
lambda proc: wait_read(proc.stdout, 4))
self.assertEquals(None, proc.stderr)
st = os.fstat(proc.stdout.fileno())
self.assertTrue(stat.S_ISSOCK(st.st_mode))
self.assertEquals(st.st_dev, info['st_dev'])
self.assertEquals(st.st_mode, info['st_mode'])
flags = fcntl.fcntl(proc.receive_side.fd, fcntl.F_GETFL)
flags = fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(buf, 'TEST')
self.assertTrue(info['flags'] & os.O_RDWR)
class StderrCreateChildTest(StdinSockMixin, StdoutSockMixin,
testlib.TestCase):
func = staticmethod(mitogen.parent.stderr_create_child)
class CreateChildStderrPipeTest(StdinSockMixin, StdoutSockMixin,
testlib.TestCase):
def func(self, *args, **kwargs):
return mitogen.parent.create_child(
*args, stderr_pipe=True, **kwargs
)
def test_stderr(self):
proc, info, buf = run_fd_check(self.func, 2, 'write',
lambda proc: wait_read(proc.diag_receive_side, 4))
st = os.fstat(proc.diag_receive_side.fd)
lambda proc: wait_read(proc.stderr, 4))
st = os.fstat(proc.stderr.fileno())
self.assertTrue(stat.S_ISFIFO(st.st_mode))
self.assertEquals(st.st_dev, info['st_dev'])
self.assertEquals(st.st_mode, info['st_mode'])
flags = fcntl.fcntl(proc.diag_receive_side.fd, fcntl.F_GETFL)
flags = fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL)
self.assertFalse(flags & os.O_WRONLY)
self.assertFalse(flags & os.O_RDWR)
self.assertTrue(buf, 'TEST')
@ -134,16 +140,16 @@ class TtyCreateChildTest(testlib.TestCase):
def test_stdin(self):
proc, info, _ = run_fd_check(self.func, 0, 'read',
lambda proc: proc.transmit_side.write('TEST'))
st = os.fstat(proc.transmit_side.fd)
lambda proc: proc.stdin.write(b('TEST')))
st = os.fstat(proc.stdin.fileno())
self.assertTrue(stat.S_ISCHR(st.st_mode))
self.assertTrue(stat.S_ISCHR(info['st_mode']))
self.assertTrue(isinstance(info['ttyname'],
mitogen.core.UnicodeType))
os.ttyname(proc.transmit_side.fd) # crashes if wrong
os.ttyname(proc.stdin.fileno()) # crashes if not TTY
flags = fcntl.fcntl(proc.transmit_side.fd, fcntl.F_GETFL)
flags = fcntl.fcntl(proc.stdin.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(info['flags'] & os.O_RDWR)
@ -152,17 +158,17 @@ class TtyCreateChildTest(testlib.TestCase):
def test_stdout(self):
proc, info, buf = run_fd_check(self.func, 1, 'write',
lambda proc: wait_read(proc.receive_side, 4))
lambda proc: wait_read(proc.stdout, 4))
st = os.fstat(proc.receive_side.fd)
st = os.fstat(proc.stdout.fileno())
self.assertTrue(stat.S_ISCHR(st.st_mode))
self.assertTrue(stat.S_ISCHR(info['st_mode']))
self.assertTrue(isinstance(info['ttyname'],
mitogen.core.UnicodeType))
os.ttyname(proc.transmit_side.fd) # crashes if wrong
os.ttyname(proc.stdout.fileno()) # crashes if wrong
flags = fcntl.fcntl(proc.receive_side.fd, fcntl.F_GETFL)
flags = fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(info['flags'] & os.O_RDWR)
@ -172,17 +178,17 @@ class TtyCreateChildTest(testlib.TestCase):
def test_stderr(self):
proc, info, buf = run_fd_check(self.func, 2, 'write',
lambda proc: wait_read(proc.receive_side, 4))
lambda proc: wait_read(proc.stdout, 4))
st = os.fstat(proc.receive_side.fd)
st = os.fstat(proc.stdout.fileno())
self.assertTrue(stat.S_ISCHR(st.st_mode))
self.assertTrue(stat.S_ISCHR(info['st_mode']))
self.assertTrue(isinstance(info['ttyname'],
mitogen.core.UnicodeType))
os.ttyname(proc.transmit_side.fd) # crashes if wrong
os.ttyname(proc.stdin.fileno()) # crashes if not TTY
flags = fcntl.fcntl(proc.receive_side.fd, fcntl.F_GETFL)
flags = fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(info['flags'] & os.O_RDWR)
@ -206,72 +212,73 @@ class TtyCreateChildTest(testlib.TestCase):
'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
])
deadline = time.time() + 5.0
mitogen.core.set_block(proc.receive_side.fd)
self.assertEquals(mitogen.core.b('hi\n'), proc.receive_side.read())
self.assertEquals(mitogen.core.b('hi\n'), wait_read(proc.stdout, 3))
waited_pid, status = os.waitpid(proc.pid, 0)
self.assertEquals(proc.pid, waited_pid)
self.assertEquals(0, status)
self.assertEquals(mitogen.core.b(''), tf.read())
proc.receive_side.close()
proc.stdout.close()
finally:
tf.close()
class StderrDiagTtyMixin(object):
def test_stderr(self):
proc, info, buf = run_fd_check(self.func, 2, 'write',
lambda proc: wait_read(proc.diag_receive_side, 4))
st = os.fstat(proc.diag_receive_side.fd)
self.assertTrue(stat.S_ISCHR(st.st_mode))
self.assertTrue(stat.S_ISCHR(info['st_mode']))
self.assertTrue(isinstance(info['ttyname'],
mitogen.core.UnicodeType))
os.ttyname(proc.diag_transmit_side.fd) # crashes if wrong
flags = fcntl.fcntl(proc.diag_receive_side.fd, fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(info['flags'] & os.O_RDWR)
self.assertNotEquals(st.st_dev, info['st_dev'])
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(buf, 'TEST')
class HybridTtyCreateChildTest(StdinSockMixin, StdoutSockMixin,
StderrDiagTtyMixin, testlib.TestCase):
func = staticmethod(mitogen.parent.hybrid_tty_create_child)
class SelinuxHybridTtyCreateChildTest(StderrDiagTtyMixin, testlib.TestCase):
func = staticmethod(mitogen.parent.selinux_hybrid_tty_create_child)
def test_stdin(self):
proc, info, buf = run_fd_check(self.func, 0, 'read',
lambda proc: proc.transmit_side.write('TEST'))
st = os.fstat(proc.transmit_side.fd)
self.assertTrue(stat.S_ISFIFO(st.st_mode))
self.assertEquals(st.st_dev, info['st_dev'])
self.assertEquals(st.st_mode, info['st_mode'])
flags = fcntl.fcntl(proc.transmit_side.fd, fcntl.F_GETFL)
self.assertTrue(flags & os.O_WRONLY)
self.assertTrue(buf, 'TEST')
self.assertFalse(info['flags'] & os.O_WRONLY)
self.assertFalse(info['flags'] & os.O_RDWR)
def test_stdout(self):
proc, info, buf = run_fd_check(self.func, 1, 'write',
lambda proc: wait_read(proc.receive_side, 4))
st = os.fstat(proc.receive_side.fd)
self.assertTrue(stat.S_ISFIFO(st.st_mode))
self.assertEquals(st.st_dev, info['st_dev'])
self.assertEquals(st.st_mode, info['st_mode'])
flags = fcntl.fcntl(proc.receive_side.fd, fcntl.F_GETFL)
self.assertFalse(flags & os.O_WRONLY)
self.assertFalse(flags & os.O_RDWR)
self.assertTrue(info['flags'] & os.O_WRONLY)
self.assertTrue(buf, 'TEST')
if 0:
class StderrDiagTtyMixin(object):
def test_stderr(self):
proc, info, buf = run_fd_check(self.func, 2, 'write',
lambda proc: wait_read(proc.diag_receive_side, 4))
st = os.fstat(proc.diag_receive_side.fd)
self.assertTrue(stat.S_ISCHR(st.st_mode))
self.assertTrue(stat.S_ISCHR(info['st_mode']))
self.assertTrue(isinstance(info['ttyname'],
mitogen.core.UnicodeType))
os.ttyname(proc.diag_transmit_side.fd) # crashes if wrong
flags = fcntl.fcntl(proc.diag_receive_side.fd, fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(info['flags'] & os.O_RDWR)
self.assertNotEquals(st.st_dev, info['st_dev'])
self.assertTrue(flags & os.O_RDWR)
self.assertTrue(buf, 'TEST')
class HybridTtyCreateChildTest(StdinSockMixin, StdoutSockMixin,
StderrDiagTtyMixin, testlib.TestCase):
func = staticmethod(mitogen.parent.hybrid_tty_create_child)
class SelinuxHybridTtyCreateChildTest(StderrDiagTtyMixin, testlib.TestCase):
func = staticmethod(mitogen.parent.selinux_hybrid_tty_create_child)
def test_stdin(self):
proc, info, buf = run_fd_check(self.func, 0, 'read',
lambda proc: proc.transmit_side.write('TEST'))
st = os.fstat(proc.transmit_side.fd)
self.assertTrue(stat.S_ISFIFO(st.st_mode))
self.assertEquals(st.st_dev, info['st_dev'])
self.assertEquals(st.st_mode, info['st_mode'])
flags = fcntl.fcntl(proc.transmit_side.fd, fcntl.F_GETFL)
self.assertTrue(flags & os.O_WRONLY)
self.assertTrue(buf, 'TEST')
self.assertFalse(info['flags'] & os.O_WRONLY)
self.assertFalse(info['flags'] & os.O_RDWR)
def test_stdout(self):
proc, info, buf = run_fd_check(self.func, 1, 'write',
lambda proc: wait_read(proc.receive_side, 4))
st = os.fstat(proc.receive_side.fd)
self.assertTrue(stat.S_ISFIFO(st.st_mode))
self.assertEquals(st.st_dev, info['st_dev'])
self.assertEquals(st.st_mode, info['st_mode'])
flags = fcntl.fcntl(proc.receive_side.fd, fcntl.F_GETFL)
self.assertFalse(flags & os.O_WRONLY)
self.assertFalse(flags & os.O_RDWR)
self.assertTrue(info['flags'] & os.O_WRONLY)
self.assertTrue(buf, 'TEST')
if __name__ == '__main__':

@ -225,13 +225,13 @@ class TtyCreateChildTest(testlib.TestCase):
'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
])
deadline = time.time() + 5.0
mitogen.core.set_block(proc.stdio_fp.fileno())
self.assertEquals(mitogen.core.b('hi\n'), proc.stdio_fp.read())
mitogen.core.set_block(proc.stdin.fileno())
self.assertEquals(mitogen.core.b('hi\n'), proc.stdin.read())
waited_pid, status = os.waitpid(proc.pid, 0)
self.assertEquals(proc.pid, waited_pid)
self.assertEquals(0, status)
self.assertEquals(mitogen.core.b(''), tf.read())
proc.stdio_fp.close()
proc.stdout.close()
finally:
tf.close()

Loading…
Cancel
Save