You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mitogen/tests/router_test.py

400 lines
13 KiB
Python

import time
import zlib
import unittest2
import testlib
import mitogen.master
import mitogen.parent
import mitogen.utils
try:
import Queue
except ImportError:
import queue as Queue
def ping():
    """Trivial function for remote calls in these tests; always returns True."""
    return True
@mitogen.core.takes_router
def ping_context(other, router):
    """
    Invoke :func:`ping` in the context whose ID is `other.context_id`.

    A new :class:`mitogen.parent.Context` is built around `router` and the
    ID taken from `other`, and the call is made through that object rather
    than through `other` itself.

    NOTE(review): callers in this file pass only `other`; `router` is
    presumably injected by the `takes_router` decorator -- confirm.
    """
    target = mitogen.parent.Context(router, other.context_id)
    target.call(ping)
@mitogen.core.takes_router
def return_router_max_message_size(router):
    """Return the `max_message_size` attribute of the supplied `router`."""
    limit = router.max_message_size
    return limit
def send_n_sized_reply(sender, n):
    """
    Send a string of `n` space characters via `sender.send()`, then return
    the constant 123.
    """
    payload = ' ' * n
    sender.send(payload)
    return 123
class SourceVerifyTest(testlib.RouterMixin, testlib.TestCase):
    """
    Check that a message whose auth_id or src_id is inconsistent with the
    stream it is delivered on is logged as an error and not forwarded.
    """

    def setUp(self):
        super(SourceVerifyTest, self).setUp()
        # Create some children, ping them, and store what their messages look
        # like so we can mess with them later.
        self.child1 = self.router.local()
        self.child1_msg = self.child1.call_async(ping).get()
        self.child1_stream = self.router._stream_by_id[self.child1.context_id]

        self.child2 = self.router.local()
        self.child2_msg = self.child2.call_async(ping).get()
        self.child2_stream = self.router._stream_by_id[self.child2.context_id]

    def test_bad_auth_id(self):
        # Deliver a message locally from child2, but using child1's stream.
        log = testlib.LogCapturer()
        log.start()
        try:
            # Used to ensure the message was dropped rather than routed after
            # the error is logged.
            recv = mitogen.core.Receiver(self.router)
            self.child2_msg.handle = recv.handle

            self.broker.defer(self.router._async_route,
                              self.child2_msg,
                              in_stream=self.child1_stream)

            # Wait for IO loop to finish everything above.
            self.sync_with_broker()

            # Ensure message wasn't forwarded.
            self.assertTrue(recv.empty())
        finally:
            # Stop capturing even if an assertion above failed, so the
            # capturer is not left running for subsequent tests.
            captured = log.stop()

        # Ensure error was logged.
        expect = 'bad auth_id: got %r via' % (self.child2_msg.auth_id,)
        self.assertIn(expect, captured)

    def test_bad_src_id(self):
        # Deliver a message locally from child2 with the correct auth_id, but
        # the wrong src_id.
        log = testlib.LogCapturer()
        log.start()
        try:
            # Used to ensure the message was dropped rather than routed after
            # the error is logged.
            recv = mitogen.core.Receiver(self.router)
            self.child2_msg.handle = recv.handle
            self.child2_msg.src_id = self.child1.context_id

            self.broker.defer(self.router._async_route,
                              self.child2_msg,
                              self.child2_stream)

            # Wait for IO loop to finish everything above.
            self.sync_with_broker()

            # Ensure message wasn't forwarded.
            self.assertTrue(recv.empty())
        finally:
            # Stop capturing even if an assertion above failed.
            captured = log.stop()

        # Ensure error was logged.
        expect = 'bad src_id: got %d via' % (self.child1_msg.src_id,)
        self.assertIn(expect, captured)
class PolicyTest(testlib.RouterMixin, testlib.TestCase):
    """Exercise the `policy` argument accepted by mitogen.core.Receiver."""

    def test_allow_any(self):
        # This guy gets everything.
        recv = mitogen.core.Receiver(self.router)
        recv.to_sender().send(123)
        self.sync_with_broker()
        self.assertFalse(recv.empty())
        self.assertEqual(123, recv.get().unpickle())

    def test_refuse_all(self):
        # Route a message to a receiver whose policy rejects everything, then
        # check the refusal is logged, the message is not delivered, and the
        # reply_to target is told about the refusal.
        log = testlib.LogCapturer()
        log.start()
        try:
            # This guy never gets anything.
            recv = mitogen.core.Receiver(
                router=self.router,
                policy=(lambda msg, stream: False),
            )

            # This guy becomes the reply_to of our refused message.
            reply_target = mitogen.core.Receiver(self.router)

            # Send the message.
            self.router.route(
                mitogen.core.Message(
                    dst_id=mitogen.context_id,
                    handle=recv.handle,
                    reply_to=reply_target.handle,
                )
            )

            # Wait for IO loop.
            self.sync_with_broker()
        finally:
            # Stop capturing even if something above raised.
            captured = log.stop()

        # Verify log.
        self.assertIn(self.router.refused_msg, captured)

        # Verify message was not delivered.
        self.assertTrue(recv.empty())

        # Verify ChannelError received by reply_to target.
        e = self.assertRaises(mitogen.core.ChannelError,
                              lambda: reply_target.get().unpickle())
        self.assertEqual(e.args[0], self.router.refused_msg)
