issue #535: activate Corker on 2.4 in master too.

pull/564/head
David Wilson 6 years ago
parent 509590530b
commit 18b984a0b4

@ -2840,7 +2840,7 @@ class Broker(object):
#: before force-disconnecting them during :meth:`shutdown`. #: before force-disconnecting them during :meth:`shutdown`.
shutdown_timeout = 3.0 shutdown_timeout = 3.0
def __init__(self, poller_class=None): def __init__(self, poller_class=None, activate_compat=True):
self._alive = True self._alive = True
self._exitted = False self._exitted = False
self._waker = Waker(self) self._waker = Waker(self)
@ -2858,6 +2858,19 @@ class Broker(object):
name='mitogen.broker' name='mitogen.broker'
) )
self._thread.start() self._thread.start()
if activate_compat:
self._py24_25_compat()
def _py24_25_compat(self):
"""
Python 2.4/2.5 have grave difficulties with threads/fork. We
mandatorily quiesce all running threads during fork using a
monkey-patch there.
"""
if sys.version_info < (2, 6):
# import_module() is used to avoid dep scanner.
os_fork = import_module('mitogen.os_fork')
mitogen.os_fork._notice_broker_or_pool(self)
def start_receive(self, stream): def start_receive(self, stream):
""" """
@ -3003,6 +3016,7 @@ class Broker(object):
except Exception: except Exception:
LOG.exception('_broker_main() crashed') LOG.exception('_broker_main() crashed')
self._alive = False # Ensure _alive is consistent on crash.
self._exitted = True self._exitted = True
self._broker_exit() self._broker_exit()
@ -3206,7 +3220,7 @@ class ExternalContext(object):
Router.max_message_size = self.config['max_message_size'] Router.max_message_size = self.config['max_message_size']
if self.config['profiling']: if self.config['profiling']:
enable_profiling() enable_profiling()
self.broker = Broker() self.broker = Broker(activate_compat=False)
self.router = Router(self.broker) self.router = Router(self.broker)
self.router.debug = self.config.get('debug', False) self.router.debug = self.config.get('debug', False)
self.router.undirectional = self.config['unidirectional'] self.router.undirectional = self.config['unidirectional']
@ -3348,17 +3362,6 @@ class ExternalContext(object):
# Reopen with line buffering. # Reopen with line buffering.
sys.stdout = os.fdopen(1, 'w', 1) sys.stdout = os.fdopen(1, 'w', 1)
def _py24_25_compat(self):
"""
Python 2.4/2.5 have grave difficulties with threads/fork. We
mandatorily quiesce all running threads during fork using a
monkey-patch there.
"""
if sys.version_info < (2, 6):
# import_module() is used to avoid dep scanner.
os_fork = import_module('mitogen.os_fork')
mitogen.os_fork._notice_broker_or_pool(self.broker)
def main(self): def main(self):
self._setup_master() self._setup_master()
try: try:
@ -3386,7 +3389,7 @@ class ExternalContext(object):
socket.gethostname()) socket.gethostname())
_v and LOG.debug('Recovered sys.executable: %r', sys.executable) _v and LOG.debug('Recovered sys.executable: %r', sys.executable)
self._py24_25_compat() self.broker._py24_25_compat()
self.dispatcher.run() self.dispatcher.run()
_v and LOG.debug('ExternalContext.main() normal exit') _v and LOG.debug('ExternalContext.main() normal exit')
except KeyboardInterrupt: except KeyboardInterrupt:

@ -48,21 +48,21 @@ import mitogen.core
# List of weakrefs. On Python 2.4, mitogen.core registers its Broker on this # List of weakrefs. On Python 2.4, mitogen.core registers its Broker on this
# list and mitogen.service registers its Pool too. # list and mitogen.service registers its Pool too.
_brokers = weakref.WeakValueDictionary() _brokers = weakref.WeakKeyDictionary()
_pools = weakref.WeakValueDictionary() _pools = weakref.WeakKeyDictionary()
def _notice_broker_or_pool(obj): def _notice_broker_or_pool(obj):
if isinstance(obj, mitogen.core.Broker): if isinstance(obj, mitogen.core.Broker):
_brokers[id(obj)] = obj _brokers[obj] = True
else: else:
_pools[id(obj)] = obj _pools[obj] = True
def wrap_os__fork(): def wrap_os__fork():
corker = Corker( corker = Corker(
brokers=list(_brokers.values()), brokers=list(_brokers),
pools=list(_pools.values()), pools=list(_pools),
) )
try: try:
corker.cork() corker.cork()

@ -174,6 +174,8 @@ class CrashTest(testlib.BrokerMixin, testlib.TestCase):
expect = '_broker_main() crashed' expect = '_broker_main() crashed'
self.assertTrue(expect in log.stop()) self.assertTrue(expect in log.stop())
self.broker.join()
class AddHandlerTest(testlib.TestCase): class AddHandlerTest(testlib.TestCase):
klass = mitogen.master.Router klass = mitogen.master.Router

@ -432,6 +432,7 @@ class BrokerMixin(object):
if not self.broker_shutdown: if not self.broker_shutdown:
self.broker.shutdown() self.broker.shutdown()
self.broker.join() self.broker.join()
del self.broker
super(BrokerMixin, self).tearDown() super(BrokerMixin, self).tearDown()
def sync_with_broker(self): def sync_with_broker(self):
@ -445,6 +446,10 @@ class RouterMixin(BrokerMixin):
super(RouterMixin, self).setUp() super(RouterMixin, self).setUp()
self.router = self.router_class(self.broker) self.router = self.router_class(self.broker)
def tearDown(self):
del self.router
super(RouterMixin, self).tearDown()
class DockerMixin(RouterMixin): class DockerMixin(RouterMixin):
@classmethod @classmethod

Loading…
Cancel
Save