diff --git a/mitogen/service.py b/mitogen/service.py index 0fba59c9..3254e69a 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -603,8 +603,6 @@ class PushFileService(Service): This service will eventually be merged into FileService. """ - invoker_class = SerializedInvoker - def __init__(self, **kwargs): super(PushFileService, self).__init__(**kwargs) self._lock = threading.Lock() @@ -613,13 +611,16 @@ class PushFileService(Service): self._sent_by_stream = {} def get(self, path): + """ + Fetch a file from the cache. + """ assert isinstance(path, mitogen.core.UnicodeType) self._lock.acquire() try: if path in self._cache: return self._cache[path] - waiters = self._waiters.setdefault(path, []) latch = mitogen.core.Latch() + waiters = self._waiters.setdefault(path, []) waiters.append(lambda: latch.put(None)) finally: self._lock.release() @@ -633,14 +634,15 @@ class PushFileService(Service): stream = self.router.stream_by_id(context.context_id) child = mitogen.core.Context(self.router, stream.remote_id) sent = self._sent_by_stream.setdefault(stream, set()) - if path in sent and child.context_id != context.context_id: - child.call_service_async( - service_name=self.name(), - method_name='forward', - path=path, - context=context - ).close() - elif path not in sent: + if path in sent: + if child.context_id != context.context_id: + child.call_service_async( + service_name=self.name(), + method_name='forward', + path=path, + context=context + ).close() + else: child.call_service_async( service_name=self.name(), method_name='store_and_forward', @@ -680,14 +682,6 @@ class PushFileService(Service): fp.close() self._forward(context, path) - def _store(self, path, data): - self._lock.acquire() - try: - self._cache[path] = data - return self._waiters.pop(path, []) - finally: - self._lock.release() - @expose(policy=AllowParents()) @no_reply() @arg_spec({ @@ -696,9 +690,16 @@ class PushFileService(Service): 'context': mitogen.core.Context, }) def store_and_forward(self, path, data, context): - LOG.debug('%r.store_and_forward(%r, %r, %r)', - self, path, data, context) - waiters = self._store(path, data) + LOG.debug('%r.store_and_forward(%r, %r, %r) %r', + self, path, data, context, + threading.currentThread().getName()) + self._lock.acquire() + try: + self._cache[path] = data + waiters = self._waiters.pop(path, []) + finally: + self._lock.release() + if context.context_id != mitogen.context_id: self._forward(context, path) for callback in waiters: @@ -712,10 +713,17 @@ class PushFileService(Service): }) def forward(self, path, context): LOG.debug('%r.forward(%r, %r)', self, path, context) - if path not in self._cache: - LOG.error('%r: %r is not in local cache', self, path) - return - self._forward(context, path) + func = lambda: self._forward(context, path) + + self._lock.acquire() + try: + if path in self._cache: + func() + else: + LOG.debug('%r: %r not cached yet, queueing', self, path) + self._waiters.setdefault(path, []).append(func) + finally: + self._lock.release() class FileService(Service): diff --git a/tests/push_file_service_test.py b/tests/push_file_service_test.py new file mode 100644 index 00000000..1dfff241 --- /dev/null +++ b/tests/push_file_service_test.py @@ -0,0 +1,56 @@ + +import os +import tempfile +import unittest2 + +import mitogen.core +import mitogen.service +import testlib +from mitogen.core import b + + +def prepare(): + # ensure module loading delay is complete before loading PushFileService. + pass + + +@mitogen.core.takes_router +def wait_for_file(path, router): + pool = mitogen.service.get_or_create_pool(router=router) + service = pool.get_service(u'mitogen.service.PushFileService') + return service.get(path) + + +class PropagateToTest(testlib.RouterMixin, testlib.TestCase): + klass = mitogen.service.PushFileService + + def test_two_grandchild_one_intermediary(self): + tf = tempfile.NamedTemporaryFile() + path = mitogen.core.to_text(tf.name) + + try: + tf.write(b('test')) + tf.flush() + + interm = self.router.local(name='interm') + c1 = self.router.local(via=interm, name='c1') + c2 = self.router.local(via=interm) + + c1.call(prepare) + c2.call(prepare) + + service = self.klass(router=self.router) + service.propagate_to(context=c1, path=path) + service.propagate_to(context=c2, path=path) + + s = c1.call(wait_for_file, path=path) + self.assertEquals(b('test'), s) + + s = c2.call(wait_for_file, path=path) + self.assertEquals(b('test'), s) + finally: + tf.close() + + +if __name__ == '__main__': + unittest2.main()