class CrashTest(testlib.BrokerMixin, testlib.TestCase):
    # This is testing both Broker's ability to crash nicely, and Router's
    # ability to respond to the crash event.
    klass = mitogen.master.Router

    def _naughty(self):
        # Stand-in for Broker._loop_once that always fails.
        raise ValueError('eek')

    def test_shutdown(self):
        router = self.klass(self.broker)

        sem = mitogen.core.Latch()
        router.add_handler(sem.put)

        log = testlib.LogCapturer('mitogen')
        log.start()
        try:
            # Force a crash and ensure it wakes up.
            self.broker._loop_once = self._naughty
            self.broker.defer(lambda: None)

            # sem should have received dead message.
            self.assertTrue(sem.get().is_dead)
        finally:
            # Stop capturing even if the assertion above failed.
            captured = log.stop()

        # Ensure it was logged.
        expect = 'broker crashed'
        self.assertIn(expect, captured)

        self.broker.join()
class AddHandlerTest(testlib.TestCase):
    """Tests for Router.add_handler() / Router.del_handler()."""

    klass = mitogen.master.Router

    def test_dead_message_sent_at_shutdown(self):
        router = self.klass()
        inbox = Queue.Queue()
        router.add_handler(inbox.put)
        router.broker.shutdown()
        try:
            # The handler should be passed a dead message during shutdown.
            self.assertTrue(inbox.get(timeout=5).is_dead)
        finally:
            # Join even when the assertion (or the 5 second wait) fails, as
            # the other tests in this class do.
            router.broker.join()

    def test_cannot_double_register(self):
        router = self.klass()
        try:
            router.add_handler((lambda: None), handle=1234)
            # Registering the same handle a second time must be rejected.
            e = self.assertRaises(mitogen.core.Error,
                lambda: router.add_handler((lambda: None), handle=1234))
            self.assertEqual(router.duplicate_handle_msg, e.args[0])
            router.del_handler(1234)
        finally:
            router.broker.shutdown()
            router.broker.join()

    def test_can_reregister(self):
        router = self.klass()
        try:
            # A handle may be reused once it has been deleted.
            router.add_handler((lambda: None), handle=1234)
            router.del_handler(1234)
            router.add_handler((lambda: None), handle=1234)
            router.del_handler(1234)
        finally:
            router.broker.shutdown()
            router.broker.join()
class MyselfTest(testlib.RouterMixin, testlib.TestCase):
    """Check the context object returned by Router.myself()."""

    def test_myself(self):
        myself = self.router.myself()
        self.assertEqual(myself.context_id, mitogen.context_id)
        # TODO: context should know its own name too.
        self.assertEqual(myself.name, 'self')
class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase):
    """Tests for the Router `max_message_size` limit."""

    klass = mitogen.master.Router

    def test_local_exceeded(self):
        router = self.klass(broker=self.broker, max_message_size=4096)

        logs = testlib.LogCapturer()
        logs.start()
        try:
            # Send message and block for one IO loop, so _async_route can run.
            router.route(mitogen.core.Message.pickled(' '*8192))
            router.broker.defer_sync(lambda: None)
        finally:
            # Stop capturing even if routing raised.
            captured = logs.stop()

        expect = 'message too large (max 4096 bytes)'
        self.assertIn(expect, captured)

    def test_local_dead_message(self):
        # Local router should generate dead message when reply_to is set.
        router = self.klass(broker=self.broker, max_message_size=4096)
        expect = router.too_large_msg % (4096,)

        logs = testlib.LogCapturer()
        logs.start()
        try:
            # Try function call. Receiver should be woken by a dead message
            # sent by router due to message size exceeded.
            child = router.local()
            e = self.assertRaises(mitogen.core.ChannelError,
                                  lambda: child.call(zlib.crc32, ' '*8192))
        finally:
            # Stop capturing even if the call did not fail as expected.
            captured = logs.stop()

        self.assertEqual(e.args[0], expect)
        self.assertIn(expect, captured)

    def test_remote_configured(self):
        # Ensure new contexts receive a router with the same value.
        router = self.klass(broker=self.broker, max_message_size=64*1024)
        remote = router.local()
        size = remote.call(return_router_max_message_size)
        self.assertEqual(size, 64*1024)

    def test_remote_exceeded(self):
        # Have the child send a 128KiB message to a receiver on a router
        # limited to 64KiB, and check the oversize message is logged.
        router = self.klass(broker=self.broker, max_message_size=64*1024)
        recv = mitogen.core.Receiver(router)

        logs = testlib.LogCapturer()
        logs.start()
        try:
            remote = router.local()
            remote.call(send_n_sized_reply, recv.to_sender(), 128*1024)
        finally:
            # Stop capturing even if the remote call raised.
            captured = logs.stop()

        expect = 'message too large (max %d bytes)' % (64*1024,)
        self.assertIn(expect, captured)
