From aa06b960f5641903a64668dceb537f9d8b8bf6fc Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 4 Aug 2019 02:33:02 +0100 Subject: [PATCH] parent: define Connection behaviour during Broker.shutdown() - Connection attempt fails reliably, and it fails with CancelledError - Add new mitogen.core.unlisten() - Add test. --- docs/api.rst | 4 ++ docs/signals.rst | 1 + mitogen/core.py | 25 +++++++++--- mitogen/parent.py | 30 +++++++++++++- tests/connection_test.py | 48 +++++++++++++++++++++++ tests/data/broker_shutdown_test_python.py | 9 +++++ 6 files changed, 109 insertions(+), 8 deletions(-) create mode 100644 tests/connection_test.py create mode 100755 tests/data/broker_shutdown_test_python.py diff --git a/docs/api.rst b/docs/api.rst index 917fc627..2557806e 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -662,3 +662,7 @@ Exceptions .. autoclass:: LatchError .. autoclass:: StreamError .. autoclass:: TimeoutError + +.. currentmodule:: mitogen.parent +.. autoclass:: EofError +.. autoclass:: CancelledError diff --git a/docs/signals.rst b/docs/signals.rst index 19533bb1..9447e529 100644 --- a/docs/signals.rst +++ b/docs/signals.rst @@ -22,6 +22,7 @@ Functions .. currentmodule:: mitogen.core .. autofunction:: listen +.. autofunction:: unlisten .. autofunction:: fire diff --git a/mitogen/core.py b/mitogen/core.py index 14fddc0f..4dc3e7ef 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -406,22 +406,35 @@ def has_parent_authority(msg, _stream=None): msg.auth_id in mitogen.parent_ids) +def _signals(obj, signal): + return ( + obj.__dict__ + .setdefault('_signals', {}) + .setdefault(signal, []) + ) + + def listen(obj, name, func): """ - Arrange for `func(*args, **kwargs)` to be invoked when the named signal is + Arrange for `func()` to be invoked when signal `name` is fired on `obj`. + """ + _signals(obj, name).append(func) + + +def unlisten(obj, name, func): + """ + Remove `func` from the list of functions invoked when signal `name` is fired by `obj`. """ - signals = vars(obj).setdefault('_signals', {}) - signals.setdefault(name, []).append(func) + _signals(obj, name).remove(func) def fire(obj, name, *args, **kwargs): """ Arrange for `func(*args, **kwargs)` to be invoked for every function - registered for the named signal on `obj`. + registered for signal `name` on `obj`. """ - signals = vars(obj).get('_signals', {}) - for func in signals.get(name, ()): + for func in _signals(obj, name): func(*args, **kwargs) diff --git a/mitogen/parent.py b/mitogen/parent.py index 90926aa4..f8c5d95f 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -91,6 +91,10 @@ try: except ValueError: SC_OPEN_MAX = 1024 +BROKER_SHUTDOWN_MSG = ( + 'Connection cancelled because the associated Broker began to shut down.' +) + OPENPTY_MSG = ( "Failed to create a PTY: %s. It is likely the maximum number of PTYs has " "been reached. Consider increasing the 'kern.tty.ptmx_max' sysctl on OS " @@ -737,13 +741,21 @@ def returncode_to_str(n): class EofError(mitogen.core.StreamError): """ - Raised by :func:`iter_read` and :func:`write_all` when EOF is detected by - the child process. + Raised by :class:`Connection` when an empty read is detected from the + remote process before bootstrap completes. """ # inherits from StreamError to maintain compatibility. pass +class CancelledError(mitogen.core.StreamError): + """ + Raised by :class:`Connection` when :meth:`mitogen.core.Broker.shutdown` is + called before bootstrap completes. + """ + pass + + class Argv(object): """ Wrapper to defer argv formatting when debug logging is disabled. @@ -1427,6 +1439,8 @@ class Connection(object): def _complete_connection(self): self.timer.cancel() if not self.exception: + mitogen.core.unlisten(self._router.broker, 'shutdown', + self._on_broker_shutdown) self._router.register(self.context, self.stdio_stream) self.stdio_stream.set_protocol( MitogenProtocol( @@ -1445,6 +1459,8 @@ class Connection(object): if self.exception is None: self._adorn_eof_error(exc) self.exception = exc + mitogen.core.unlisten(self._router.broker, 'shutdown', + self._on_broker_shutdown) for stream in self.stdio_stream, self.stderr_stream: if stream and not stream.receive_side.closed: stream.on_disconnect(self._router.broker) @@ -1492,6 +1508,13 @@ class Connection(object): )) self.proc._async_reap(self, self._router) + def _on_broker_shutdown(self): + """ + Respond to broker.shutdown() being called by failing the connection + attempt. + """ + self._fail_connection(CancelledError(BROKER_SHUTDOWN_MSG)) + def _start_timer(self): self.timer = self._router.broker.timers.schedule( when=self.options.connect_deadline, @@ -1535,6 +1558,9 @@ class Connection(object): return stream def _async_connect(self): + mitogen.core.listen(self._router.broker, 'shutdown', + self._on_broker_shutdown) + self._start_timer() self.stdio_stream = self._setup_stdio_stream() if self.context.name is None: diff --git a/tests/connection_test.py b/tests/connection_test.py new file mode 100644 index 00000000..a66428e4 --- /dev/null +++ b/tests/connection_test.py @@ -0,0 +1,48 @@ + +import time +import tempfile +import sys +import os +import threading + +import unittest2 +import testlib + +import mitogen.core +import mitogen.parent + + +class ConnectionTest(testlib.RouterMixin, testlib.TestCase): + def test_broker_shutdown_while_connect_in_progress(self): + # if Broker.shutdown() is called while a connection attempt is in + # progress, the connection should be torn down. + + path = tempfile.mktemp(prefix='broker_shutdown_sem_') + open(path, 'wb').close() + + os.environ['BROKER_SHUTDOWN_SEMAPHORE'] = path + result = [] + + def thread(): + python_path = testlib.data_path('broker_shutdown_test_python.py') + try: + result.append(self.router.local(python_path=python_path)) + except Exception: + result.append(sys.exc_info()[1]) + + th = threading.Thread(target=thread) + th.start() + + while os.path.exists(path): + time.sleep(0.05) + + self.broker.shutdown() + th.join() + + exc, = result + self.assertTrue(isinstance(exc, mitogen.parent.CancelledError)) + self.assertEquals(mitogen.parent.BROKER_SHUTDOWN_MSG, exc.args[0]) + + +if __name__ == '__main__': + unittest2.main() diff --git a/tests/data/broker_shutdown_test_python.py b/tests/data/broker_shutdown_test_python.py new file mode 100755 index 00000000..f1e20c16 --- /dev/null +++ b/tests/data/broker_shutdown_test_python.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python +# Delete a semaphore file to allow the main thread to wake up, then sleep for +# 30 seconds before starting the real Python. +import os +import time +import sys +os.unlink(os.environ['BROKER_SHUTDOWN_SEMAPHORE']) +time.sleep(30) +os.execl(sys.executable, sys.executable, *sys.argv[1:])