parent: define Connection behaviour during Broker.shutdown()

- Connection attempt fails reliably, and it fails with CancelledError
- Add new mitogen.core.unlisten()
- Add test.
pull/612/head
David Wilson 6 years ago
parent edde251d58
commit aa06b960f5

@ -662,3 +662,7 @@ Exceptions
.. autoclass:: LatchError .. autoclass:: LatchError
.. autoclass:: StreamError .. autoclass:: StreamError
.. autoclass:: TimeoutError .. autoclass:: TimeoutError
.. currentmodule:: mitogen.parent
.. autoclass:: EofError
.. autoclass:: CancelledError

@ -22,6 +22,7 @@ Functions
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autofunction:: listen .. autofunction:: listen
.. autofunction:: unlisten
.. autofunction:: fire .. autofunction:: fire

@ -406,22 +406,35 @@ def has_parent_authority(msg, _stream=None):
msg.auth_id in mitogen.parent_ids) msg.auth_id in mitogen.parent_ids)
def _signals(obj, signal):
return (
obj.__dict__
.setdefault('_signals', {})
.setdefault(signal, [])
)
def listen(obj, name, func): def listen(obj, name, func):
""" """
Arrange for `func(*args, **kwargs)` to be invoked when the named signal is Arrange for `func()` to be invoked when signal `name` is fired on `obj`.
"""
_signals(obj, name).append(func)
def unlisten(obj, name, func):
"""
Remove `func` from the list of functions invoked when signal `name` is
fired by `obj`. fired by `obj`.
""" """
signals = vars(obj).setdefault('_signals', {}) _signals(obj, name).remove(func)
signals.setdefault(name, []).append(func)
def fire(obj, name, *args, **kwargs): def fire(obj, name, *args, **kwargs):
""" """
Arrange for `func(*args, **kwargs)` to be invoked for every function Arrange for `func(*args, **kwargs)` to be invoked for every function
registered for the named signal on `obj`. registered for signal `name` on `obj`.
""" """
signals = vars(obj).get('_signals', {}) for func in _signals(obj, name):
for func in signals.get(name, ()):
func(*args, **kwargs) func(*args, **kwargs)

@ -91,6 +91,10 @@ try:
except ValueError: except ValueError:
SC_OPEN_MAX = 1024 SC_OPEN_MAX = 1024
BROKER_SHUTDOWN_MSG = (
'Connection cancelled because the associated Broker began to shut down.'
)
OPENPTY_MSG = ( OPENPTY_MSG = (
"Failed to create a PTY: %s. It is likely the maximum number of PTYs has " "Failed to create a PTY: %s. It is likely the maximum number of PTYs has "
"been reached. Consider increasing the 'kern.tty.ptmx_max' sysctl on OS " "been reached. Consider increasing the 'kern.tty.ptmx_max' sysctl on OS "
@ -737,13 +741,21 @@ def returncode_to_str(n):
class EofError(mitogen.core.StreamError): class EofError(mitogen.core.StreamError):
""" """
Raised by :func:`iter_read` and :func:`write_all` when EOF is detected by Raised by :class:`Connection` when an empty read is detected from the
the child process. remote process before bootstrap completes.
""" """
# inherits from StreamError to maintain compatibility. # inherits from StreamError to maintain compatibility.
pass pass
class CancelledError(mitogen.core.StreamError):
"""
Raised by :class:`Connection` when :meth:`mitogen.core.Broker.shutdown` is
called before bootstrap completes.
"""
pass
class Argv(object): class Argv(object):
""" """
Wrapper to defer argv formatting when debug logging is disabled. Wrapper to defer argv formatting when debug logging is disabled.
@ -1427,6 +1439,8 @@ class Connection(object):
def _complete_connection(self): def _complete_connection(self):
self.timer.cancel() self.timer.cancel()
if not self.exception: if not self.exception:
mitogen.core.unlisten(self._router.broker, 'shutdown',
self._on_broker_shutdown)
self._router.register(self.context, self.stdio_stream) self._router.register(self.context, self.stdio_stream)
self.stdio_stream.set_protocol( self.stdio_stream.set_protocol(
MitogenProtocol( MitogenProtocol(
@ -1445,6 +1459,8 @@ class Connection(object):
if self.exception is None: if self.exception is None:
self._adorn_eof_error(exc) self._adorn_eof_error(exc)
self.exception = exc self.exception = exc
mitogen.core.unlisten(self._router.broker, 'shutdown',
self._on_broker_shutdown)
for stream in self.stdio_stream, self.stderr_stream: for stream in self.stdio_stream, self.stderr_stream:
if stream and not stream.receive_side.closed: if stream and not stream.receive_side.closed:
stream.on_disconnect(self._router.broker) stream.on_disconnect(self._router.broker)
@ -1492,6 +1508,13 @@ class Connection(object):
)) ))
self.proc._async_reap(self, self._router) self.proc._async_reap(self, self._router)
def _on_broker_shutdown(self):
"""
Respond to broker.shutdown() being called by failing the connection
attempt.
"""
self._fail_connection(CancelledError(BROKER_SHUTDOWN_MSG))
def _start_timer(self): def _start_timer(self):
self.timer = self._router.broker.timers.schedule( self.timer = self._router.broker.timers.schedule(
when=self.options.connect_deadline, when=self.options.connect_deadline,
@ -1535,6 +1558,9 @@ class Connection(object):
return stream return stream
def _async_connect(self): def _async_connect(self):
mitogen.core.listen(self._router.broker, 'shutdown',
self._on_broker_shutdown)
self._start_timer() self._start_timer()
self.stdio_stream = self._setup_stdio_stream() self.stdio_stream = self._setup_stdio_stream()
if self.context.name is None: if self.context.name is None:

@ -0,0 +1,48 @@
import time
import tempfile
import sys
import os
import threading
import unittest2
import testlib
import mitogen.core
import mitogen.parent
class ConnectionTest(testlib.RouterMixin, testlib.TestCase):
def test_broker_shutdown_while_connect_in_progress(self):
# if Broker.shutdown() is called while a connection attempt is in
# progress, the connection should be torn down.
path = tempfile.mktemp(prefix='broker_shutdown_sem_')
open(path, 'wb').close()
os.environ['BROKER_SHUTDOWN_SEMAPHORE'] = path
result = []
def thread():
python_path = testlib.data_path('broker_shutdown_test_python.py')
try:
result.append(self.router.local(python_path=python_path))
except Exception:
result.append(sys.exc_info()[1])
th = threading.Thread(target=thread)
th.start()
while os.path.exists(path):
time.sleep(0.05)
self.broker.shutdown()
th.join()
exc, = result
self.assertTrue(isinstance(exc, mitogen.parent.CancelledError))
self.assertEquals(mitogen.parent.BROKER_SHUTDOWN_MSG, exc.args[0])
if __name__ == '__main__':
unittest2.main()

@ -0,0 +1,9 @@
#!/usr/bin/env python
# Delete a semaphore file to allow the main thread to wake up, then sleep for
# 30 seconds before starting the real Python.
import os
import time
import sys
os.unlink(os.environ['BROKER_SHUTDOWN_SEMAPHORE'])
time.sleep(30)
os.execl(sys.executable, sys.executable, *sys.argv[1:])
Loading…
Cancel
Save