class NoRouteTest(testlib.RouterMixin, testlib.TestCase):
    """
    Check that messages which cannot be delivered produce a dead message for
    the sender, both when read with throw_dead=False and when get() is left
    to raise ChannelError.
    """

    def test_invalid_handle_returns_dead(self):
        # Verify sending a message to an invalid handle yields a dead message
        # from the target context.
        l1 = self.router.local()
        recv = l1.send_async(mitogen.core.Message(handle=999))
        msg = recv.get(throw_dead=False)
        self.assertEqual(msg.is_dead, True)
        self.assertEqual(msg.src_id, l1.context_id)
        self.assertEqual(msg.data, self.router.invalid_handle_msg.encode())

        # Same again, but let get() raise for the dead message.
        recv = l1.send_async(mitogen.core.Message(handle=999))
        e = self.assertRaises(mitogen.core.ChannelError,
                              lambda: recv.get())
        self.assertEqual(e.args[0], self.router.invalid_handle_msg)

    def test_totally_invalid_context_returns_dead(self):
        recv = mitogen.core.Receiver(self.router)
        msg = mitogen.core.Message(
            dst_id=1234,
            handle=1234,
            reply_to=recv.handle,
        )
        # Text expected in the dead message for the unknown destination.
        expect = self.router.no_route_msg % (
            1234,
            mitogen.context_id,
        )

        self.router.route(msg)
        rmsg = recv.get(throw_dead=False)
        self.assertEqual(rmsg.is_dead, True)
        self.assertEqual(rmsg.src_id, mitogen.context_id)
        self.assertEqual(rmsg.data, expect.encode())

        # Same again, but let get() raise for the dead message.
        self.router.route(msg)
        e = self.assertRaises(mitogen.core.ChannelError,
                              lambda: recv.get())
        self.assertEqual(e.args[0], expect)

    def test_previously_alive_context_returns_dead(self):
        l1 = self.router.local()
        l1.shutdown(wait=True)
        recv = mitogen.core.Receiver(self.router)
        msg = mitogen.core.Message(
            dst_id=l1.context_id,
            handle=mitogen.core.CALL_FUNCTION,
            reply_to=recv.handle,
        )
        # Text expected in the dead message for the context that was shut
        # down above.
        expect = self.router.no_route_msg % (
            l1.context_id,
            mitogen.context_id,
        )

        self.router.route(msg)
        rmsg = recv.get(throw_dead=False)
        self.assertEqual(rmsg.is_dead, True)
        self.assertEqual(rmsg.src_id, mitogen.context_id)
        self.assertEqual(rmsg.data, expect.encode())

        # Same again, but let get() raise for the dead message.
        self.router.route(msg)
        e = self.assertRaises(mitogen.core.ChannelError,
                              lambda: recv.get())
        self.assertEqual(e.args[0], expect)
class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase):
    """Tests for behaviour when `Router.unidirectional` is enabled."""

    def test_siblings_cant_talk(self):
        self.router.unidirectional = True
        l1 = self.router.local()
        l2 = self.router.local()

        logs = testlib.LogCapturer()
        logs.start()
        try:
            # l2 tries to call into its sibling l1, which should fail.
            e = self.assertRaises(mitogen.core.CallError,
                                  lambda: l2.call(ping_context, l1))
        finally:
            # Stop capturing even if the call did not fail as expected.
            captured = logs.stop()

        msg = self.router.unidirectional_msg % (
            l2.context_id,
            l1.context_id,
        )
        self.assertIn(msg, str(e))
        self.assertIn('routing mode prevents forward of ', captured)

    def test_auth_id_can_talk(self):
        self.router.unidirectional = True
        # One stream has auth_id stamped to that of the master, so it should be
        # treated like a parent.
        l1 = self.router.local()
        l1s = self.router.stream_by_id(l1.context_id)
        l1s.protocol.auth_id = mitogen.context_id
        l1s.protocol.is_privileged = True

        l2 = self.router.local()
        e = self.assertRaises(mitogen.core.CallError,
                              lambda: l2.call(ping_context, l1))

        # The failure here is expected to carry the refused message, not the
        # unidirectional-routing one.
        msg = 'mitogen.core.ChannelError: %s' % (self.router.refused_msg,)
        self.assertTrue(str(e).startswith(msg))
class EgressIdsTest(testlib.RouterMixin, testlib.TestCase):
    def test_egress_ids_populated(self):
        # Ensure Stream.egress_ids is populated on message reception.
        c1 = self.router.local(name='c1')
        c2 = self.router.local(name='c2')

        c1s = self.router.stream_by_id(c1.context_id)
        try:
            c1.call(ping_context, c2)
        except mitogen.core.CallError:
            # Fails because siblings cant call funcs in each other, but this
            # causes messages to be sent.
            pass

        self.assertEqual(c1s.protocol.egress_ids, set([
            mitogen.context_id,
            c2.context_id,
        ]))
# Run the tests in this module when it is executed as a script.
if __name__ == '__main__':
    unittest2.main()