issue #547: core/service: race/deadlock-free service pool init

The previous method of spinning up a transient thread to import the
service pool in a child context could deadlock with use of the importer
on the main thread. Therefore wake the main thread to handle import for
us, and use a regular Receiver to buffer messages to the stub, which is
inherited rather than replaced by the real service pool.
commit 769a8b2015 (pull/612/head)
parent 50b2d590fd
Author: David Wilson, 5 years ago
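
In outline: the broker thread never imports anything; on the first CALL_SERVICE
message it buffers the request and enqueues a sentinel that wakes the main
thread, which performs the import and hands the already-populated receiver to
the real pool. Below is a condensed sketch of that handoff, with queue.Queue
standing in for mitogen's Receiver, ServicePool standing in for
mitogen.service.Pool, and every name other than STUB_CALL_SERVICE being
illustrative rather than mitogen API:

    import queue
    import threading

    STUB_CALL_SERVICE = 111           # sentinel handle, as in the diff below

    call_function_q = queue.Queue()   # stands in for the CALL_FUNCTION Receiver
    service_q = queue.Queue()         # stands in for the CALL_SERVICE Receiver
    _pool = None

    class ServicePool(object):
        # Stands in for mitogen.service.Pool: it inherits the already
        # populated service queue rather than registering a fresh one.
        def __init__(self, recv):
            self.recv = recv
            threading.Thread(target=self._worker, daemon=True).start()

        def _worker(self):
            while True:
                print('servicing %r' % (self.recv.get(),))

    def on_service_message(msg):
        # Broker thread: no importing here. Buffer the request, then wake
        # the main thread with a sentinel on its CALL_FUNCTION queue.
        service_q.put(msg)
        call_function_q.put(STUB_CALL_SERVICE)

    def get_or_create_pool():
        # Main thread: the only place the import machinery is safe to use.
        global _pool
        if _pool is None:
            _pool = ServicePool(recv=service_q)
        return _pool

    def dispatch_calls():
        # The Dispatcher main loop, reduced to the shape the diff introduces.
        while True:
            msg = call_function_q.get()
            if msg == STUB_CALL_SERVICE:
                get_or_create_pool()  # import/bootstrap runs on this thread
                continue
            # ... handle an ordinary CALL_FUNCTION message ...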

mitogen/core.py:

@@ -122,6 +122,7 @@ LOAD_MODULE = 107
 FORWARD_MODULE = 108
 DETACHING = 109
 CALL_SERVICE = 110
+STUB_CALL_SERVICE = 111

 #: Special value used to signal disconnection or the inability to route a
 #: message, when it appears in the `reply_to` field. Usually causes
@@ -3432,9 +3433,22 @@ class Dispatcher(object):
         self.econtext = econtext
         #: Chain ID -> CallError if prior call failed.
         self._error_by_chain_id = {}
-        self.recv = Receiver(router=econtext.router,
-                             handle=CALL_FUNCTION,
-                             policy=has_parent_authority)
+        self.recv = Receiver(
+            router=econtext.router,
+            handle=CALL_FUNCTION,
+            policy=has_parent_authority,
+        )
+        #: The :data:`CALL_SERVICE` :class:`Receiver` that will eventually be
+        #: reused by :class:`mitogen.service.Pool`, should it ever be loaded.
+        #: This is necessary for race-free reception of all service requests
+        #: delivered regardless of whether the stub or the real service pool
+        #: is loaded. See #547 for related sorrows.
+        Dispatcher._service_recv = Receiver(
+            router=econtext.router,
+            handle=CALL_SERVICE,
+            policy=has_parent_authority,
+        )
+        self._service_recv.notify = self._on_call_service
         listen(econtext.broker, 'shutdown', self.recv.close)

     @classmethod
@@ -3475,8 +3489,44 @@ class Dispatcher(object):
             self._error_by_chain_id[chain_id] = e
             return chain_id, e

+    def _on_call_service(self, recv):
+        """
+        Notifier for the :data:`CALL_SERVICE` receiver. This is called on
+        the :class:`Broker` thread for any service messages arriving at this
+        context, for as long as no real service pool implementation is
+        loaded.
+
+        To safely bootstrap the service pool implementation, a sentinel
+        message is enqueued on the :data:`CALL_FUNCTION` receiver to wake
+        the main thread, where the importer can run without any possibility
+        of deadlock due to concurrent uses of the importer.
+
+        Should the main thread be blocked indefinitely on a service call,
+        preventing the import from ever running, then :mod:`mitogen.service`
+        has already been imported and
+        :func:`mitogen.service.get_or_create_pool` has already run, meaning
+        the service pool is already active and the duplicate initialization
+        was not needed anyway.
+
+        #547: this trickery avoids the alternative of spinning up a
+        temporary thread to import the service pool, which could deadlock
+        if a custom import hook executing on the main thread (under the
+        importer lock) blocked waiting for data that was in turn received
+        by a service: the main thread's import lock cannot be released
+        until the service is running, and the service cannot satisfy the
+        request until the import lock is released.
+        """
+        self.recv._on_receive(Message(handle=STUB_CALL_SERVICE))
+
+    def _init_service_pool(self):
+        import mitogen.service
+        mitogen.service.get_or_create_pool(router=self.econtext.router)
+
     def _dispatch_calls(self):
         for msg in self.recv:
+            if msg.handle == STUB_CALL_SERVICE:
+                self._init_service_pool()
+                continue
+
             chain_id, ret = self._dispatch_one(msg)
             _v and LOG.debug('%r: %r -> %r', self, msg, ret)
             if msg.reply_to:
@@ -3535,34 +3585,6 @@ class ExternalContext(object):
             if not self.config['profiling']:
                 os.kill(os.getpid(), signal.SIGTERM)

-    #: On Python >3.4, the global importer lock has split into per-module
-    #: locks, so there is no guarantee the import statement in
-    #: _service_stub_main will complete before a second thread attempting
-    #: the same import sees a partially initialized module. Therefore
-    #: serialize the stub explicitly.
-    service_stub_lock = threading.Lock()
-
-    def _service_stub_main(self, msg):
-        self.service_stub_lock.acquire()
-        try:
-            import mitogen.service
-            pool = mitogen.service.get_or_create_pool(router=self.router)
-            pool._receiver._on_receive(msg)
-        finally:
-            self.service_stub_lock.release()
-
-    def _on_call_service_msg(self, msg):
-        """
-        Stub service handler. Start a thread to import the mitogen.service
-        implementation, and deliver the message to the newly constructed
-        pool. This must be done in a thread, as e.g. PushFileService may
-        race with a CALL_FUNCTION blocking the main thread waiting for a
-        result from that service.
-        """
-        if not msg.is_dead:
-            th = threading.Thread(target=self._service_stub_main, args=(msg,))
-            th.start()
-
     def _on_shutdown_msg(self, msg):
         if not msg.is_dead:
             _v and LOG.debug('shutdown request from context %d', msg.src_id)
@@ -3606,11 +3628,6 @@ class ExternalContext(object):
             handle=SHUTDOWN,
             policy=has_parent_authority,
         )
-        self.router.add_handler(
-            fn=self._on_call_service_msg,
-            handle=CALL_SERVICE,
-            policy=has_parent_authority,
-        )
         self.master = Context(self.router, 0, 'master')
         parent_id = self.config['parent_ids'][0]
         if parent_id == 0:
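The cycle the removed stub thread could enter is the one spelled out in the
new _on_call_service docstring above. A minimal demonstration under stated
assumptions: a plain Lock stands in for the interpreter's import lock, an
Event for the awaited service result, and the timeouts exist only so the
example terminates where the old design would hang forever:

    import threading
    import time

    import_lock = threading.Lock()     # stands in for the import lock
    service_ready = threading.Event()  # set once the service pool is serving

    def custom_import_hook():
        # Main thread: a user's import hook runs under the import lock and
        # blocks on a service result (e.g. fetched file data) to finish.
        with import_lock:
            service_ready.wait(timeout=1)    # old design: waits forever

    def service_stub_thread():
        # Old design's transient thread: "import mitogen.service" cannot
        # proceed without the import lock the main thread is holding.
        time.sleep(0.1)                      # let the main thread lock first
        if import_lock.acquire(timeout=1):   # old design: blocks forever
            service_ready.set()
            import_lock.release()

    main = threading.Thread(target=custom_import_hook)
    stub = threading.Thread(target=service_stub_thread)
    main.start()
    stub.start()
    main.join()
    stub.join()
    # Neither side can make progress: the import lock is not released until
    # the service runs, and the service cannot run until the lock is
    # released. Routing the import through the main thread breaks the cycle.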

mitogen/service.py:

@@ -86,8 +86,13 @@ def get_or_create_pool(size=None, router=None):
     _pool_lock.acquire()
     try:
         if _pool_pid != my_pid:
-            _pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE,
-                         overwrite=True)
+            _pool = Pool(
+                router,
+                services=[],
+                size=size or DEFAULT_POOL_SIZE,
+                overwrite=True,
+                recv=mitogen.core.Dispatcher._service_recv,
+            )
             # In case of Broker shutdown crash, Pool can cause 'zombie'
             # processes.
             mitogen.core.listen(router.broker, 'shutdown',
@@ -475,22 +480,31 @@ class Pool(object):
     program's configuration or its input data.

     :param mitogen.core.Router router:
-        Router to listen for ``CALL_SERVICE`` messages on.
+        :class:`mitogen.core.Router` to listen for
+        :data:`mitogen.core.CALL_SERVICE` messages.
     :param list services:
         Initial list of services to register.
+    :param mitogen.core.Receiver recv:
+        :data:`mitogen.core.CALL_SERVICE` receiver to reuse. This is used by
+        :func:`get_or_create_pool` to hand off a queue of messages from the
+        Dispatcher stub handler while avoiding a race.
     """
     activator_class = Activator

-    def __init__(self, router, services=(), size=1, overwrite=False):
+    def __init__(self, router, services=(), size=1, overwrite=False,
+                 recv=None):
         self.router = router
         self._activator = self.activator_class()
         self._ipc_latch = mitogen.core.Latch()
-        self._receiver = mitogen.core.Receiver(
+        self._receiver = recv or mitogen.core.Receiver(
             router=router,
             handle=mitogen.core.CALL_SERVICE,
             overwrite=overwrite,
         )
+        # If self._receiver was inherited from mitogen.core.Dispatcher, we
+        # must remove its stub notification function before adding it to
+        # our Select.
+        self._receiver.notify = None
         self._select = mitogen.select.Select(oneshot=False)
         self._select.add(self._receiver)
         self._select.add(self._ipc_latch)
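
The service.py half of the change works because a Receiver is just a
registered handle plus a thread-safe buffer: handing the live object to Pool
preserves everything queued while only the stub existed. A toy model, with
queue.Queue standing in for mitogen's Latch and the notify attribute
mirroring the hook Dispatcher installs above:

    import queue

    class Receiver(object):
        # Toy model only: mitogen's Receiver buffers into a Latch and is
        # registered with the Router under a message handle.
        def __init__(self):
            self._latch = queue.Queue()
            self.notify = None            # broker-thread callback, if any

        def _on_receive(self, msg):       # runs on the broker thread
            self._latch.put(msg)
            if self.notify:
                self.notify(self)

        def get(self):
            return self._latch.get()

    recv = Receiver()
    recv.notify = lambda r: print('stub: wake the main thread')
    recv._on_receive('CALL_SERVICE #1')   # arrives before any pool exists

    # Later, Pool.__init__ inherits the same object and silences the hook:
    recv.notify = None
    recv._on_receive('CALL_SERVICE #2')   # buffered silently for the pool
    assert recv.get() == 'CALL_SERVICE #1'   # nothing lost in the handoff
    assert recv.get() == 'CALL_SERVICE #2'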
