issue #532: PushFileService race.

There has always been a race in PushFileService since given a parent
asked to forward modules to two children via some intermediary:

    interm = router.local()
    c1 = router.local(via=interm)
    c2 = router.local(via=interm)

    service.propagate_to(c1, 'foo/bar.py')
    service.propagate_to(c2, 'foo/bar.py')

Two calls will be emitted to 'interm':

    PushFileService.store_and_forward(c1, 'foo/bar.py', [blob])
    PushFileService.store(c2, 'foo/bar.py')

Which will be processed in-order up to the point where service pool
threads in 'interm' are woken to process the message.

While it is guaranteed store_and_forward() will be processed first, no
guarantee existed that its assigned pool thread would wake and take
_lock first, thus it was possible for forward() to win the race, and for
a request to arrive to forward a file that had not been placed in local
cache yet.

Here we get rid of SerializedInvoker entirely, as it is partially to
blame for hiding the race: SerializedInvoker can only ensure no two
messages are processed simultaneously, it cannot ensure the messages are
processed in their intended order.

Instead, teach forward() that it may be called before
store_and_forward(), and if that is the case, to place the forward
request on to _waiters alongside any local threads blocked in get().
pull/564/head
David Wilson 5 years ago
parent 1b4eb06f72
commit d4c0250083

@ -603,8 +603,6 @@ class PushFileService(Service):
This service will eventually be merged into FileService.
"""
invoker_class = SerializedInvoker
def __init__(self, **kwargs):
super(PushFileService, self).__init__(**kwargs)
self._lock = threading.Lock()
@ -613,13 +611,16 @@ class PushFileService(Service):
self._sent_by_stream = {}
def get(self, path):
"""
Fetch a file from the cache.
"""
assert isinstance(path, mitogen.core.UnicodeType)
self._lock.acquire()
try:
if path in self._cache:
return self._cache[path]
waiters = self._waiters.setdefault(path, [])
latch = mitogen.core.Latch()
waiters = self._waiters.setdefault(path, [])
waiters.append(lambda: latch.put(None))
finally:
self._lock.release()
@ -633,14 +634,15 @@ class PushFileService(Service):
stream = self.router.stream_by_id(context.context_id)
child = mitogen.core.Context(self.router, stream.remote_id)
sent = self._sent_by_stream.setdefault(stream, set())
if path in sent and child.context_id != context.context_id:
child.call_service_async(
service_name=self.name(),
method_name='forward',
path=path,
context=context
).close()
elif path not in sent:
if path in sent:
if child.context_id != context.context_id:
child.call_service_async(
service_name=self.name(),
method_name='forward',
path=path,
context=context
).close()
else:
child.call_service_async(
service_name=self.name(),
method_name='store_and_forward',
@ -680,14 +682,6 @@ class PushFileService(Service):
fp.close()
self._forward(context, path)
def _store(self, path, data):
self._lock.acquire()
try:
self._cache[path] = data
return self._waiters.pop(path, [])
finally:
self._lock.release()
@expose(policy=AllowParents())
@no_reply()
@arg_spec({
@ -696,9 +690,16 @@ class PushFileService(Service):
'context': mitogen.core.Context,
})
def store_and_forward(self, path, data, context):
LOG.debug('%r.store_and_forward(%r, %r, %r)',
self, path, data, context)
waiters = self._store(path, data)
LOG.debug('%r.store_and_forward(%r, %r, %r) %r',
self, path, data, context,
threading.currentThread().getName())
self._lock.acquire()
try:
self._cache[path] = data
waiters = self._waiters.pop(path, [])
finally:
self._lock.release()
if context.context_id != mitogen.context_id:
self._forward(context, path)
for callback in waiters:
@ -712,10 +713,17 @@ class PushFileService(Service):
})
def forward(self, path, context):
LOG.debug('%r.forward(%r, %r)', self, path, context)
if path not in self._cache:
LOG.error('%r: %r is not in local cache', self, path)
return
self._forward(context, path)
func = lambda: self._forward(context, path)
self._lock.acquire()
try:
if path in self._cache:
func()
else:
LOG.debug('%r: %r not cached yet, queueing', self, path)
self._waiters.setdefault(path, []).append(func)
finally:
self._lock.release()
class FileService(Service):

@ -0,0 +1,56 @@
import os
import tempfile
import unittest2
import mitogen.core
import mitogen.service
import testlib
from mitogen.core import b
def prepare():
# ensure module loading delay is complete before loading PushFileService.
pass
@mitogen.core.takes_router
def wait_for_file(path, router):
pool = mitogen.service.get_or_create_pool(router=router)
service = pool.get_service(u'mitogen.service.PushFileService')
return service.get(path)
class PropagateToTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.service.PushFileService
def test_two_grandchild_one_intermediary(self):
tf = tempfile.NamedTemporaryFile()
path = mitogen.core.to_text(tf.name)
try:
tf.write(b('test'))
tf.flush()
interm = self.router.local(name='interm')
c1 = self.router.local(via=interm, name='c1')
c2 = self.router.local(via=interm)
c1.call(prepare)
c2.call(prepare)
service = self.klass(router=self.router)
service.propagate_to(context=c1, path=path)
service.propagate_to(context=c2, path=path)
s = c1.call(wait_for_file, path=path)
self.assertEquals(b('test'), s)
s = c2.call(wait_for_file, path=path)
self.assertEquals(b('test'), s)
finally:
tf.close()
if __name__ == '__main__':
unittest2.main()
Loading…
Cancel
Save