service: v2. Closes #213

pull/255/head
David Wilson 6 years ago
parent 469bde63c2
commit 3b0addcfb0

@ -458,13 +458,10 @@ class Connection(ansible.plugins.connection.ConnectionBase):
)
)
dct = mitogen.service.call(
context=self.parent,
handle=ansible_mitogen.services.ContextService.handle,
method='get',
kwargs=mitogen.utils.cast({
'stack': stack,
})
dct = self.parent.call_service(
service_name='ansible_mitogen.services.ContextService',
method_name='get',
stack=mitogen.utils.cast(list(stack)),
)
if dct['msg']:
@ -490,13 +487,10 @@ class Connection(ansible.plugins.connection.ConnectionBase):
multiple times.
"""
if self.context:
mitogen.service.call(
context=self.parent,
handle=ansible_mitogen.services.ContextService.handle,
method='put',
kwargs={
'context': self.context
}
self.parent.call_service(
service_name='ansible_mitogen.services.ContextService',
method_name='put',
context=self.context
)
self.context = None
@ -618,13 +612,10 @@ class Connection(ansible.plugins.connection.ConnectionBase):
return self.put_data(out_path, s, mode=st.st_mode,
utimes=(st.st_atime, st.st_mtime))
mitogen.service.call(
context=self.parent,
handle=ansible_mitogen.services.FileService.handle,
method='register',
kwargs={
'path': mitogen.utils.cast(in_path)
}
self.parent.call_service(
service_name='ansible_mitogen.services.FileService',
method_name='register',
path=mitogen.utils.cast(in_path)
)
self.call(
ansible_mitogen.target.transfer_file,

@ -182,13 +182,10 @@ class BinaryPlanner(Planner):
def _grant_file_service_access(self, invocation):
invocation.connection._connect()
mitogen.service.call(
context=invocation.connection.parent,
handle=ansible_mitogen.services.FileService.handle,
method='register',
kwargs={
'path': invocation.module_path
}
invocation.connection.parent.call_service(
service_name='ansible_mitogen.services.FileService',
method_name='register',
path=invocation.module_path,
)
def plan(self, invocation, **kwargs):
@ -301,16 +298,14 @@ class NewStylePlanner(ScriptPlanner):
def get_module_utils(self, invocation):
invocation.connection._connect()
return mitogen.service.call(
context=invocation.connection.parent,
handle=ansible_mitogen.services.ModuleDepService.handle,
method='scan',
kwargs={
'module_name': 'ansible_module_%s' % (invocation.module_name,),
'module_path': invocation.module_path,
'search_path': self.get_search_path(invocation),
'builtin_path': module_common._MODULE_UTILS_PATH,
}
return invocation.connection.parent.call_service(
service_name='ansible_mitogen.services.ModuleDepService',
method_name='scan',
module_name='ansible_module_%s' % (invocation.module_name,),
module_path=invocation.module_path,
search_path=self.get_search_path(invocation),
builtin_path=module_common._MODULE_UTILS_PATH,
)
def plan(self, invocation):

@ -76,8 +76,6 @@ class ContextService(mitogen.service.Service):
processes and arranging for the worker to select one according to a hash of
the connection parameters (sharding).
"""
handle = 500
max_message_size = 1000
max_interpreters = int(os.getenv('MITOGEN_MAX_INTERPRETERS', '20'))
def __init__(self, *args, **kwargs):
@ -420,8 +418,6 @@ class FileService(mitogen.service.Service):
proceed normally, without the associated thread needing to be
forcefully killed.
"""
handle = 501
max_message_size = 1000
unregistered_msg = 'Path is not registered with FileService.'
context_mismatch_msg = 'sender= kwarg context must match requestee context'
@ -609,9 +605,6 @@ class ModuleDepService(mitogen.service.Service):
Scan a new-style module and produce a cached mapping of module_utils names
to their resolved filesystem paths.
"""
max_message_size = 1000
handle = 502
def __init__(self, file_service, **kwargs):
super(ModuleDepService, self).__init__(**kwargs)
self._file_service = file_service

@ -90,26 +90,20 @@ def _get_file(context, path, out_fp):
LOG.debug('_get_file(): fetching %r from %r', path, context)
t0 = time.time()
recv = mitogen.core.Receiver(router=context.router)
metadata = mitogen.service.call(
context=context,
handle=ansible_mitogen.services.FileService.handle,
method='fetch',
kwargs={
'path': path,
'sender': recv.to_sender()
}
metadata = context.call_service(
service_name='ansible_mitogen.services.FileService',
method_name='fetch',
path=path,
sender=recv.to_sender(),
)
for chunk in recv:
s = chunk.unpickle()
LOG.debug('_get_file(%r): received %d bytes', path, len(s))
mitogen.service.call_async(
context=context,
handle=ansible_mitogen.services.FileService.handle,
method='acknowledge',
kwargs={
'size': len(s),
}
context.call_service_async(
service_name='ansible_mitogen.services.FileService',
method_name='acknowledge',
size=len(s),
).close()
out_fp.write(s)

@ -6,14 +6,48 @@ Service Framework
=================
Mitogen includes a simple framework for implementing services exposed to other
contexts, with built-in subclasses that capture some common service models.
This is a work in progress, and new functionality will be added as common usage
patterns emerge.
contexts, with some built-in subclasses to capture common designs. This is a
work in progress, and new functionality will be added as common usage patterns
emerge.
Overview
--------
Service
* User-supplied class with explicitly exposed methods.
* Identified in calls by its canonical name (e.g. mypkg.mymod.MyClass).
* May be auto-imported/constructed in a child from a parent simply by calling it
* Children receive refusals if the class is not already activated by a aprent
* Has an associated Select instance which may be dynamically loaded with
receivers over time, on_message_received() invoked if any receiver becomes
ready.
Invoker
* Abstracts mechanism for calling a service method and verifying permissions.
* Built-in 'service.Invoker': concurrent execution of all methods on the thread pool.
* Built-in 'service.DeduplicatingInvoker': requests are aggregated by distinct
(method, kwargs) key, only one such method executes, return value is cached
and broadcast to all requesters.
Activator
* Abstracts mechanism for activating a service and verifying activation
permission.
* Built-in activator looks for service by fully.qualified.ClassName using
Python import mechanism, and only permits parents to trigger activation.
Pool
* Manages a fixed-size thread pool, mapping of service name to Invoker, and an
aggregate Select over every activate service's Selects.
* Constructed automatically in children in response to the first
CALL_SERVICE message sent to them by a parent.
* Must be constructed manually in parent context.
* Has close() and add() methods.
Example
-------

@ -11,9 +11,6 @@ import mitogen.unix
class PingService(mitogen.service.Service):
well_known_id = 500
max_message_size = 1000
def dispatch(self, dct, msg):
return 'Hello, world'

@ -75,6 +75,10 @@ IOLOG.setLevel(logging.INFO)
_v = False
_vv = False
# Also taken by Broker, no blocking work can occur with it held.
_service_call_lock = threading.Lock()
_service_calls = []
GET_MODULE = 100
CALL_FUNCTION = 101
FORWARD_LOG = 102
@ -85,6 +89,7 @@ SHUTDOWN = 106
LOAD_MODULE = 107
FORWARD_MODULE = 108
DETACHING = 109
CALL_SERVICE = 110
IS_DEAD = 999
try:
@ -1000,12 +1005,6 @@ class Context(object):
_v and LOG.debug('%r.on_disconnect()', self)
fire(self, 'disconnect')
def send(self, msg):
"""send `obj` to `handle`, and tell the broker we have output. May
be called from any thread."""
msg.dst_id = self.context_id
self.router.route(msg)
def send_async(self, msg, persist=False):
if self.router.broker._thread == threading.currentThread(): # TODO
raise SystemError('Cannot making blocking call on broker thread')
@ -1018,6 +1017,25 @@ class Context(object):
self.send(msg)
return receiver
def call_service_async(self, service_name, method_name, **kwargs):
_v and LOG.debug('%r.call_service_async(%r, %r, %r)',
self, service_name, method_name, kwargs)
if not isinstance(service_name, basestring):
service_name = service_name.name() # Service.name()
tup = (service_name, method_name, kwargs)
msg = Message.pickled(tup, handle=CALL_SERVICE)
return self.send_async(msg)
def send(self, msg):
"""send `obj` to `handle`, and tell the broker we have output. May
be called from any thread."""
msg.dst_id = self.context_id
self.router.route(msg)
def call_service(self, service_name, method_name, **kwargs):
recv = self.call_service_async(service_name, method_name, **kwargs)
return recv.get().unpickle()
def send_await(self, msg, deadline=None):
"""Send `msg` and wait for a response with an optional timeout."""
receiver = self.send_async(msg)
@ -1626,6 +1644,28 @@ class ExternalContext(object):
if not self.config['profiling']:
os.kill(os.getpid(), signal.SIGTERM)
def _on_call_service_msg(self, msg):
"""
Stub CALL_SERVICE handler, push message on temporary queue and invoke
_on_stub_call() from the main thread.
"""
if msg.is_dead:
return
_service_call_lock.acquire()
try:
_service_calls.append(msg)
finally:
_service_call_lock.release()
self.router.route(
Message.pickled(
dst_id=mitogen.context_id,
handle=CALL_FUNCTION,
obj=('mitogen.service', None, '_on_stub_call', (), {}),
router=self.router,
)
)
def _on_shutdown_msg(self, msg):
_v and LOG.debug('_on_shutdown_msg(%r)', msg)
if not msg.is_dead:
@ -1673,6 +1713,11 @@ class ExternalContext(object):
handle=SHUTDOWN,
policy=has_parent_authority,
)
self.router.add_handler(
fn=self._on_call_service_msg,
handle=CALL_SERVICE,
policy=has_parent_authority,
)
self.master = Context(self.router, 0, 'master')
parent_id = self.config['parent_ids'][0]
if parent_id == 0:

@ -35,23 +35,36 @@ import mitogen.select
from mitogen.core import LOG
class Policy(object):
"""
Base security policy.
"""
def is_authorized(self, service, msg):
raise NotImplementedError()
DEFAULT_POOL_SIZE = 16
_pool = None
class AllowAny(Policy):
def is_authorized(self, service, msg):
return True
@mitogen.core.takes_router
def get_or_create_pool(size=None, router=None):
global _pool
if _pool is None:
_pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE)
return _pool
class AllowParents(Policy):
def is_authorized(self, service, msg):
return (msg.auth_id in mitogen.parent_ids or
msg.auth_id == mitogen.context_id)
@mitogen.core.takes_router
def _on_stub_call(router):
"""
Called for each message received by the core.py stub CALL_SERVICE handler.
Create the pool if it doesn't already exist, and push enqueued messages
into the pool's receiver. This may be called more than once as the stub
service handler runs in asynchronous context, while _on_stub_call() happens
on the main thread. Multiple CALL_SERVICE may end up enqueued before Pool
has a chance to install the real CALL_SERVICE handler.
"""
pool = get_or_create_pool(router=router)
mitogen.core._service_call_lock.acquire()
try:
for msg in mitogen.core._service_calls:
pool._receiver._on_receive(msg)
del mitogen.core._service_calls[:]
finally:
mitogen.core._service_call_lock.release()
def validate_arg_spec(spec, args):
@ -132,63 +145,86 @@ def no_reply():
return wrapper
class Service(object):
#: Sentinel object to suppress reply generation, since returning ``None``
#: will trigger a response message containing the pickled ``None``.
NO_REPLY = object()
class Error(Exception):
"""
Raised when an error occurs configuring a service or pool.
"""
#: If ``None``, a handle is dynamically allocated, otherwise the fixed
#: integer handle to use.
handle = None
max_message_size = 0
def __init__(self, router):
self.router = router
self.recv = mitogen.core.Receiver(router, self.handle)
self.recv.service = self
self.handle = self.recv.handle
self.running = True
class Policy(object):
"""
Base security policy.
"""
def is_authorized(self, service, msg):
raise NotImplementedError()
def __repr__(self):
return '%s()' % (self.__class__.__name__,)
def on_shutdown(self):
"""
Called by Pool.shutdown() once the last worker thread has exitted.
"""
class AllowAny(Policy):
def is_authorized(self, service, msg):
return True
def dispatch(self, args, msg):
raise NotImplementedError()
def _validate_message(self, msg):
if len(msg.data) > self.max_message_size:
raise mitogen.core.CallError('Message size exceeded.')
class AllowParents(Policy):
def is_authorized(self, service, msg):
return (msg.auth_id in mitogen.parent_ids or
msg.auth_id == mitogen.context_id)
pair = msg.unpickle(throw=False)
if not (isinstance(pair, tuple) and
len(pair) == 2 and
isinstance(pair[0], basestring)):
raise mitogen.core.CallError('Invalid message format.')
method_name, kwargs = pair
method = getattr(self, method_name, None)
class Activator(object):
"""
"""
def is_permitted(self, mod_name, class_name, msg):
return mitogen.core.has_parent_authority(msg)
not_active_msg = (
'Service %r is not yet activated in this context, and the '
'caller is not privileged, therefore autoactivation is disabled.'
)
def activate(self, pool, service_name, msg):
mod_name, _, class_name = service_name.rpartition('.')
if not self.is_permitted(mod_name, class_name, msg):
raise mitogen.core.CallError(self.not_active_msg, service_name)
module = mitogen.core.import_module(mod_name)
klass = getattr(module, class_name)
service = klass(pool.router)
pool.add(service)
return service
class Invoker(object):
def __init__(self, service):
self.service = service
def __repr__(self):
return '%s(%s)' % (type(self).__name__, self.service)
unauthorized_msg = (
'Caller is not authorized to invoke %r of service %r'
)
def _validate(self, method_name, kwargs, msg):
method = getattr(self.service, method_name, None)
if method is None:
raise mitogen.core.CallError('No such method exists.')
raise mitogen.core.CallError('No such method: %r', method_name)
policies = getattr(method, 'mitogen_service__policies', None)
if not policies:
raise mitogen.core.CallError('Method has no policies set.')
if not all(p.is_authorized(self, msg) for p in policies):
raise mitogen.core.CallError('Unauthorized')
if not all(p.is_authorized(self.service, msg) for p in policies):
raise mitogen.core.CallError(
self.unauthorized_msg,
method_name,
self.service.name()
)
required = getattr(method, 'mitogen_service__arg_spec', {})
validate_arg_spec(required, kwargs)
return method_name, kwargs
def _on_receive_message(self, msg):
method_name, kwargs = self._validate_message(msg)
method = getattr(self, method_name)
def _invoke(self, method_name, kwargs, msg):
method = getattr(self.service, method_name)
if 'msg' in method.func_code.co_varnames:
kwargs['msg'] = msg # TODO: hack
@ -197,7 +233,7 @@ class Service(object):
try:
ret = method(**kwargs)
if no_reply:
return self.NO_REPLY
return Service.NO_REPLY
return ret
except Exception:
if no_reply:
@ -206,22 +242,14 @@ class Service(object):
else:
raise
def on_receive_message(self, msg):
try:
response = self._on_receive_message(msg)
if response is not self.NO_REPLY:
msg.reply(response)
except mitogen.core.CallError:
e = sys.exc_info()[1]
LOG.warning('%r: call error: %s: %s', self, msg, e)
msg.reply(e)
except Exception:
LOG.exception('While invoking %r.dispatch()', self)
e = sys.exc_info()[1]
msg.reply(mitogen.core.CallError(e))
def invoke(self, method_name, kwargs, msg):
self._validate(method_name, kwargs, msg)
response = self._invoke(method_name, kwargs, msg)
if response is not Service.NO_REPLY:
msg.reply(response)
class DeduplicatingService(Service):
class DeduplicatingInvoker(Invoker):
"""
A service that deduplicates and caches expensive responses. Requests are
deduplicated according to a customizable key, and the single expensive
@ -233,8 +261,8 @@ class DeduplicatingService(Service):
Only one pool thread is blocked during generation of the response,
regardless of the number of requestors.
"""
def __init__(self, router):
super(DeduplicatingService, self).__init__(router)
def __init__(self, service):
super(DeduplicatingInvoker, self).__init__(service)
self._responses = {}
self._waiters = {}
self._lock = threading.Lock()
@ -261,10 +289,8 @@ class DeduplicatingService(Service):
finally:
self._lock.release()
def _on_receive_message(self, msg):
method_name, kwargs = self._validate_message(msg)
def _invoke(self, method_name, kwargs, msg):
key = self.key_from_request(method_name, kwargs)
self._lock.acquire()
try:
if key in self._responses:
@ -272,7 +298,7 @@ class DeduplicatingService(Service):
if key in self._waiters:
self._waiters[key].append(msg)
return self.NO_REPLY
return Service.NO_REPLY
self._waiters[key] = [msg]
finally:
@ -289,7 +315,37 @@ class DeduplicatingService(Service):
e = sys.exc_info()[1]
self._produce_response(key, mitogen.core.CallError(e))
return self.NO_REPLY
return Service.NO_REPLY
class Service(object):
#: Sentinel object to suppress reply generation, since returning ``None``
#: will trigger a response message containing the pickled ``None``.
NO_REPLY = object()
invoker_class = Invoker
@classmethod
def name(cls):
return '%s.%s' % (cls.__module__, cls.__name__)
def __init__(self, router):
self.router = router
self.select = mitogen.select.Select()
def __repr__(self):
return '%s()' % (self.__class__.__name__,)
def on_message(self, recv, msg):
"""
Called when a message arrives on any of :attr:`select`'s registered
receivers.
"""
def on_shutdown(self):
"""
Called by Pool.shutdown() once the last worker thread has exitted.
"""
class Pool(object):
@ -299,7 +355,7 @@ class Pool(object):
Internally this is implemented by subscribing every :py:class:`Service`'s
:py:class:`mitogen.core.Receiver` using a single
:py:class:`mitogen.master.Select`, then arranging for every thread to
:py:class:`mitogen.select.Select`, then arranging for every thread to
consume messages delivered to that select.
In this way the threads are fairly shared by all available services, and no
@ -308,21 +364,33 @@ class Pool(object):
There is no penalty for exposing large numbers of services; the list of
exposed services could even be generated dynamically in response to your
program's configuration or its input data.
:param mitogen.core.Router router:
Router to listen for ``CALL_SERVICE`` messages on.
:param list services:
Initial list of services to register.
"""
activator_class = Activator
def __init__(self, router, services, size=1):
assert size > 0
self.router = router
self.services = list(services)
self.size = size
self._select = mitogen.select.Select(
receivers=[
service.recv
for service in self.services
],
oneshot=False,
self._activator = self.activator_class()
self._receiver = mitogen.core.Receiver(
router=router,
handle=mitogen.core.CALL_SERVICE,
)
self._select = mitogen.select.Select(oneshot=False)
self._select.add(self._receiver)
#: Serialize service construction.
self._lock = threading.Lock()
self._func_by_recv = {self._receiver: self._on_service_call}
self._invoker_by_name = {}
for service in services:
self.add(service)
self._threads = []
for x in xrange(size):
for x in range(size):
thread = threading.Thread(
name='mitogen.service.Pool.%x.worker-%d' % (id(self), x,),
target=self._worker_main,
@ -330,6 +398,19 @@ class Pool(object):
thread.start()
self._threads.append(thread)
@property
def size(self):
return len(self._threads)
def add(self, service):
name = service.name()
if name in self._invoker_by_name:
raise Error('service named %r already registered' % (name,))
assert service.select not in self._func_by_recv
invoker = service.invoker_class(service)
self._invoker_by_name[name] = invoker
self._func_by_recv[service.select] = service.on_message
closed = False
def stop(self):
@ -337,8 +418,45 @@ class Pool(object):
self._select.close()
for th in self._threads:
th.join()
for service in self.services:
service.on_shutdown()
for invoker in self._invoker_by_name.itervalues():
invoker.service.on_shutdown()
def get_invoker(self, name, msg):
self._lock.acquire()
try:
invoker = self._invoker_by_name.get(name)
if not invoker:
service = self._activator.activate(self, name, msg)
invoker = service.invoker_class(service)
self._invoker_by_name[name] = invoker
finally:
self._lock.release()
return invoker
def _validate(self, msg):
tup = msg.unpickle(throw=False)
if not (isinstance(tup, tuple) and
len(tup) == 3 and
isinstance(tup[0], basestring) and
isinstance(tup[1], basestring) and
isinstance(tup[2], dict)):
raise mitogen.core.CallError('Invalid message format.')
def _on_service_call(self, recv, msg):
self._validate(msg)
service_name, method_name, kwargs = msg.unpickle()
try:
invoker = self.get_invoker(service_name, msg)
return invoker.invoke(method_name, kwargs, msg)
except mitogen.core.CallError:
e = sys.exc_info()[1]
LOG.warning('%r: call error: %s: %s', self, msg, e)
msg.reply(e)
except Exception:
LOG.exception('While invoking %r._invoke()', self)
e = sys.exc_info()[1]
msg.reply(mitogen.core.CallError(e))
def _worker_run(self):
while not self.closed:
@ -349,11 +467,11 @@ class Pool(object):
LOG.info('%r: channel or latch closed, exitting: %s', self, e)
return
service = msg.receiver.service
func = self._func_by_recv[msg.receiver]
try:
service.on_receive_message(msg)
func(msg.receiver, msg)
except Exception:
LOG.exception('While handling %r using %r', msg, service)
LOG.exception('While handling %r using %r', msg, func)
def _worker_main(self):
try:
@ -367,19 +485,6 @@ class Pool(object):
th = threading.currentThread()
return 'mitogen.service.Pool(%#x, size=%d, th=%r)' % (
id(self),
self.size,
len(self._threads),
th.name,
)
def call_async(context, handle, method, kwargs=None):
LOG.debug('service.call_async(%r, %r, %r, %r)',
context, handle, method, kwargs)
pair = (method, kwargs or {})
msg = mitogen.core.Message.pickled(pair, handle=handle)
return context.send_async(msg)
def call(context, handle, method, kwargs):
recv = call_async(context, handle, method, kwargs)
return recv.get().unpickle()

@ -25,10 +25,8 @@ class ActionModule(ActionBase):
self._connection._connect()
return {
'changed': True,
'result': mitogen.service.call(
context=self._connection.parent,
handle=ansible_mitogen.services.ContextService.handle,
method='shutdown_all',
kwargs={}
'result': self._connection.parent.call_service(
service_name='ansible_mitogen.services.ContextService',
method_name='shutdown_all',
)
}

@ -0,0 +1,93 @@
import unittest2
import mitogen.core
import mitogen.service
import testlib
class MyService(mitogen.service.Service):
def __init__(self, router):
super(MyService, self).__init__(router)
self._counter = 0
@mitogen.service.expose(policy=mitogen.service.AllowParents())
def get_id(self):
self._counter += 1
return self._counter, id(self)
@mitogen.service.expose(policy=mitogen.service.AllowParents())
def privileged_op(self):
return 'privileged!'
@mitogen.service.expose(policy=mitogen.service.AllowAny())
def unprivileged_op(self):
return 'unprivileged!'
class MyService2(MyService):
"""
A uniquely named service that lets us test framework activation and class
activation separately.
"""
def call_service_in(context, service_name, method_name):
return context.call_service(service_name, method_name)
class ActivationTest(testlib.RouterMixin, testlib.TestCase):
def test_parent_can_activate(self):
l1 = self.router.fork()
counter, id_ = l1.call_service(MyService, 'get_id')
self.assertEquals(1, counter)
self.assertTrue(isinstance(id_, int))
def test_sibling_cannot_activate_framework(self):
l1 = self.router.fork()
l2 = self.router.fork()
exc = self.assertRaises(mitogen.core.CallError,
lambda: l2.call(call_service_in, l1, MyService2.name(), 'get_id'))
self.assertTrue(mitogen.core.Router.refused_msg in exc.args[0])
def test_sibling_cannot_activate_service(self):
l1 = self.router.fork()
l2 = self.router.fork()
l1.call_service(MyService, 'get_id') # force framework activation
exc = self.assertRaises(mitogen.core.CallError,
lambda: l2.call(call_service_in, l1, MyService2.name(), 'get_id'))
msg = mitogen.service.Activator.not_active_msg % (MyService2.name(),)
self.assertTrue(msg in exc.args[0])
def test_activates_only_once(self):
l1 = self.router.fork()
counter, id_ = l1.call_service(MyService, 'get_id')
counter2, id_2 = l1.call_service(MyService, 'get_id')
self.assertEquals(1, counter)
self.assertEquals(2, counter2)
self.assertEquals(id_, id_2)
class PermissionTest(testlib.RouterMixin, testlib.TestCase):
def test_sibling_unprivileged_ok(self):
l1 = self.router.fork()
l1.call_service(MyService, 'get_id')
l2 = self.router.fork()
self.assertEquals('unprivileged!',
l2.call(call_service_in, l1, MyService.name(), 'unprivileged_op'))
def test_sibling_privileged_bad(self):
l1 = self.router.fork()
l1.call_service(MyService, 'get_id')
l2 = self.router.fork()
exc = self.assertRaises(mitogen.core.CallError, lambda:
l2.call(call_service_in, l1, MyService.name(), 'privileged_op'))
msg = mitogen.service.Invoker.unauthorized_msg % (
'privileged_op',
MyService.name(),
)
self.assertTrue(msg in exc.args[0])
if __name__ == '__main__':
unittest2.main()
Loading…
Cancel
Save