diff --git a/mitogen/core.py b/mitogen/core.py index 4b3df595..607c4bfb 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -2840,7 +2840,7 @@ class Broker(object): #: before force-disconnecting them during :meth:`shutdown`. shutdown_timeout = 3.0 - def __init__(self, poller_class=None): + def __init__(self, poller_class=None, activate_compat=True): self._alive = True self._exitted = False self._waker = Waker(self) @@ -2858,6 +2858,19 @@ class Broker(object): name='mitogen.broker' ) self._thread.start() + if activate_compat: + self._py24_25_compat() + + def _py24_25_compat(self): + """ + Python 2.4/2.5 have grave difficulties with threads/fork. We + mandatorily quiesce all running threads during fork using a + monkey-patch there. + """ + if sys.version_info < (2, 6): + # import_module() is used to avoid dep scanner. + os_fork = import_module('mitogen.os_fork') + mitogen.os_fork._notice_broker_or_pool(self) def start_receive(self, stream): """ @@ -3003,6 +3016,7 @@ class Broker(object): except Exception: LOG.exception('_broker_main() crashed') + self._alive = False # Ensure _alive is consistent on crash. self._exitted = True self._broker_exit() @@ -3206,7 +3220,7 @@ class ExternalContext(object): Router.max_message_size = self.config['max_message_size'] if self.config['profiling']: enable_profiling() - self.broker = Broker() + self.broker = Broker(activate_compat=False) self.router = Router(self.broker) self.router.debug = self.config.get('debug', False) self.router.undirectional = self.config['unidirectional'] @@ -3348,17 +3362,6 @@ class ExternalContext(object): # Reopen with line buffering. sys.stdout = os.fdopen(1, 'w', 1) - def _py24_25_compat(self): - """ - Python 2.4/2.5 have grave difficulties with threads/fork. We - mandatorily quiesce all running threads during fork using a - monkey-patch there. - """ - if sys.version_info < (2, 6): - # import_module() is used to avoid dep scanner. - os_fork = import_module('mitogen.os_fork') - mitogen.os_fork._notice_broker_or_pool(self.broker) - def main(self): self._setup_master() try: @@ -3386,7 +3389,7 @@ class ExternalContext(object): socket.gethostname()) _v and LOG.debug('Recovered sys.executable: %r', sys.executable) - self._py24_25_compat() + self.broker._py24_25_compat() self.dispatcher.run() _v and LOG.debug('ExternalContext.main() normal exit') except KeyboardInterrupt: diff --git a/mitogen/os_fork.py b/mitogen/os_fork.py index 2b74bdfc..e6a3aafc 100644 --- a/mitogen/os_fork.py +++ b/mitogen/os_fork.py @@ -48,21 +48,21 @@ import mitogen.core # List of weakrefs. On Python 2.4, mitogen.core registers its Broker on this # list and mitogen.service registers its Pool too. -_brokers = weakref.WeakValueDictionary() -_pools = weakref.WeakValueDictionary() +_brokers = weakref.WeakKeyDictionary() +_pools = weakref.WeakKeyDictionary() def _notice_broker_or_pool(obj): if isinstance(obj, mitogen.core.Broker): - _brokers[id(obj)] = obj + _brokers[obj] = True else: - _pools[id(obj)] = obj + _pools[obj] = True def wrap_os__fork(): corker = Corker( - brokers=list(_brokers.values()), - pools=list(_pools.values()), + brokers=list(_brokers), + pools=list(_pools), ) try: corker.cork() diff --git a/tests/router_test.py b/tests/router_test.py index 4e2c19ed..80169e34 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -174,6 +174,8 @@ class CrashTest(testlib.BrokerMixin, testlib.TestCase): expect = '_broker_main() crashed' self.assertTrue(expect in log.stop()) + self.broker.join() + class AddHandlerTest(testlib.TestCase): klass = mitogen.master.Router diff --git a/tests/testlib.py b/tests/testlib.py index 75061b26..37c3c654 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -432,6 +432,7 @@ class BrokerMixin(object): if not self.broker_shutdown: self.broker.shutdown() self.broker.join() + del self.broker super(BrokerMixin, self).tearDown() def sync_with_broker(self): @@ -445,6 +446,10 @@ class RouterMixin(BrokerMixin): super(RouterMixin, self).setUp() self.router = self.router_class(self.broker) + def tearDown(self): + del self.router + super(RouterMixin, self).tearDown() + class DockerMixin(RouterMixin): @classmethod