You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mitogen/tests/parent_test.py

298 lines
10 KiB
Python

import errno
import fcntl
import os
import signal
import subprocess
import sys
import tempfile
import time
import mock
import unittest2
import testlib
from testlib import Popen__terminate
import mitogen.core
import mitogen.parent
try:
file
except NameError:
from io import FileIO as file
def wait_for_child(pid, timeout=1.0):
deadline = mitogen.core.now() + timeout
while timeout < mitogen.core.now():
try:
target_pid, status = os.waitpid(pid, os.WNOHANG)
if target_pid == pid:
return
except OSError:
e = sys.exc_info()[1]
if e.args[0] == errno.ECHILD:
return
time.sleep(0.05)
assert False, "wait_for_child() timed out"
@mitogen.core.takes_econtext
def call_func_in_sibling(ctx, econtext, sync_sender):
recv = ctx.call_async(time.sleep, 99999)
sync_sender.send(None)
recv.get().unpickle()
def wait_for_empty_output_queue(sync_recv, context):
# wait for sender to submit their RPC. Since the RPC is sent first, the
# message sent to this sender cannot arrive until we've routed the RPC.
sync_recv.get()
router = context.router
broker = router.broker
while True:
# Now wait for the RPC to exit the output queue.
stream = router.stream_by_id(context.context_id)
if broker.defer_sync(lambda: stream.protocol.pending_bytes()) == 0:
return
time.sleep(0.1)
class GetDefaultRemoteNameTest(testlib.TestCase):
func = staticmethod(mitogen.parent.get_default_remote_name)
@mock.patch('os.getpid')
@mock.patch('getpass.getuser')
@mock.patch('socket.gethostname')
def test_slashes(self, mock_gethostname, mock_getuser, mock_getpid):
# Ensure slashes appearing in the remote name are replaced with
# underscores.
mock_gethostname.return_value = 'box'
mock_getuser.return_value = 'ECORP\\Administrator'
mock_getpid.return_value = 123
self.assertEquals("ECORP_Administrator@box:123", self.func())
class ReturncodeToStrTest(testlib.TestCase):
func = staticmethod(mitogen.parent.returncode_to_str)
def test_return_zero(self):
self.assertEquals(self.func(0), 'exited with return code 0')
def test_return_one(self):
self.assertEquals(self.func(1), 'exited with return code 1')
def test_sigkill(self):
self.assertEquals(self.func(-signal.SIGKILL),
'exited due to signal %s (SIGKILL)' % (int(signal.SIGKILL),)
)
# can't test SIGSTOP without POSIX sessions rabbithole
class ReapChildTest(testlib.RouterMixin, testlib.TestCase):
def test_connect_timeout(self):
# Ensure the child process is reaped if the connection times out.
options = mitogen.parent.Options(
old_router=self.router,
max_message_size=self.router.max_message_size,
python_path=testlib.data_path('python_never_responds.py'),
connect_timeout=0.5,
)
conn = mitogen.parent.Connection(options, router=self.router)
self.assertRaises(mitogen.core.TimeoutError,
lambda: conn.connect(context=mitogen.core.Context(None, 1234))
)
wait_for_child(conn.proc.pid)
e = self.assertRaises(OSError,
lambda: os.kill(conn.proc.pid, 0)
)
self.assertEquals(e.args[0], errno.ESRCH)
class StreamErrorTest(testlib.RouterMixin, testlib.TestCase):
def test_direct_eof(self):
e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.local(
python_path='true',
connect_timeout=3,
)
)
prefix = mitogen.parent.Connection.eof_error_msg
self.assertTrue(e.args[0].startswith(prefix))
def test_via_eof(self):
# Verify FD leakage does not keep failed process open.
local = self.router.local()
e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.local(
via=local,
python_path='echo',
connect_timeout=3,
)
)
expect = mitogen.parent.Connection.eof_error_msg
self.assertTrue(expect in e.args[0])
def test_direct_enoent(self):
e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.local(
python_path='derp',
connect_timeout=3,
)
)
prefix = 'Child start failed: [Errno 2] No such file or directory'
self.assertTrue(e.args[0].startswith(prefix))
def test_via_enoent(self):
local = self.router.local()
e = self.assertRaises(mitogen.core.StreamError,
lambda: self.router.local(
via=local,
python_path='derp',
connect_timeout=3,
)
)
s = 'Child start failed: [Errno 2] No such file or directory'
self.assertTrue(s in e.args[0])
class ContextTest(testlib.RouterMixin, testlib.TestCase):
def test_context_shutdown(self):
local = self.router.local()
pid = local.call(os.getpid)
local.shutdown(wait=True)
wait_for_child(pid)
self.assertRaises(OSError, lambda: os.kill(pid, 0))
class OpenPtyTest(testlib.TestCase):
func = staticmethod(mitogen.parent.openpty)
def test_pty_returned(self):
master_fp, slave_fp = self.func()
try:
self.assertTrue(master_fp.isatty())
self.assertTrue(isinstance(master_fp, file))
self.assertTrue(slave_fp.isatty())
self.assertTrue(isinstance(slave_fp, file))
finally:
master_fp.close()
slave_fp.close()
@mock.patch('os.openpty')
def test_max_reached(self, openpty):
openpty.side_effect = OSError(errno.ENXIO)
e = self.assertRaises(mitogen.core.StreamError,
lambda: self.func())
msg = mitogen.parent.OPENPTY_MSG % (openpty.side_effect,)
self.assertEquals(e.args[0], msg)
@unittest2.skipIf(condition=(os.uname()[0] != 'Linux'),
reason='Fallback only supported on Linux')
@mock.patch('os.openpty')
def test_broken_linux_fallback(self, openpty):
openpty.side_effect = OSError(errno.EPERM)
master_fp, slave_fp = self.func()
try:
st = os.fstat(master_fp.fileno())
self.assertEquals(5, os.major(st.st_rdev))
flags = fcntl.fcntl(master_fp.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
st = os.fstat(slave_fp.fileno())
self.assertEquals(136, os.major(st.st_rdev))
flags = fcntl.fcntl(slave_fp.fileno(), fcntl.F_GETFL)
self.assertTrue(flags & os.O_RDWR)
finally:
master_fp.close()
slave_fp.close()
class DisconnectTest(testlib.RouterMixin, testlib.TestCase):
def test_child_disconnected(self):
# Easy mode: process notices its own directly connected child is
# disconnected.
c1 = self.router.local()
recv = c1.call_async(time.sleep, 9999)
c1.shutdown(wait=True)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get())
self.assertEquals(e.args[0], self.router.respondent_disconnect_msg)
def test_indirect_child_disconnected(self):
# Achievement unlocked: process notices an indirectly connected child
# is disconnected.
c1 = self.router.local()
c2 = self.router.local(via=c1)
recv = c2.call_async(time.sleep, 9999)
c2.shutdown(wait=True)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get())
self.assertEquals(e.args[0], self.router.respondent_disconnect_msg)
def test_indirect_child_intermediary_disconnected(self):
# Battlefield promotion: process notices indirect child disconnected
# due to an intermediary child disconnecting.
c1 = self.router.local()
c2 = self.router.local(via=c1)
recv = c2.call_async(time.sleep, 9999)
c1.shutdown(wait=True)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get())
self.assertEquals(e.args[0], self.router.respondent_disconnect_msg)
def test_near_sibling_disconnected(self):
# Hard mode: child notices sibling connected to same parent has
# disconnected.
c1 = self.router.local()
c2 = self.router.local()
# Let c1 call functions in c2.
self.router.stream_by_id(c1.context_id).protocol.auth_id = mitogen.context_id
c1.call(mitogen.parent.upgrade_router)
sync_recv = mitogen.core.Receiver(self.router)
recv = c1.call_async(call_func_in_sibling, c2,
sync_sender=sync_recv.to_sender())
wait_for_empty_output_queue(sync_recv, c2)
c2.shutdown(wait=True)
e = self.assertRaises(mitogen.core.CallError,
lambda: recv.get().unpickle())
s = 'mitogen.core.ChannelError: ' + self.router.respondent_disconnect_msg
self.assertTrue(e.args[0].startswith(s), str(e))
def test_far_sibling_disconnected(self):
# God mode: child of child notices child of child of parent has
# disconnected.
c1 = self.router.local(name='c1')
c11 = self.router.local(name='c11', via=c1)
c2 = self.router.local(name='c2')
c22 = self.router.local(name='c22', via=c2)
# Let c1 call functions in c2.
self.router.stream_by_id(c1.context_id).protocol.auth_id = mitogen.context_id
c11.call(mitogen.parent.upgrade_router)
sync_recv = mitogen.core.Receiver(self.router)
recv = c11.call_async(call_func_in_sibling, c22,
sync_sender=sync_recv.to_sender())
wait_for_empty_output_queue(sync_recv, c22)
c22.shutdown(wait=True)
e = self.assertRaises(mitogen.core.CallError,
lambda: recv.get().unpickle())
s = 'mitogen.core.ChannelError: ' + self.router.respondent_disconnect_msg
self.assertTrue(e.args[0].startswith(s))
if __name__ == '__main__':
unittest2.main()