diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index 2b031a9f..35643d7d 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -458,13 +458,10 @@ class Connection(ansible.plugins.connection.ConnectionBase): ) ) - dct = mitogen.service.call( - context=self.parent, - handle=ansible_mitogen.services.ContextService.handle, - method='get', - kwargs=mitogen.utils.cast({ - 'stack': stack, - }) + dct = self.parent.call_service( + service_name='ansible_mitogen.services.ContextService', + method_name='get', + stack=mitogen.utils.cast(list(stack)), ) if dct['msg']: @@ -490,13 +487,10 @@ class Connection(ansible.plugins.connection.ConnectionBase): multiple times. """ if self.context: - mitogen.service.call( - context=self.parent, - handle=ansible_mitogen.services.ContextService.handle, - method='put', - kwargs={ - 'context': self.context - } + self.parent.call_service( + service_name='ansible_mitogen.services.ContextService', + method_name='put', + context=self.context ) self.context = None @@ -618,13 +612,10 @@ class Connection(ansible.plugins.connection.ConnectionBase): return self.put_data(out_path, s, mode=st.st_mode, utimes=(st.st_atime, st.st_mtime)) - mitogen.service.call( - context=self.parent, - handle=ansible_mitogen.services.FileService.handle, - method='register', - kwargs={ - 'path': mitogen.utils.cast(in_path) - } + self.parent.call_service( + service_name='ansible_mitogen.services.FileService', + method_name='register', + path=mitogen.utils.cast(in_path) ) self.call( ansible_mitogen.target.transfer_file, diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index e1f37fcd..6605686c 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -182,13 +182,10 @@ class BinaryPlanner(Planner): def _grant_file_service_access(self, invocation): invocation.connection._connect() - mitogen.service.call( - context=invocation.connection.parent, - handle=ansible_mitogen.services.FileService.handle, - method='register', - kwargs={ - 'path': invocation.module_path - } + invocation.connection.parent.call_service( + service_name='ansible_mitogen.services.FileService', + method_name='register', + path=invocation.module_path, ) def plan(self, invocation, **kwargs): @@ -301,16 +298,14 @@ class NewStylePlanner(ScriptPlanner): def get_module_utils(self, invocation): invocation.connection._connect() - return mitogen.service.call( - context=invocation.connection.parent, - handle=ansible_mitogen.services.ModuleDepService.handle, - method='scan', - kwargs={ - 'module_name': 'ansible_module_%s' % (invocation.module_name,), - 'module_path': invocation.module_path, - 'search_path': self.get_search_path(invocation), - 'builtin_path': module_common._MODULE_UTILS_PATH, - } + return invocation.connection.parent.call_service( + service_name='ansible_mitogen.services.ModuleDepService', + method_name='scan', + + module_name='ansible_module_%s' % (invocation.module_name,), + module_path=invocation.module_path, + search_path=self.get_search_path(invocation), + builtin_path=module_common._MODULE_UTILS_PATH, ) def plan(self, invocation): diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 0f5be35d..24e1f5b1 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -76,8 +76,6 @@ class ContextService(mitogen.service.Service): processes and arranging for the worker to select one according to a hash of the connection parameters (sharding). """ - handle = 500 - max_message_size = 1000 max_interpreters = int(os.getenv('MITOGEN_MAX_INTERPRETERS', '20')) def __init__(self, *args, **kwargs): @@ -420,8 +418,6 @@ class FileService(mitogen.service.Service): proceed normally, without the associated thread needing to be forcefully killed. """ - handle = 501 - max_message_size = 1000 unregistered_msg = 'Path is not registered with FileService.' context_mismatch_msg = 'sender= kwarg context must match requestee context' @@ -609,9 +605,6 @@ class ModuleDepService(mitogen.service.Service): Scan a new-style module and produce a cached mapping of module_utils names to their resolved filesystem paths. """ - max_message_size = 1000 - handle = 502 - def __init__(self, file_service, **kwargs): super(ModuleDepService, self).__init__(**kwargs) self._file_service = file_service diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 05deacf0..090730d8 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -90,26 +90,20 @@ def _get_file(context, path, out_fp): LOG.debug('_get_file(): fetching %r from %r', path, context) t0 = time.time() recv = mitogen.core.Receiver(router=context.router) - metadata = mitogen.service.call( - context=context, - handle=ansible_mitogen.services.FileService.handle, - method='fetch', - kwargs={ - 'path': path, - 'sender': recv.to_sender() - } + metadata = context.call_service( + service_name='ansible_mitogen.services.FileService', + method_name='fetch', + path=path, + sender=recv.to_sender(), ) for chunk in recv: s = chunk.unpickle() LOG.debug('_get_file(%r): received %d bytes', path, len(s)) - mitogen.service.call_async( - context=context, - handle=ansible_mitogen.services.FileService.handle, - method='acknowledge', - kwargs={ - 'size': len(s), - } + context.call_service_async( + service_name='ansible_mitogen.services.FileService', + method_name='acknowledge', + size=len(s), ).close() out_fp.write(s) diff --git a/docs/services.rst b/docs/services.rst index 0c12cee9..4c3f0ab1 100644 --- a/docs/services.rst +++ b/docs/services.rst @@ -6,14 +6,48 @@ Service Framework ================= Mitogen includes a simple framework for implementing services exposed to other -contexts, with built-in subclasses that capture some common service models. -This is a work in progress, and new functionality will be added as common usage -patterns emerge. +contexts, with some built-in subclasses to capture common designs. This is a +work in progress, and new functionality will be added as common usage patterns +emerge. Overview -------- +Service + +* User-supplied class with explicitly exposed methods. +* Identified in calls by its canonical name (e.g. mypkg.mymod.MyClass). +* May be auto-imported/constructed in a child from a parent simply by calling it +* Children receive refusals if the class is not already activated by a aprent +* Has an associated Select instance which may be dynamically loaded with + receivers over time, on_message_received() invoked if any receiver becomes + ready. + +Invoker + +* Abstracts mechanism for calling a service method and verifying permissions. +* Built-in 'service.Invoker': concurrent execution of all methods on the thread pool. +* Built-in 'service.DeduplicatingInvoker': requests are aggregated by distinct + (method, kwargs) key, only one such method executes, return value is cached + and broadcast to all requesters. + +Activator + +* Abstracts mechanism for activating a service and verifying activation + permission. +* Built-in activator looks for service by fully.qualified.ClassName using + Python import mechanism, and only permits parents to trigger activation. + +Pool + +* Manages a fixed-size thread pool, mapping of service name to Invoker, and an + aggregate Select over every activate service's Selects. +* Constructed automatically in children in response to the first + CALL_SERVICE message sent to them by a parent. +* Must be constructed manually in parent context. +* Has close() and add() methods. + Example ------- diff --git a/examples/service/server.py b/examples/service/server.py index 659e677c..2f488d20 100644 --- a/examples/service/server.py +++ b/examples/service/server.py @@ -11,9 +11,6 @@ import mitogen.unix class PingService(mitogen.service.Service): - well_known_id = 500 - max_message_size = 1000 - def dispatch(self, dct, msg): return 'Hello, world' diff --git a/mitogen/core.py b/mitogen/core.py index 7f2339a4..ccaf9ab0 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -75,6 +75,10 @@ IOLOG.setLevel(logging.INFO) _v = False _vv = False +# Also taken by Broker, no blocking work can occur with it held. +_service_call_lock = threading.Lock() +_service_calls = [] + GET_MODULE = 100 CALL_FUNCTION = 101 FORWARD_LOG = 102 @@ -85,6 +89,7 @@ SHUTDOWN = 106 LOAD_MODULE = 107 FORWARD_MODULE = 108 DETACHING = 109 +CALL_SERVICE = 110 IS_DEAD = 999 try: @@ -1000,12 +1005,6 @@ class Context(object): _v and LOG.debug('%r.on_disconnect()', self) fire(self, 'disconnect') - def send(self, msg): - """send `obj` to `handle`, and tell the broker we have output. May - be called from any thread.""" - msg.dst_id = self.context_id - self.router.route(msg) - def send_async(self, msg, persist=False): if self.router.broker._thread == threading.currentThread(): # TODO raise SystemError('Cannot making blocking call on broker thread') @@ -1018,6 +1017,25 @@ class Context(object): self.send(msg) return receiver + def call_service_async(self, service_name, method_name, **kwargs): + _v and LOG.debug('%r.call_service_async(%r, %r, %r)', + self, service_name, method_name, kwargs) + if not isinstance(service_name, basestring): + service_name = service_name.name() # Service.name() + tup = (service_name, method_name, kwargs) + msg = Message.pickled(tup, handle=CALL_SERVICE) + return self.send_async(msg) + + def send(self, msg): + """send `obj` to `handle`, and tell the broker we have output. May + be called from any thread.""" + msg.dst_id = self.context_id + self.router.route(msg) + + def call_service(self, service_name, method_name, **kwargs): + recv = self.call_service_async(service_name, method_name, **kwargs) + return recv.get().unpickle() + def send_await(self, msg, deadline=None): """Send `msg` and wait for a response with an optional timeout.""" receiver = self.send_async(msg) @@ -1626,6 +1644,28 @@ class ExternalContext(object): if not self.config['profiling']: os.kill(os.getpid(), signal.SIGTERM) + def _on_call_service_msg(self, msg): + """ + Stub CALL_SERVICE handler, push message on temporary queue and invoke + _on_stub_call() from the main thread. + """ + if msg.is_dead: + return + _service_call_lock.acquire() + try: + _service_calls.append(msg) + finally: + _service_call_lock.release() + + self.router.route( + Message.pickled( + dst_id=mitogen.context_id, + handle=CALL_FUNCTION, + obj=('mitogen.service', None, '_on_stub_call', (), {}), + router=self.router, + ) + ) + def _on_shutdown_msg(self, msg): _v and LOG.debug('_on_shutdown_msg(%r)', msg) if not msg.is_dead: @@ -1673,6 +1713,11 @@ class ExternalContext(object): handle=SHUTDOWN, policy=has_parent_authority, ) + self.router.add_handler( + fn=self._on_call_service_msg, + handle=CALL_SERVICE, + policy=has_parent_authority, + ) self.master = Context(self.router, 0, 'master') parent_id = self.config['parent_ids'][0] if parent_id == 0: diff --git a/mitogen/service.py b/mitogen/service.py index 01a6454c..6719f833 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -35,23 +35,36 @@ import mitogen.select from mitogen.core import LOG -class Policy(object): - """ - Base security policy. - """ - def is_authorized(self, service, msg): - raise NotImplementedError() +DEFAULT_POOL_SIZE = 16 +_pool = None -class AllowAny(Policy): - def is_authorized(self, service, msg): - return True +@mitogen.core.takes_router +def get_or_create_pool(size=None, router=None): + global _pool + if _pool is None: + _pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE) + return _pool -class AllowParents(Policy): - def is_authorized(self, service, msg): - return (msg.auth_id in mitogen.parent_ids or - msg.auth_id == mitogen.context_id) +@mitogen.core.takes_router +def _on_stub_call(router): + """ + Called for each message received by the core.py stub CALL_SERVICE handler. + Create the pool if it doesn't already exist, and push enqueued messages + into the pool's receiver. This may be called more than once as the stub + service handler runs in asynchronous context, while _on_stub_call() happens + on the main thread. Multiple CALL_SERVICE may end up enqueued before Pool + has a chance to install the real CALL_SERVICE handler. + """ + pool = get_or_create_pool(router=router) + mitogen.core._service_call_lock.acquire() + try: + for msg in mitogen.core._service_calls: + pool._receiver._on_receive(msg) + del mitogen.core._service_calls[:] + finally: + mitogen.core._service_call_lock.release() def validate_arg_spec(spec, args): @@ -132,63 +145,86 @@ def no_reply(): return wrapper -class Service(object): - #: Sentinel object to suppress reply generation, since returning ``None`` - #: will trigger a response message containing the pickled ``None``. - NO_REPLY = object() +class Error(Exception): + """ + Raised when an error occurs configuring a service or pool. + """ - #: If ``None``, a handle is dynamically allocated, otherwise the fixed - #: integer handle to use. - handle = None - max_message_size = 0 - def __init__(self, router): - self.router = router - self.recv = mitogen.core.Receiver(router, self.handle) - self.recv.service = self - self.handle = self.recv.handle - self.running = True +class Policy(object): + """ + Base security policy. + """ + def is_authorized(self, service, msg): + raise NotImplementedError() - def __repr__(self): - return '%s()' % (self.__class__.__name__,) - def on_shutdown(self): - """ - Called by Pool.shutdown() once the last worker thread has exitted. - """ +class AllowAny(Policy): + def is_authorized(self, service, msg): + return True - def dispatch(self, args, msg): - raise NotImplementedError() - def _validate_message(self, msg): - if len(msg.data) > self.max_message_size: - raise mitogen.core.CallError('Message size exceeded.') +class AllowParents(Policy): + def is_authorized(self, service, msg): + return (msg.auth_id in mitogen.parent_ids or + msg.auth_id == mitogen.context_id) - pair = msg.unpickle(throw=False) - if not (isinstance(pair, tuple) and - len(pair) == 2 and - isinstance(pair[0], basestring)): - raise mitogen.core.CallError('Invalid message format.') - method_name, kwargs = pair - method = getattr(self, method_name, None) +class Activator(object): + """ + """ + def is_permitted(self, mod_name, class_name, msg): + return mitogen.core.has_parent_authority(msg) + + not_active_msg = ( + 'Service %r is not yet activated in this context, and the ' + 'caller is not privileged, therefore autoactivation is disabled.' + ) + + def activate(self, pool, service_name, msg): + mod_name, _, class_name = service_name.rpartition('.') + if not self.is_permitted(mod_name, class_name, msg): + raise mitogen.core.CallError(self.not_active_msg, service_name) + + module = mitogen.core.import_module(mod_name) + klass = getattr(module, class_name) + service = klass(pool.router) + pool.add(service) + return service + + +class Invoker(object): + def __init__(self, service): + self.service = service + + def __repr__(self): + return '%s(%s)' % (type(self).__name__, self.service) + + unauthorized_msg = ( + 'Caller is not authorized to invoke %r of service %r' + ) + + def _validate(self, method_name, kwargs, msg): + method = getattr(self.service, method_name, None) if method is None: - raise mitogen.core.CallError('No such method exists.') + raise mitogen.core.CallError('No such method: %r', method_name) policies = getattr(method, 'mitogen_service__policies', None) if not policies: raise mitogen.core.CallError('Method has no policies set.') - if not all(p.is_authorized(self, msg) for p in policies): - raise mitogen.core.CallError('Unauthorized') + if not all(p.is_authorized(self.service, msg) for p in policies): + raise mitogen.core.CallError( + self.unauthorized_msg, + method_name, + self.service.name() + ) required = getattr(method, 'mitogen_service__arg_spec', {}) validate_arg_spec(required, kwargs) - return method_name, kwargs - def _on_receive_message(self, msg): - method_name, kwargs = self._validate_message(msg) - method = getattr(self, method_name) + def _invoke(self, method_name, kwargs, msg): + method = getattr(self.service, method_name) if 'msg' in method.func_code.co_varnames: kwargs['msg'] = msg # TODO: hack @@ -197,7 +233,7 @@ class Service(object): try: ret = method(**kwargs) if no_reply: - return self.NO_REPLY + return Service.NO_REPLY return ret except Exception: if no_reply: @@ -206,22 +242,14 @@ class Service(object): else: raise - def on_receive_message(self, msg): - try: - response = self._on_receive_message(msg) - if response is not self.NO_REPLY: - msg.reply(response) - except mitogen.core.CallError: - e = sys.exc_info()[1] - LOG.warning('%r: call error: %s: %s', self, msg, e) - msg.reply(e) - except Exception: - LOG.exception('While invoking %r.dispatch()', self) - e = sys.exc_info()[1] - msg.reply(mitogen.core.CallError(e)) + def invoke(self, method_name, kwargs, msg): + self._validate(method_name, kwargs, msg) + response = self._invoke(method_name, kwargs, msg) + if response is not Service.NO_REPLY: + msg.reply(response) -class DeduplicatingService(Service): +class DeduplicatingInvoker(Invoker): """ A service that deduplicates and caches expensive responses. Requests are deduplicated according to a customizable key, and the single expensive @@ -233,8 +261,8 @@ class DeduplicatingService(Service): Only one pool thread is blocked during generation of the response, regardless of the number of requestors. """ - def __init__(self, router): - super(DeduplicatingService, self).__init__(router) + def __init__(self, service): + super(DeduplicatingInvoker, self).__init__(service) self._responses = {} self._waiters = {} self._lock = threading.Lock() @@ -261,10 +289,8 @@ class DeduplicatingService(Service): finally: self._lock.release() - def _on_receive_message(self, msg): - method_name, kwargs = self._validate_message(msg) + def _invoke(self, method_name, kwargs, msg): key = self.key_from_request(method_name, kwargs) - self._lock.acquire() try: if key in self._responses: @@ -272,7 +298,7 @@ class DeduplicatingService(Service): if key in self._waiters: self._waiters[key].append(msg) - return self.NO_REPLY + return Service.NO_REPLY self._waiters[key] = [msg] finally: @@ -289,7 +315,37 @@ class DeduplicatingService(Service): e = sys.exc_info()[1] self._produce_response(key, mitogen.core.CallError(e)) - return self.NO_REPLY + return Service.NO_REPLY + + +class Service(object): + #: Sentinel object to suppress reply generation, since returning ``None`` + #: will trigger a response message containing the pickled ``None``. + NO_REPLY = object() + + invoker_class = Invoker + + @classmethod + def name(cls): + return '%s.%s' % (cls.__module__, cls.__name__) + + def __init__(self, router): + self.router = router + self.select = mitogen.select.Select() + + def __repr__(self): + return '%s()' % (self.__class__.__name__,) + + def on_message(self, recv, msg): + """ + Called when a message arrives on any of :attr:`select`'s registered + receivers. + """ + + def on_shutdown(self): + """ + Called by Pool.shutdown() once the last worker thread has exitted. + """ class Pool(object): @@ -299,7 +355,7 @@ class Pool(object): Internally this is implemented by subscribing every :py:class:`Service`'s :py:class:`mitogen.core.Receiver` using a single - :py:class:`mitogen.master.Select`, then arranging for every thread to + :py:class:`mitogen.select.Select`, then arranging for every thread to consume messages delivered to that select. In this way the threads are fairly shared by all available services, and no @@ -308,21 +364,33 @@ class Pool(object): There is no penalty for exposing large numbers of services; the list of exposed services could even be generated dynamically in response to your program's configuration or its input data. + + :param mitogen.core.Router router: + Router to listen for ``CALL_SERVICE`` messages on. + :param list services: + Initial list of services to register. """ + activator_class = Activator + def __init__(self, router, services, size=1): - assert size > 0 self.router = router - self.services = list(services) - self.size = size - self._select = mitogen.select.Select( - receivers=[ - service.recv - for service in self.services - ], - oneshot=False, + self._activator = self.activator_class() + self._receiver = mitogen.core.Receiver( + router=router, + handle=mitogen.core.CALL_SERVICE, ) + + self._select = mitogen.select.Select(oneshot=False) + self._select.add(self._receiver) + #: Serialize service construction. + self._lock = threading.Lock() + self._func_by_recv = {self._receiver: self._on_service_call} + self._invoker_by_name = {} + + for service in services: + self.add(service) self._threads = [] - for x in xrange(size): + for x in range(size): thread = threading.Thread( name='mitogen.service.Pool.%x.worker-%d' % (id(self), x,), target=self._worker_main, @@ -330,6 +398,19 @@ class Pool(object): thread.start() self._threads.append(thread) + @property + def size(self): + return len(self._threads) + + def add(self, service): + name = service.name() + if name in self._invoker_by_name: + raise Error('service named %r already registered' % (name,)) + assert service.select not in self._func_by_recv + invoker = service.invoker_class(service) + self._invoker_by_name[name] = invoker + self._func_by_recv[service.select] = service.on_message + closed = False def stop(self): @@ -337,8 +418,45 @@ class Pool(object): self._select.close() for th in self._threads: th.join() - for service in self.services: - service.on_shutdown() + for invoker in self._invoker_by_name.itervalues(): + invoker.service.on_shutdown() + + def get_invoker(self, name, msg): + self._lock.acquire() + try: + invoker = self._invoker_by_name.get(name) + if not invoker: + service = self._activator.activate(self, name, msg) + invoker = service.invoker_class(service) + self._invoker_by_name[name] = invoker + finally: + self._lock.release() + + return invoker + + def _validate(self, msg): + tup = msg.unpickle(throw=False) + if not (isinstance(tup, tuple) and + len(tup) == 3 and + isinstance(tup[0], basestring) and + isinstance(tup[1], basestring) and + isinstance(tup[2], dict)): + raise mitogen.core.CallError('Invalid message format.') + + def _on_service_call(self, recv, msg): + self._validate(msg) + service_name, method_name, kwargs = msg.unpickle() + try: + invoker = self.get_invoker(service_name, msg) + return invoker.invoke(method_name, kwargs, msg) + except mitogen.core.CallError: + e = sys.exc_info()[1] + LOG.warning('%r: call error: %s: %s', self, msg, e) + msg.reply(e) + except Exception: + LOG.exception('While invoking %r._invoke()', self) + e = sys.exc_info()[1] + msg.reply(mitogen.core.CallError(e)) def _worker_run(self): while not self.closed: @@ -349,11 +467,11 @@ class Pool(object): LOG.info('%r: channel or latch closed, exitting: %s', self, e) return - service = msg.receiver.service + func = self._func_by_recv[msg.receiver] try: - service.on_receive_message(msg) + func(msg.receiver, msg) except Exception: - LOG.exception('While handling %r using %r', msg, service) + LOG.exception('While handling %r using %r', msg, func) def _worker_main(self): try: @@ -367,19 +485,6 @@ class Pool(object): th = threading.currentThread() return 'mitogen.service.Pool(%#x, size=%d, th=%r)' % ( id(self), - self.size, + len(self._threads), th.name, ) - - -def call_async(context, handle, method, kwargs=None): - LOG.debug('service.call_async(%r, %r, %r, %r)', - context, handle, method, kwargs) - pair = (method, kwargs or {}) - msg = mitogen.core.Message.pickled(pair, handle=handle) - return context.send_async(msg) - - -def call(context, handle, method, kwargs): - recv = call_async(context, handle, method, kwargs) - return recv.get().unpickle() diff --git a/tests/ansible/lib/action/mitogen_shutdown_all.py b/tests/ansible/lib/action/mitogen_shutdown_all.py index 9af21e11..6ebdbf5c 100644 --- a/tests/ansible/lib/action/mitogen_shutdown_all.py +++ b/tests/ansible/lib/action/mitogen_shutdown_all.py @@ -25,10 +25,8 @@ class ActionModule(ActionBase): self._connection._connect() return { 'changed': True, - 'result': mitogen.service.call( - context=self._connection.parent, - handle=ansible_mitogen.services.ContextService.handle, - method='shutdown_all', - kwargs={} + 'result': self._connection.parent.call_service( + service_name='ansible_mitogen.services.ContextService', + method_name='shutdown_all', ) } diff --git a/tests/service_test.py b/tests/service_test.py new file mode 100644 index 00000000..8cb18258 --- /dev/null +++ b/tests/service_test.py @@ -0,0 +1,93 @@ +import unittest2 + +import mitogen.core +import mitogen.service +import testlib + + +class MyService(mitogen.service.Service): + def __init__(self, router): + super(MyService, self).__init__(router) + self._counter = 0 + + @mitogen.service.expose(policy=mitogen.service.AllowParents()) + def get_id(self): + self._counter += 1 + return self._counter, id(self) + + @mitogen.service.expose(policy=mitogen.service.AllowParents()) + def privileged_op(self): + return 'privileged!' + + @mitogen.service.expose(policy=mitogen.service.AllowAny()) + def unprivileged_op(self): + return 'unprivileged!' + + + +class MyService2(MyService): + """ + A uniquely named service that lets us test framework activation and class + activation separately. + """ + + +def call_service_in(context, service_name, method_name): + return context.call_service(service_name, method_name) + + +class ActivationTest(testlib.RouterMixin, testlib.TestCase): + def test_parent_can_activate(self): + l1 = self.router.fork() + counter, id_ = l1.call_service(MyService, 'get_id') + self.assertEquals(1, counter) + self.assertTrue(isinstance(id_, int)) + + def test_sibling_cannot_activate_framework(self): + l1 = self.router.fork() + l2 = self.router.fork() + exc = self.assertRaises(mitogen.core.CallError, + lambda: l2.call(call_service_in, l1, MyService2.name(), 'get_id')) + self.assertTrue(mitogen.core.Router.refused_msg in exc.args[0]) + + def test_sibling_cannot_activate_service(self): + l1 = self.router.fork() + l2 = self.router.fork() + l1.call_service(MyService, 'get_id') # force framework activation + exc = self.assertRaises(mitogen.core.CallError, + lambda: l2.call(call_service_in, l1, MyService2.name(), 'get_id')) + msg = mitogen.service.Activator.not_active_msg % (MyService2.name(),) + self.assertTrue(msg in exc.args[0]) + + def test_activates_only_once(self): + l1 = self.router.fork() + counter, id_ = l1.call_service(MyService, 'get_id') + counter2, id_2 = l1.call_service(MyService, 'get_id') + self.assertEquals(1, counter) + self.assertEquals(2, counter2) + self.assertEquals(id_, id_2) + + +class PermissionTest(testlib.RouterMixin, testlib.TestCase): + def test_sibling_unprivileged_ok(self): + l1 = self.router.fork() + l1.call_service(MyService, 'get_id') + l2 = self.router.fork() + self.assertEquals('unprivileged!', + l2.call(call_service_in, l1, MyService.name(), 'unprivileged_op')) + + def test_sibling_privileged_bad(self): + l1 = self.router.fork() + l1.call_service(MyService, 'get_id') + l2 = self.router.fork() + exc = self.assertRaises(mitogen.core.CallError, lambda: + l2.call(call_service_in, l1, MyService.name(), 'privileged_op')) + msg = mitogen.service.Invoker.unauthorized_msg % ( + 'privileged_op', + MyService.name(), + ) + self.assertTrue(msg in exc.args[0]) + + +if __name__ == '__main__': + unittest2.main()