issue #186: add PushFileService

This is like FileService but blocks until the file is pushed by a parent
context, with deduplicating behaviour at each level in the hierarchy. It
does not stream large files, so it is only suitable for small files like
Python modules.

Additionally add SerializedInvoker for use with PushFileService, which
ensures all method calls to a single service occur in sequence.
pull/262/head
David Wilson 8 years ago
parent 2e8c027322
commit a3b747af1b

@ -17,8 +17,9 @@ Overview
Service Service
* User-supplied class with explicitly exposed methods. * User-supplied class with explicitly exposed methods.
* Identified in calls by its canonical name (e.g. mypkg.mymod.MyClass).
* May be auto-imported/constructed in a child from a parent simply by calling it * May be auto-imported/constructed in a child from a parent simply by calling it
* Identified in calls by its canonical name (e.g. mypkg.mymod.MyClass) by
default, but may use any naming scheme the configured activator understands.
* Children receive refusals if the class is not already activated by a aprent * Children receive refusals if the class is not already activated by a aprent
* Has an associated Select instance which may be dynamically loaded with * Has an associated Select instance which may be dynamically loaded with
receivers over time, on_message_received() invoked if any receiver becomes receivers over time, on_message_received() invoked if any receiver becomes
@ -28,9 +29,12 @@ Invoker
* Abstracts mechanism for calling a service method and verifying permissions. * Abstracts mechanism for calling a service method and verifying permissions.
* Built-in 'service.Invoker': concurrent execution of all methods on the thread pool. * Built-in 'service.Invoker': concurrent execution of all methods on the thread pool.
* Built-in 'service.SerializedInvoker': serialization of all calls on a single
thread borrowed from the pool while any request is pending.
* Built-in 'service.DeduplicatingInvoker': requests are aggregated by distinct * Built-in 'service.DeduplicatingInvoker': requests are aggregated by distinct
(method, kwargs) key, only one such method executes, return value is cached (method, kwargs) key, only one such method ever executes, return value is
and broadcast to all requesters. cached and broadcast to all request waiters. Waiters do not block additional
pool threads.
Activator Activator

