From fea12a60302fe1e8c331434422f598b6018fa05f Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 16 Aug 2016 01:57:08 +0100 Subject: [PATCH] Many updates: graceful shutdown, docs, threading * Start splitting docs up into internals.rst / api.rst * Docs for lots more of econtext.core. * Get rid of _update_stream() and has_output(), replace with individual functions called as state changes. * Add Broker.on_thread() and remove Stream._lock: simply call on_thread() to ensure buffer management is linearized. * Rename read_side/write_side to receive_side/transmit_side like event handler names. * Clean up some more repr / debug logs. * Move handle cleanup to Context.on_shutdown where it belongs. * Make wake() a noop when called from broker thread. * Replace graceful_count crap with Side.graceful attribute, add Broker.keep_alive() to check whether any registered readers want to be kept alive for graceful shutdown() or any child contexts with a connected stream exist. * Make master.Broker timeout slightly longer than slave broker. * Add generic on_thread() to allow running code on the IO thread. --- docs/api.rst | 38 +++-- docs/history.rst | 18 +-- docs/howitworks.rst | 2 + docs/index.rst | 1 + docs/internals.rst | 50 +++++++ econtext/core.py | 337 +++++++++++++++++++++++++++----------------- econtext/master.py | 21 ++- 7 files changed, 299 insertions(+), 168 deletions(-) create mode 100644 docs/internals.rst diff --git a/docs/api.rst b/docs/api.rst index e0ff7ac5..b412a098 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -27,6 +27,20 @@ Exceptions .. autoclass:: econtext.core.TimeoutError +Stream Classes +-------------- + +.. autoclass:: econtext.core.Stream + :members: + + +Broker Class +------------ + +.. autoclass:: econtext.core.Broker + :members: + + Context Class ------------- @@ -84,21 +98,6 @@ econtext.master .. automodule:: econtext.master -Helper Functions ----------------- - -.. autofunction:: econtext.master.create_child -.. autofunction:: econtext.master.get_child_modules -.. autofunction:: econtext.master.minimize_source - - -Context Class -------------- - -.. autoclass:: econtext.master.Context - :members: - - Broker Class ------------ @@ -106,13 +105,10 @@ Broker Class :members: -Stream Classes --------------- - -.. autoclass:: econtext.master.LocalStream - :members: +Context Class +------------- -.. autoclass:: econtext.master.SshStream +.. autoclass:: econtext.master.Context :members: diff --git a/docs/history.rst b/docs/history.rst index 1f1c21e3..1f44ddf8 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -8,17 +8,17 @@ History The first version of econtext was written in late 2006 for use in an infrastructure management program, however at the time I lacked the pragmatism -necessary for pushing my little design from concept to a working -implementation. I tired of it when no way could be found to combine every -communication style (*blocking execute function, asynchronous execute function, -proxy slave-of-slave context*) into one neat abstraction. That unification -still has not happened, but I'm no longer as burdened by it. +necessary for pushing my little design from concept to finished implementation. +I tired of it when no way could be found to unify every communication style +(*blocking execute function, asynchronous execute function, proxy +slave-of-slave context*) into one neat abstraction. That unification never +happened, but I'm no longer worried by it. Every few years I would pick through the source code, especially after periods -of working commercially with some contemporary infrastructure management -systems, none of which had anything close to as neat an approach to running -Python code on remote machines, and suffered from shockingly beginner-level -bugs such as failing to report SSH diagnostic messages. +of commercial work involving some contemporary infrastructure management +systems, none of which had nearly as neat an approach to running Python code +remotely, and suffered from shockingly beginner-level bugs such as failing to +report SSH diagnostic messages. And every few years I'd put that code down again, especially since moving to an OS X laptop where :py:func:`select.poll` was not available, the struggle to get diff --git a/docs/howitworks.rst b/docs/howitworks.rst index 5f1601d4..db4aafed 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -216,6 +216,8 @@ disconnection by the master will cause the IO multiplexer thread to enter shutdown by itself. +.. _stream-protocol: + Stream Protocol --------------- diff --git a/docs/index.rst b/docs/index.rst index 83d68d4f..04a8c921 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -11,6 +11,7 @@ Python Execution Contexts howitworks getting_started api + internals history diff --git a/docs/internals.rst b/docs/internals.rst new file mode 100644 index 00000000..ef81a942 --- /dev/null +++ b/docs/internals.rst @@ -0,0 +1,50 @@ + +Internal API Reference +********************** + + +econtext.core +============= + + +Side Class +---------- + +.. autoclass:: econtext.core.Side + :members: + + +Stream Classes +-------------- + +.. class:: foo + +.. class:: foo + + +:py:class:`foo` + +.. autoclass:: econtext.core.BasicStream + :members: + + +econtext.master +=============== + + +Helper Functions +---------------- + +.. autofunction:: econtext.master.create_child +.. autofunction:: econtext.master.get_child_modules +.. autofunction:: econtext.master.minimize_source + + +Stream Classes +-------------- + +.. autoclass:: econtext.master.LocalStream + :members: + +.. autoclass:: econtext.master.SshStream + :members: diff --git a/econtext/core.py b/econtext/core.py index 93b270b4..5050d0bc 100644 --- a/econtext/core.py +++ b/econtext/core.py @@ -36,13 +36,16 @@ FORWARD_LOG = 102 class Error(Exception): - """Raised when a problem occurs with a context.""" + """Base for all exceptions raised by this module.""" def __init__(self, fmt, *args): Exception.__init__(self, fmt % args) class CallError(Error): - """Raised when .call() fails.""" + """Raised when :py:meth:`Context.call() ` + fails. A copy of the traceback from the external context is appended to the + exception message. + """ def __init__(self, e): name = '%s.%s' % (type(e).__module__, type(e).__name__) tb = sys.exc_info()[2] @@ -73,7 +76,7 @@ class Dead(object): return '' -#: Sentinel value used to represent Channel disconnection. +#: Sentinel value used to represent :py:class:`Channel` disconnection. _DEAD = Dead() @@ -221,19 +224,34 @@ class LogHandler(logging.Handler): class Side(object): - def __init__(self, stream, fd): + """ + Represent a single side of a :py:class:`BasicStream`. This exists to allow + streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional + (e.g. UNIX socket) file descriptors to operate identically. + """ + def __init__(self, stream, fd, keep_alive=False): + #: The :py:class:`Stream` for which this is a read or write side. self.stream = stream + #: Integer file descriptor to perform IO on. self.fd = fd + #: If ``True``, causes presence of this side in :py:class:`Broker`'s + #: active reader set to defer shutdown until the side is disconnected. + self.keep_alive = keep_alive def __repr__(self): return '' % (self.stream, self.fd) def fileno(self): + """Return :py:attr:`fd` if it is not ``None``, otherwise raise + ``StreamError``. This method is implemented so that :py:class:`Side` + can be used directly by :py:func:`select.select`.""" if self.fd is None: raise StreamError('%r.fileno() called but no FD set', self) return self.fd def close(self): + """Call :py:func:`os.close` on :py:attr:`fd` if it is not ``None``, + then set it to ``None``.""" if self.fd is not None: IOLOG.debug('%r.close()', self) os.close(self.fd) @@ -241,36 +259,70 @@ class Side(object): class BasicStream(object): - read_side = None - write_side = None + """ + + .. method:: on_disconnect (broker) + + Called by :py:class:`Broker` to force disconnect the stream. The base + implementation simply closes :py:attr:`receive_side` and + :py:attr:`transmit_side` and unregisters the stream from the broker. + + .. method:: on_receive (broker) + + Called by :py:class:`Broker` when the stream's :py:attr:`receive_side` has + been marked readable using :py:meth:`Broker.start_receive` and the + broker has detected the associated file descriptor is ready for + reading. + + Subclasses must implement this method if + :py:meth:`Broker.start_receive` is ever called on them, and the method + must call :py:meth:`on_disconect` if reading produces an empty string. + + .. method:: on_transmit (broker) + + Called by :py:class:`Broker` when the stream's :py:attr:`transmit_side` + has been marked writeable using :py:meth:`Broker.start_transmit` and + the broker has detected the associated file descriptor is ready for + writing. - def on_disconnect(self): - """Close our associated descriptors.""" + Subclasses must implement this method if + :py:meth:`Broker.start_transmit` is ever called on them. + + .. method:: on_shutdown (broker) + + Called by :py:meth:`Broker.shutdown` to allow the stream time to + gracefully shutdown. The base implementation simply called + :py:meth:`on_disconnect`. + + """ + #: A :py:class:`Side` representing the stream's receive file descriptor. + receive_side = None + + #: A :py:class:`Side` representing the stream's transmit file descriptor. + transmit_side = None + + def on_disconnect(self, broker): LOG.debug('%r.on_disconnect()', self) - self.read_side.close() - self.write_side.close() + broker.stop_receive(self) + broker.stop_transmit(self) + self.receive_side.close() + self.transmit_side.close() - def on_shutdown(self): - """Disconnect gracefully. Base implementation calls on_disconnect().""" + def on_shutdown(self, broker): LOG.debug('%r.on_shutdown()', self) - self.on_disconnect() - - def has_output(self): - return False + self.on_disconnect(broker) class Stream(BasicStream): """ - Initialize a new Stream instance. - - :param context: Context to communicate with. + :py:class:`BasicStream` subclass implementing econtext's :ref:`stream + protocol `. """ _input_buf = '' _output_buf = '' def __init__(self, context): self._context = context - self._lock = threading.Lock() self._rhmac = hmac.new(context.key, digestmod=sha) self._whmac = self._rhmac.copy() @@ -285,18 +337,18 @@ class Stream(BasicStream): unpickler.find_global = self._find_global return unpickler.load() - def on_receive(self): + def on_receive(self, broker): """Handle the next complete message on the stream. Raise - StreamError or IOError on failure.""" + :py:class:`StreamError` on failure.""" IOLOG.debug('%r.on_receive()', self) - buf = os.read(self.read_side.fd, 4096) + buf = os.read(self.receive_side.fd, 4096) self._input_buf += buf while self._receive_one(): pass if not buf: - return self.on_disconnect() + return self.on_disconnect(broker) def _receive_one(self): if len(self._input_buf) < 24: @@ -340,51 +392,41 @@ class Stream(BasicStream): except Exception: LOG.debug('%r._invoke(%r, %r): %r crashed', self, handle, data, fn) - def on_transmit(self): + def on_transmit(self, broker): """Transmit buffered messages.""" IOLOG.debug('%r.on_transmit()', self) - written = os.write(self.write_side.fd, self._output_buf[:4096]) - self._lock.acquire() - try: - self._output_buf = self._output_buf[written:] - finally: - self._lock.release() - if (not self._output_buf) and not self._context.broker.graceful_count: - self.on_disconnect() - - def has_output(self): - return bool(self._output_buf) + written = os.write(self.transmit_side.fd, self._output_buf[:4096]) + self._output_buf = self._output_buf[written:] + if not self._output_buf: + broker.stop_transmit(self) - def enqueue(self, handle, obj): - """Enqueue `obj` to `handle`, and tell the broker we have output.""" - IOLOG.debug('%r.enqueue(%r, %r)', self, handle, obj) + def _enqueue(self, handle, obj): + IOLOG.debug('%r._enqueue(%r, %r)', self, handle, obj) encoded = cPickle.dumps((handle, obj), protocol=2) msg = struct.pack('>L', len(encoded)) + encoded - self._lock.acquire() - try: - self._whmac.update(msg) - self._output_buf += self._whmac.digest() + msg - finally: - self._lock.release() - self._context.broker.update_stream(self) + self._whmac.update(msg) + self._output_buf += self._whmac.digest() + msg + self._context.broker.start_transmit(self) - def on_disconnect(self): - super(Stream, self).on_disconnect() - if self._context.stream is self: - self._context.on_disconnect() + def enqueue(self, handle, obj): + """Enqueue `obj` to `handle`, and tell the broker we have output. May + be called from any thread.""" + self._context.broker.on_thread(self._enqueue, handle, obj) - for handle, (persist, fn) in self._context._handle_map.iteritems(): - LOG.debug('%r.on_disconnect(): killing %r: %r', self, handle, fn) - fn(_DEAD) + def on_disconnect(self, broker): + super(Stream, self).on_disconnect(broker) + if self._context.stream is self: + self._context.on_disconnect(broker) - def on_shutdown(self): + def on_shutdown(self, broker): """Override BasicStream behaviour of immediately disconnecting.""" + LOG.debug('%r.on_shutdown(%r)', self, broker) def accept(self, rfd, wfd): - self.read_side = Side(self, os.dup(rfd)) - self.write_side = Side(self, os.dup(wfd)) - set_cloexec(self.read_side.fd) - set_cloexec(self.write_side.fd) + self.receive_side = Side(self, os.dup(rfd)) + self.transmit_side = Side(self, os.dup(wfd)) + set_cloexec(self.receive_side.fd) + set_cloexec(self.transmit_side.fd) self._context.stream = self def connect(self): @@ -392,13 +434,13 @@ class Stream(BasicStream): Context.""" LOG.debug('%r.connect()', self) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.read_side = Side(self, sock.fileno()) - self.write_side = Side(self, sock.fileno()) + self.receive_side = Side(self, sock.fileno()) + self.transmit_side = Side(self, sock.fileno()) sock.connect(self._context.parent_addr) self.enqueue(0, self._context.name) def __repr__(self): - return '%s()' % (self.__class__.__name__, self._context) + return '%s(%r)' % (self.__class__.__name__, self._context) class Context(object): @@ -419,13 +461,17 @@ class Context(object): self._last_handle = itertools.count(1000) self._handle_map = {} - def on_shutdown(self): - """Slave does nothing, _broker_main() will shutdown its streams.""" + def on_shutdown(self, broker): + """Called during :py:meth:`Broker.shutdown`, informs callbacks + registered with :py:meth:`add_handle_cb` the connection is dead.""" + for handle, (persist, fn) in self._handle_map.iteritems(): + LOG.debug('%r.on_disconnect(): killing %r: %r', self, handle, fn) + fn(_DEAD) - def on_disconnect(self): + def on_disconnect(self, broker): self.stream = None LOG.debug('Parent stream is gone, dying.') - self.broker.shutdown() + broker.shutdown() def alloc_handle(self): """Allocate a handle.""" @@ -456,7 +502,7 @@ class Context(object): try: data = queue.get(True, deadline) except Queue.Empty: - self.stream.on_disconnect() + self.broker.on_thread(self.stream.on_disconnect, self.broker) raise TimeoutError('deadline exceeded.') if data == _DEAD: @@ -476,19 +522,20 @@ class Waker(BasicStream): rfd, wfd = os.pipe() set_cloexec(rfd) set_cloexec(wfd) - self.read_side = Side(self, rfd) - self.write_side = Side(self, wfd) - broker.update_stream(self) + self.receive_side = Side(self, rfd) + self.transmit_side = Side(self, wfd) + broker.start_receive(self) def __repr__(self): return '' def wake(self): - if self.write_side.fd: - os.write(self.write_side.fd, ' ') + if threading.currentThread() != self._broker._thread and \ + self.transmit_side.fd: + os.write(self.transmit_side.fd, ' ') - def on_receive(self): - os.read(self.read_side.fd, 1) + def on_receive(self, broker): + os.read(self.receive_side.fd, 1) class IoLogger(BasicStream): @@ -498,37 +545,35 @@ class IoLogger(BasicStream): self._broker = broker self._name = name self._log = logging.getLogger(name) - self._rsock, self._wsock = socket.socketpair() + self._rsock, self._wsock = socket.socketpair() os.dup2(self._wsock.fileno(), dest_fd) set_cloexec(self._rsock.fileno()) set_cloexec(self._wsock.fileno()) - self.read_side = Side(self, self._rsock.fileno()) - self.write_side = Side(self, dest_fd) - broker.graceful_count += 1 - self._broker.update_stream(self) + self.receive_side = Side(self, self._rsock.fileno(), keep_alive=True) + self.transmit_side = Side(self, dest_fd) + self._broker.start_receive(self) def __repr__(self): - return '' % (self._name, self.read_side.fd) + return '' % (self._name,) def _log_lines(self): while self._buf.find('\n') != -1: line, _, self._buf = self._buf.partition('\n') self._log.info('%s', line.rstrip('\n')) - def on_shutdown(self): + def on_shutdown(self, broker): LOG.debug('%r.on_shutdown()', self) self._wsock.shutdown(socket.SHUT_WR) self._wsock.close() + self.transmit_side.close() - def on_receive(self): + def on_receive(self, broker): LOG.debug('%r.on_receive()', self) - buf = os.read(self.read_side.fd, 4096) + buf = os.read(self.receive_side.fd, 4096) if not buf: - LOG.debug('%r decrement graceful_count', self) - self._broker.graceful_count -= 1 - return self.on_disconnect() + return self.on_disconnect(broker) self._buf += buf self._log_lines() @@ -536,15 +581,19 @@ class IoLogger(BasicStream): class Broker(object): """ - Broker: responsible for tracking contexts, associated streams, and I/O + Responsible for tracking contexts, their associated streams and I/O multiplexing. """ _waker = None - graceful_count = 0 - graceful_timeout = 3.0 + _thread = None + + #: Seconds grace to allow :py:class:`Streams ` to shutdown + #: gracefully before force-disconnecting them during :py:meth:`shutdown`. + shutdown_timeout = 3.0 def __init__(self): self._alive = True + self._queue = Queue.Queue() self._contexts = {} self._readers = set() self._writers = set() @@ -553,77 +602,110 @@ class Broker(object): name='econtext-broker') self._thread.start() - def _update_stream(self, stream): - IOLOG.debug('_update_stream(%r)', stream) - if stream.read_side.fd is not None: - self._readers.add(stream.read_side) - else: - self._readers.discard(stream.read_side) - - if stream.write_side.fd is not None and stream.has_output(): - self._writers.add(stream.write_side) + def on_thread(self, func, *args, **kwargs): + if threading.currentThread() == self._thread: + func(*args, **kwargs) else: - self._writers.discard(stream.write_side) - - def update_stream(self, stream): - self._update_stream(stream) - if self._waker: - self._waker.wake() + self._queue.put((func, args, kwargs)) + if self._waker: + self._waker.wake() + + def start_receive(self, stream): + """Mark the :py:attr:`receive_side ` on `stream` as + ready for reading. May be called from any thread. When the associated + file descriptor becomes ready for reading, + :py:meth:`BasicStream.on_transmit` will be called.""" + IOLOG.debug('%r.start_receive(%r)', self, stream) + self.on_thread(self._readers.add, stream.receive_side) + + def stop_receive(self, stream): + IOLOG.debug('%r.stop_receive(%r)', self, stream) + self.on_thread(self._readers.discard, stream.receive_side) + + def start_transmit(self, stream): + IOLOG.debug('%r.start_transmit(%r)', self, stream) + self.on_thread(self._writers.add, stream.transmit_side) + + def stop_transmit(self, stream): + IOLOG.debug('%r.stop_transmit(%r)', self, stream) + self.on_thread(self._writers.discard, stream.transmit_side) def register(self, context): - """Put a context under control of this broker.""" + """Register `context` with this broker. Registration simply calls + :py:meth:`start_receive` on the context's :py:class:`Stream`, and records + a reference to it so that :py:meth:`Context.on_shutdown` can be + called during :py:meth:`shutdown`.""" LOG.debug('%r.register(%r) -> r=%r w=%r', self, context, - context.stream.read_side, - context.stream.write_side) - self.update_stream(context.stream) + context.stream.receive_side, + context.stream.transmit_side) + self.start_receive(context.stream) self._contexts[context.name] = context return context - def _call_and_update(self, stream, func): + def _call(self, stream, func): try: - func() + func(self) except Exception: LOG.exception('%r crashed', stream) - stream.on_disconnect() - self._update_stream(stream) + stream.on_disconnect(self) def _loop_once(self, timeout=None): IOLOG.debug('%r._loop_once(%r)', self, timeout) - #IOLOG.debug('readers = %r', [(r.fileno(), r) for r in self._readers]) - #IOLOG.debug('writers = %r', [(w.fileno(), w) for w in self._writers]) + + while not self._queue.empty(): + func, args, kwargs = self._queue.get() + func(*args, **kwargs) + + #IOLOG.debug('readers = %r', self._readers) + #IOLOG.debug('writers = %r', self._writers) rsides, wsides, _ = select.select(self._readers, self._writers, (), timeout) for side in rsides: IOLOG.debug('%r: POLLIN for %r', self, side.stream) - self._call_and_update(side.stream, side.stream.on_receive) + self._call(side.stream, side.stream.on_receive) for side in wsides: IOLOG.debug('%r: POLLOUT for %r', self, side.stream) - self._call_and_update(side.stream, side.stream.on_transmit) + self._call(side.stream, side.stream.on_transmit) + + def keep_alive(self): + """Return ``True`` if any reader's :py:attr:`Side.keep_alive` + attribute is ``True``, or any :py:class:`Context` is still registered + that is not the master. Used to delay shutdown while some important + work is in progress (e.g. log draining).""" + return any(c.stream and c.name != 'master' + for c in self._contexts.itervalues()) or \ + any(side.keep_alive for side in self._readers) def _broker_main(self): - """Handle events until shutdown().""" + """Handle events until :py:meth:`shutdown`. On shutdown, invoke + :py:meth:`Stream.on_shutdown` for every active stream, then allow up to + :py:attr:`shutdown_timeout` seconds for the streams to unregister + themselves before forcefully calling + :py:meth:`Stream.on_disconnect`.""" try: while self._alive: self._loop_once() for side in self._readers | self._writers: - self._call_and_update(side.stream, side.stream.on_shutdown) + self._call(side.stream, side.stream.on_shutdown) + + deadline = time.time() + self.shutdown_timeout + while self.keep_alive() and time.time() < deadline: + self._loop_once(max(0, deadline - time.time())) - deadline = time.time() + self.graceful_timeout - while ((self._readers or self._writers) and - (self.graceful_count or time.time() < deadline)): - self._loop_once(1.0) + if self.keep_alive(): + LOG.error('%r: some streams did not close gracefully. ' + 'The most likely cause for this is one or ' + 'more child processes still connected to ' + 'ou stdout/stderr pipes.', self) for context in self._contexts.itervalues(): - stream = context.stream - if stream: - stream.on_disconnect() - self._update_stream(stream) + context.on_shutdown(self) for side in self._readers | self._writers: LOG.error('_broker_main() force disconnecting %r', side) - side.stream.on_disconnect() + side.stream.on_disconnect(self) except Exception: LOG.exception('_broker_main() crashed') @@ -634,7 +716,8 @@ class Broker(object): self._waker.wake() def join(self): - """Wait for the broker to stop.""" + """Wait for the broker to stop, expected to be called after + :py:meth:`shutdown`.""" self._thread.join() def __repr__(self): diff --git a/econtext/master.py b/econtext/master.py index 77bd1b0e..89de709c 100644 --- a/econtext/master.py +++ b/econtext/master.py @@ -70,10 +70,10 @@ class Listener(econtext.core.BasicStream): self._sock.listen(backlog) econtext.core.set_cloexec(self._sock.fileno()) self._listen_addr = self._sock.getsockname() - self.read_side = econtext.core.Side(self, self._sock.fileno()) + self.receive_side = econtext.core.Side(self, self._sock.fileno()) broker.update_stream(self) - def on_receive(self): + def on_receive(self, broker): sock, addr = self._sock.accept() context = Context(self._broker, name=addr) stream = econtext.core.Stream(context) @@ -144,7 +144,7 @@ class LocalStream(econtext.core.Stream): super(LocalStream, self).__init__(context) self._permitted_classes = set([('econtext.core', 'CallError')]) - def on_shutdown(self): + def on_shutdown(self, broker): """Request the slave gracefully shut itself down.""" LOG.debug('%r closing CALL_FUNCTION channel', self) self.enqueue(econtext.core.CALL_FUNCTION, econtext.core._DEAD) @@ -209,14 +209,14 @@ class LocalStream(econtext.core.Stream): def connect(self): LOG.debug('%r.connect()', self) pid, sock = create_child(*self.get_boot_command()) - self.read_side = econtext.core.Side(self, os.dup(sock.fileno())) - self.write_side = econtext.core.Side(self, os.dup(sock.fileno())) + self.receive_side = econtext.core.Side(self, os.dup(sock.fileno())) + self.transmit_side = econtext.core.Side(self, os.dup(sock.fileno())) sock.close() LOG.debug('%r.connect(): child process stdin/stdout=%r', - self, self.read_side.fd) + self, self.receive_side.fd) - econtext.core.write_all(self.write_side.fd, self.get_preamble()) - s = os.read(self.read_side.fd, 4096) + econtext.core.write_all(self.transmit_side.fd, self.get_preamble()) + s = os.read(self.receive_side.fd, 4096) if s != 'OK\n': raise econtext.core.StreamError('Bootstrap failed; stdout: %r', s) @@ -235,8 +235,7 @@ class SshStream(LocalStream): class Broker(econtext.core.Broker): - #: Always allow time for slaves to drain. - graceful_count = 1 + shutdown_timeout = 5.0 def create_listener(self, address=None, backlog=30): """Listen on `address` for connections from newly spawned contexts.""" @@ -271,7 +270,7 @@ class Context(econtext.core.Context): self.responder = ModuleResponder(self) self.log_forwarder = LogForwarder(self) - def on_disconnect(self): + def on_disconnect(self, broker): self.stream = None def call_with_deadline(self, deadline, with_context, fn, *args, **kwargs):