service: simplify CALL_SERVICE stub and fix race.

If PushService.store_and_forward() loses the race to arrive at a brand
new context first, and the context's main thread is already executing a
CALL_FUNCTION that is blocked on the result of PushService, deadlock
could occur in the old scheme.

Instead (for now) simply spam a thread for each incoming message, and
use the get_or_create_pool() lock to ensure things work out in the end.
This could potentially generate a huge number of threads given the wrong
app, but we'll fix that problem when it appears.
pull/262/head
David Wilson 6 years ago
parent 92ecf29559
commit 05e0b134f9

@ -75,10 +75,6 @@ IOLOG.setLevel(logging.INFO)
_v = False _v = False
_vv = False _vv = False
# Also taken by Broker, no blocking work can occur with it held.
_service_call_lock = threading.Lock()
_service_calls = []
GET_MODULE = 100 GET_MODULE = 100
CALL_FUNCTION = 101 CALL_FUNCTION = 101
FORWARD_LOG = 102 FORWARD_LOG = 102
@ -1648,27 +1644,22 @@ class ExternalContext(object):
if not self.config['profiling']: if not self.config['profiling']:
os.kill(os.getpid(), signal.SIGTERM) os.kill(os.getpid(), signal.SIGTERM)
def _service_stub_main(self, msg):
import mitogen.service
pool = mitogen.service.get_or_create_pool(router=self.router)
pool._receiver._on_receive(msg)
def _on_call_service_msg(self, msg): def _on_call_service_msg(self, msg):
""" """
Stub CALL_SERVICE handler, push message on temporary queue and invoke Stub service handler. Start a thread to import the mitogen.service
_on_stub_call() from the main thread. implementation from, and deliver the message to the newly constructed
pool. This must be done as CALL_SERVICE for e.g. PushFileService may
race with a CALL_FUNCTION blocking the main thread waiting for a result
from that service.
""" """
if msg.is_dead: if not msg.is_dead:
return th = threading.Thread(target=self._service_stub_main, args=(msg,))
_service_call_lock.acquire() th.start()
try:
_service_calls.append(msg)
finally:
_service_call_lock.release()
self.router.route(
Message.pickled(
dst_id=mitogen.context_id,
handle=CALL_FUNCTION,
obj=('mitogen.service', None, '_on_stub_call', (), {}),
router=self.router,
)
)
def _on_shutdown_msg(self, msg): def _on_shutdown_msg(self, msg):
_v and LOG.debug('_on_shutdown_msg(%r)', msg) _v and LOG.debug('_on_shutdown_msg(%r)', msg)

@ -58,26 +58,6 @@ def get_or_create_pool(size=None, router=None):
_pool_lock.release() _pool_lock.release()
@mitogen.core.takes_router
def _on_stub_call(router):
"""
Called for each message received by the core.py stub CALL_SERVICE handler.
Create the pool if it doesn't already exist, and push enqueued messages
into the pool's receiver. This may be called more than once as the stub
service handler runs in asynchronous context, while _on_stub_call() happens
on the main thread. Multiple CALL_SERVICE may end up enqueued before Pool
has a chance to install the real CALL_SERVICE handler.
"""
pool = get_or_create_pool(router=router)
mitogen.core._service_call_lock.acquire()
try:
for msg in mitogen.core._service_calls:
pool._receiver._on_receive(msg)
del mitogen.core._service_calls[:]
finally:
mitogen.core._service_call_lock.release()
def validate_arg_spec(spec, args): def validate_arg_spec(spec, args):
for name in spec: for name in spec:
try: try:
@ -250,7 +230,8 @@ class Invoker(object):
except Exception: except Exception:
if no_reply: if no_reply:
LOG.exception('While calling no-reply method %s.%s', LOG.exception('While calling no-reply method %s.%s',
type(self).__name__, method.func_name) type(self.service).__name__,
method.func_name)
else: else:
raise raise

Loading…
Cancel
Save