@ -195,12 +195,12 @@ class Activator(object):
def activate(self, pool, service_name, msg): def activate(self, pool, service_name, msg):
mod_name, _, class_name = service_name.rpartition('.') mod_name, _, class_name = service_name.rpartition('.')
if not self.is_permitted(mod_name, class_name, msg): if msg and not self.is_permitted(mod_name, class_name, msg):
raise mitogen.core.CallError(self.not_active_msg, service_name) raise mitogen.core.CallError(self.not_active_msg, service_name)
module = mitogen.core.import_module(mod_name) module = mitogen.core.import_module(mod_name)
klass = getattr(module, class_name) klass = getattr(module, class_name)
service = klass(pool.router) service = klass(router=pool.router)
pool.add(service) pool.add(service)
return service return service
@ -261,6 +261,50 @@ class Invoker(object):
msg.reply(response) msg.reply(response)
class SerializedInvoker(Invoker):
def __init__(self, **kwargs):
super(SerializedInvoker, self).__init__(**kwargs)
self._lock = threading.Lock()
self._queue = []
self._running = False
def _pop(self):
self._lock.acquire()
try:
try:
return self._queue.pop(0)
except IndexError:
self._running = False
finally:
self._lock.release()
def _run(self):
while True:
tup = self._pop()
if tup is None:
return
method_name, kwargs, msg = tup
try:
super(SerializedInvoker, self).invoke(method_name, kwargs, msg)
except Exception:
LOG.exception('%r: while invoking %r of %r',
self, method_name, self.service)
msg.reply(mitogen.core.Message.dead())
def invoke(self, method_name, kwargs, msg):
self._lock.acquire()
try:
self._queue.append((method_name, kwargs, msg))
first = not self._running
self._running = True
finally:
self._lock.release()
if first:
self._run()
return Service.NO_REPLY
class DeduplicatingInvoker(Invoker): class DeduplicatingInvoker(Invoker):
""" """
A service that deduplicates and caches expensive responses. Requests are A service that deduplicates and caches expensive responses. Requests are
@ -419,7 +463,7 @@ class Pool(object):
if name in self._invoker_by_name: if name in self._invoker_by_name:
raise Error('service named %r already registered' % (name,)) raise Error('service named %r already registered' % (name,))
assert service.select not in self._func_by_recv assert service.select not in self._func_by_recv
invoker = service.invoker_class(service) invoker = service.invoker_class(service=service)
self._invoker_by_name[name] = invoker self._invoker_by_name[name] = invoker
self._func_by_recv[service.select] = service.on_message self._func_by_recv[service.select] = service.on_message
@ -439,13 +483,17 @@ class Pool(object):
invoker = self._invoker_by_name.get(name) invoker = self._invoker_by_name.get(name)
if not invoker: if not invoker:
service = self._activator.activate(self, name, msg) service = self._activator.activate(self, name, msg)
invoker = service.invoker_class(service) invoker = service.invoker_class(service=service)
self._invoker_by_name[name] = invoker self._invoker_by_name[name] = invoker
finally: finally:
self._lock.release() self._lock.release()
return invoker return invoker
def get_service(self, name):
invoker = self.get_invoker(name, None)
return invoker.service
def _validate(self, msg): def _validate(self, msg):
tup = msg.unpickle(throw=False) tup = msg.unpickle(throw=False)
if not (isinstance(tup, tuple) and if not (isinstance(tup, tuple) and
@ -466,7 +514,8 @@ class Pool(object):
LOG.warning('%r: call error: %s: %s', self, msg, e) LOG.warning('%r: call error: %s: %s', self, msg, e)
msg.reply(e) msg.reply(e)
except Exception: except Exception:
LOG.exception('While invoking %r._invoke()', self) LOG.exception('%r: while invoking %r of %r',
self, method_name, service_name)
e = sys.exc_info()[1] e = sys.exc_info()[1]
msg.reply(mitogen.core.CallError(e)) msg.reply(mitogen.core.CallError(e))
@ -513,6 +562,111 @@ class FileStreamState(object):
self.lock = threading.Lock() self.lock = threading.Lock()
class PushFileService(Service):
"""
Push-based file service. Files are delivered and cached in RAM, sent
recursively from parent to child. A child that requests a file via
:meth:`get` will block until it has ben delivered by a parent.
This service will eventually be merged into FileService.
"""
invoker_class = SerializedInvoker
def __init__(self, **kwargs):
super(PushFileService, self).__init__(**kwargs)
self._lock = threading.Lock()
self._cache = {}
self._waiters = {}
self._sent_by_stream = {}
def get(self, path):
self._lock.acquire()
try:
if path in self._cache:
return self._cache[path]
waiters = self._waiters.setdefault(path, [])
latch = mitogen.core.Latch()
waiters.append(lambda: latch.put(None))
finally:
self._lock.release()
latch.get()
LOG.debug('%r.get(%r) -> %r', self, path, self._cache[path])
return self._cache[path]
def _forward(self, context, path):
stream = self.router.stream_by_id(context.context_id)
child = mitogen.core.Context(self.router, stream.remote_id)
sent = self._sent_by_stream.setdefault(stream, set())
if path in sent and child.context_id != context.context_id:
child.call_service_async(
service_name=self.name(),
method_name='forward',
path=path,
context=context
).close()
else:
child.call_service_async(
service_name=self.name(),
method_name='store_and_forward',
path=path,
data=self._cache[path],
context=context
).close()
@expose(policy=AllowParents())
@arg_spec({
'context': mitogen.core.Context,
'path': basestring,
})
def propagate_to(self, context, path):
LOG.debug('%r.propagate_to(%r, %r)', self, context, path)
if path not in self._cache:
fp = open(path, 'rb')
try:
self._cache[path] = mitogen.core.Blob(fp.read())
finally:
fp.close()
self._forward(context, path)
def _store(self, path, data):
self._lock.acquire()
try:
self._cache[path] = data
return self._waiters.pop(path, [])
finally:
self._lock.release()
@expose(policy=AllowParents())
@no_reply()
@arg_spec({
'path': basestring,
'data': mitogen.core.Blob,
'context': mitogen.core.Context,
})
def store_and_forward(self, path, data, context):
LOG.debug('%r.store_and_forward(%r, %r, %r)',
self, path, data, context)
waiters = self._store(path, data)
if context.context_id != mitogen.context_id:
self._forward(path, context)
for callback in waiters:
callback()
@expose(policy=AllowParents())
@no_reply()
@arg_spec({
'path': basestring,
'context': mitogen.core.Context,
})
def forward(self, path, context):
LOG.debug('%r.forward(%r, %r)', self, path, context)
if path not in self._cache:
LOG.error('%r: %r is not in local cache', self, path)
return
self._forward(path, context)
class FileService(Service): class FileService(Service):
""" """
Streaming file server, used to serve small files and huge files alike. Streaming file server, used to serve small files and huge files alike.

Loading…
Cancel
Save