From d4c0250083be59176ee06c060bf395702200109c Mon Sep 17 00:00:00 2001 From: David Wilson Date: Wed, 13 Feb 2019 21:25:45 +0000 Subject: [PATCH] issue #532: PushFileService race. There has always been a race in PushFileService since given a parent asked to forward modules to two children via some intermediary: interm = router.local() c1 = router.local(via=interm) c2 = router.local(via=interm) service.propagate_to(c1, 'foo/bar.py') service.propagate_to(c2, 'foo/bar.py') Two calls will be emitted to 'interm': PushFileService.store_and_forward(c1, 'foo/bar.py', [blob]) PushFileService.store(c2, 'foo/bar.py') Which will be processed in-order up to the point where service pool threads in 'interm' are woken to process the message. While it is guaranteed store_and_forward() will be processed first, no guarantee existed that its assigned pool thread would wake and take _lock first, thus it was possible for forward() to win the race, and for a request to arrive to forward a file that had not been placed in local cache yet. Here we get rid of SerializedInvoker entirely, as it is partially to blame for hiding the race: SerializedInvoker can only ensure no two messages are processed simultaneously, it cannot ensure the messages are processed in their intended order. Instead, teach forward() that it may be called before store_and_forward(), and if that is the case, to place the forward request on to _waiters alongside any local threads blocked in get(). --- mitogen/service.py | 60 +++++++++++++++++++-------------- tests/push_file_service_test.py | 56 ++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 26 deletions(-) create mode 100644 tests/push_file_service_test.py diff --git a/mitogen/service.py b/mitogen/service.py index 0fba59c9..3254e69a 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -603,8 +603,6 @@ class PushFileService(Service): This service will eventually be merged into FileService. """ - invoker_class = SerializedInvoker - def __init__(self, **kwargs): super(PushFileService, self).__init__(**kwargs) self._lock = threading.Lock() @@ -613,13 +611,16 @@ class PushFileService(Service): self._sent_by_stream = {} def get(self, path): + """ + Fetch a file from the cache. + """ assert isinstance(path, mitogen.core.UnicodeType) self._lock.acquire() try: if path in self._cache: return self._cache[path] - waiters = self._waiters.setdefault(path, []) latch = mitogen.core.Latch() + waiters = self._waiters.setdefault(path, []) waiters.append(lambda: latch.put(None)) finally: self._lock.release() @@ -633,14 +634,15 @@ class PushFileService(Service): stream = self.router.stream_by_id(context.context_id) child = mitogen.core.Context(self.router, stream.remote_id) sent = self._sent_by_stream.setdefault(stream, set()) - if path in sent and child.context_id != context.context_id: - child.call_service_async( - service_name=self.name(), - method_name='forward', - path=path, - context=context - ).close() - elif path not in sent: + if path in sent: + if child.context_id != context.context_id: + child.call_service_async( + service_name=self.name(), + method_name='forward', + path=path, + context=context + ).close() + else: child.call_service_async( service_name=self.name(), method_name='store_and_forward', @@ -680,14 +682,6 @@ class PushFileService(Service): fp.close() self._forward(context, path) - def _store(self, path, data): - self._lock.acquire() - try: - self._cache[path] = data - return self._waiters.pop(path, []) - finally: - self._lock.release() - @expose(policy=AllowParents()) @no_reply() @arg_spec({ @@ -696,9 +690,16 @@ class PushFileService(Service): 'context': mitogen.core.Context, }) def store_and_forward(self, path, data, context): - LOG.debug('%r.store_and_forward(%r, %r, %r)', - self, path, data, context) - waiters = self._store(path, data) + LOG.debug('%r.store_and_forward(%r, %r, %r) %r', + self, path, data, context, + threading.currentThread().getName()) + self._lock.acquire() + try: + self._cache[path] = data + waiters = self._waiters.pop(path, []) + finally: + self._lock.release() + if context.context_id != mitogen.context_id: self._forward(context, path) for callback in waiters: @@ -712,10 +713,17 @@ class PushFileService(Service): }) def forward(self, path, context): LOG.debug('%r.forward(%r, %r)', self, path, context) - if path not in self._cache: - LOG.error('%r: %r is not in local cache', self, path) - return - self._forward(context, path) + func = lambda: self._forward(context, path) + + self._lock.acquire() + try: + if path in self._cache: + func() + else: + LOG.debug('%r: %r not cached yet, queueing', self, path) + self._waiters.setdefault(path, []).append(func) + finally: + self._lock.release() class FileService(Service): diff --git a/tests/push_file_service_test.py b/tests/push_file_service_test.py new file mode 100644 index 00000000..1dfff241 --- /dev/null +++ b/tests/push_file_service_test.py @@ -0,0 +1,56 @@ + +import os +import tempfile +import unittest2 + +import mitogen.core +import mitogen.service +import testlib +from mitogen.core import b + + +def prepare(): + # ensure module loading delay is complete before loading PushFileService. + pass + + +@mitogen.core.takes_router +def wait_for_file(path, router): + pool = mitogen.service.get_or_create_pool(router=router) + service = pool.get_service(u'mitogen.service.PushFileService') + return service.get(path) + + +class PropagateToTest(testlib.RouterMixin, testlib.TestCase): + klass = mitogen.service.PushFileService + + def test_two_grandchild_one_intermediary(self): + tf = tempfile.NamedTemporaryFile() + path = mitogen.core.to_text(tf.name) + + try: + tf.write(b('test')) + tf.flush() + + interm = self.router.local(name='interm') + c1 = self.router.local(via=interm, name='c1') + c2 = self.router.local(via=interm) + + c1.call(prepare) + c2.call(prepare) + + service = self.klass(router=self.router) + service.propagate_to(context=c1, path=path) + service.propagate_to(context=c2, path=path) + + s = c1.call(wait_for_file, path=path) + self.assertEquals(b('test'), s) + + s = c2.call(wait_for_file, path=path) + self.assertEquals(b('test'), s) + finally: + tf.close() + + +if __name__ == '__main__': + unittest2.main()