|
|
@ -101,6 +101,7 @@ class StdinSockMixin(object):
|
|
|
|
self.assertTrue(flags & os.O_RDWR)
|
|
|
|
self.assertTrue(flags & os.O_RDWR)
|
|
|
|
self.assertTrue(info['buf'], 'TEST')
|
|
|
|
self.assertTrue(info['buf'], 'TEST')
|
|
|
|
self.assertTrue(info['flags'] & os.O_RDWR)
|
|
|
|
self.assertTrue(info['flags'] & os.O_RDWR)
|
|
|
|
|
|
|
|
close_proc(proc)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StdoutSockMixin(object):
|
|
|
|
class StdoutSockMixin(object):
|
|
|
@ -115,6 +116,7 @@ class StdoutSockMixin(object):
|
|
|
|
self.assertTrue(flags & os.O_RDWR)
|
|
|
|
self.assertTrue(flags & os.O_RDWR)
|
|
|
|
self.assertTrue(buf, 'TEST')
|
|
|
|
self.assertTrue(buf, 'TEST')
|
|
|
|
self.assertTrue(info['flags'] & os.O_RDWR)
|
|
|
|
self.assertTrue(info['flags'] & os.O_RDWR)
|
|
|
|
|
|
|
|
close_proc(proc)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CreateChildTest(StdinSockMixin, StdoutSockMixin, testlib.TestCase):
|
|
|
|
class CreateChildTest(StdinSockMixin, StdoutSockMixin, testlib.TestCase):
|
|
|
@ -126,6 +128,7 @@ class CreateChildTest(StdinSockMixin, StdoutSockMixin, testlib.TestCase):
|
|
|
|
self.assertEqual(st.st_dev, info['st_dev'])
|
|
|
|
self.assertEqual(st.st_dev, info['st_dev'])
|
|
|
|
self.assertEqual(st.st_mode, info['st_mode'])
|
|
|
|
self.assertEqual(st.st_mode, info['st_mode'])
|
|
|
|
self.assertEqual(st.st_ino, info['st_ino'])
|
|
|
|
self.assertEqual(st.st_ino, info['st_ino'])
|
|
|
|
|
|
|
|
close_proc(proc)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CreateChildMergedTest(StdinSockMixin, StdoutSockMixin,
|
|
|
|
class CreateChildMergedTest(StdinSockMixin, StdoutSockMixin,
|
|
|
@ -144,6 +147,7 @@ class CreateChildMergedTest(StdinSockMixin, StdoutSockMixin,
|
|
|
|
self.assertTrue(flags & os.O_RDWR)
|
|
|
|
self.assertTrue(flags & os.O_RDWR)
|
|
|
|
self.assertTrue(buf, 'TEST')
|
|
|
|
self.assertTrue(buf, 'TEST')
|
|
|
|
self.assertTrue(info['flags'] & os.O_RDWR)
|
|
|
|
self.assertTrue(info['flags'] & os.O_RDWR)
|
|
|
|
|
|
|
|
close_proc(proc)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CreateChildStderrPipeTest(StdinSockMixin, StdoutSockMixin,
|
|
|
|
class CreateChildStderrPipeTest(StdinSockMixin, StdoutSockMixin,
|
|
|
@ -164,6 +168,7 @@ class CreateChildStderrPipeTest(StdinSockMixin, StdoutSockMixin,
|
|
|
|
self.assertFalse(flags & os.O_RDWR)
|
|
|
|
self.assertFalse(flags & os.O_RDWR)
|
|
|
|
self.assertTrue(buf, 'TEST')
|
|
|
|
self.assertTrue(buf, 'TEST')
|
|
|
|
self.assertTrue(info['flags'] & os.O_WRONLY)
|
|
|
|
self.assertTrue(info['flags'] & os.O_WRONLY)
|
|
|
|
|
|
|
|
close_proc(proc)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TtyCreateChildTest(testlib.TestCase):
|
|
|
|
class TtyCreateChildTest(testlib.TestCase):
|
|
|
@ -191,9 +196,9 @@ class TtyCreateChildTest(testlib.TestCase):
|
|
|
|
self.assertEqual(proc.pid, waited_pid)
|
|
|
|
self.assertEqual(proc.pid, waited_pid)
|
|
|
|
self.assertEqual(0, status)
|
|
|
|
self.assertEqual(0, status)
|
|
|
|
self.assertEqual(mitogen.core.b(''), tf.read())
|
|
|
|
self.assertEqual(mitogen.core.b(''), tf.read())
|
|
|
|
proc.stdout.close()
|
|
|
|
|
|
|
|
finally:
|
|
|
|
finally:
|
|
|
|
tf.close()
|
|
|
|
tf.close()
|
|
|
|
|
|
|
|
close_proc(proc)
|
|
|
|
|
|
|
|
|
|
|
|
def test_stdin(self):
|
|
|
|
def test_stdin(self):
|
|
|
|
proc, info, _ = run_fd_check(self.func, 0, 'read',
|
|
|
|
proc, info, _ = run_fd_check(self.func, 0, 'read',
|
|
|
@ -210,6 +215,7 @@ class TtyCreateChildTest(testlib.TestCase):
|
|
|
|
self.assertTrue(flags & os.O_RDWR)
|
|
|
|
self.assertTrue(flags & os.O_RDWR)
|
|
|
|
self.assertTrue(info['flags'] & os.O_RDWR)
|
|
|
|
self.assertTrue(info['flags'] & os.O_RDWR)
|
|
|
|
self.assertTrue(info['buf'], 'TEST')
|
|
|
|
self.assertTrue(info['buf'], 'TEST')
|
|
|
|
|
|
|
|
close_proc(proc)
|
|
|
|
|
|
|
|
|
|
|
|
def test_stdout(self):
|
|
|
|
def test_stdout(self):
|
|
|
|
proc, info, buf = run_fd_check(self.func, 1, 'write',
|
|
|
|
proc, info, buf = run_fd_check(self.func, 1, 'write',
|
|
|
@ -229,6 +235,7 @@ class TtyCreateChildTest(testlib.TestCase):
|
|
|
|
|
|
|
|
|
|
|
|
self.assertTrue(flags & os.O_RDWR)
|
|
|
|
self.assertTrue(flags & os.O_RDWR)
|
|
|
|
self.assertTrue(buf, 'TEST')
|
|
|
|
self.assertTrue(buf, 'TEST')
|
|
|
|
|
|
|
|
close_proc(proc)
|
|
|
|
|
|
|
|
|
|
|
|
def test_stderr(self):
|
|
|
|
def test_stderr(self):
|
|
|
|
# proc.stderr is None in the parent since there is no separate stderr
|
|
|
|
# proc.stderr is None in the parent since there is no separate stderr
|
|
|
@ -250,6 +257,7 @@ class TtyCreateChildTest(testlib.TestCase):
|
|
|
|
|
|
|
|
|
|
|
|
self.assertTrue(flags & os.O_RDWR)
|
|
|
|
self.assertTrue(flags & os.O_RDWR)
|
|
|
|
self.assertTrue(buf, 'TEST')
|
|
|
|
self.assertTrue(buf, 'TEST')
|
|
|
|
|
|
|
|
close_proc(proc)
|
|
|
|
|
|
|
|
|
|
|
|
def test_dev_tty_open_succeeds(self):
|
|
|
|
def test_dev_tty_open_succeeds(self):
|
|
|
|
# In the early days of UNIX, a process that lacked a controlling TTY
|
|
|
|
# In the early days of UNIX, a process that lacked a controlling TTY
|
|
|
@ -274,6 +282,7 @@ class TtyCreateChildTest(testlib.TestCase):
|
|
|
|
proc.stdout.close()
|
|
|
|
proc.stdout.close()
|
|
|
|
finally:
|
|
|
|
finally:
|
|
|
|
tf.close()
|
|
|
|
tf.close()
|
|
|
|
|
|
|
|
close_proc(proc)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StderrDiagTtyMixin(object):
|
|
|
|
class StderrDiagTtyMixin(object):
|
|
|
@ -296,6 +305,7 @@ class StderrDiagTtyMixin(object):
|
|
|
|
|
|
|
|
|
|
|
|
self.assertTrue(flags & os.O_RDWR)
|
|
|
|
self.assertTrue(flags & os.O_RDWR)
|
|
|
|
self.assertTrue(buf, 'TEST')
|
|
|
|
self.assertTrue(buf, 'TEST')
|
|
|
|
|
|
|
|
close_proc(proc)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class HybridTtyCreateChildTest(StdinSockMixin, StdoutSockMixin,
|
|
|
|
class HybridTtyCreateChildTest(StdinSockMixin, StdoutSockMixin,
|
|
|
@ -321,6 +331,7 @@ if 0:
|
|
|
|
self.assertTrue(buf, 'TEST')
|
|
|
|
self.assertTrue(buf, 'TEST')
|
|
|
|
self.assertFalse(info['flags'] & os.O_WRONLY)
|
|
|
|
self.assertFalse(info['flags'] & os.O_WRONLY)
|
|
|
|
self.assertFalse(info['flags'] & os.O_RDWR)
|
|
|
|
self.assertFalse(info['flags'] & os.O_RDWR)
|
|
|
|
|
|
|
|
close_proc(proc)
|
|
|
|
|
|
|
|
|
|
|
|
def test_stdout(self):
|
|
|
|
def test_stdout(self):
|
|
|
|
proc, info, buf = run_fd_check(self.func, 1, 'write',
|
|
|
|
proc, info, buf = run_fd_check(self.func, 1, 'write',
|
|
|
@ -334,3 +345,4 @@ if 0:
|
|
|
|
self.assertFalse(flags & os.O_RDWR)
|
|
|
|
self.assertFalse(flags & os.O_RDWR)
|
|
|
|
self.assertTrue(info['flags'] & os.O_WRONLY)
|
|
|
|
self.assertTrue(info['flags'] & os.O_WRONLY)
|
|
|
|
self.assertTrue(buf, 'TEST')
|
|
|
|
self.assertTrue(buf, 'TEST')
|
|
|
|
|
|
|
|
close_proc(proc)
|
|
|
|