issue #456: loosen Waker.defer() shutdown test a little

Allow messages to continue being queued during the shutdown period,
right up until the final loop iteration, even though this is racy, as
too many things depend on .defer() during exit right now.

This doesn't hurt the spirit of the check: it still catches the worst
situation where $user accidentally shut down Broker then tried to
continue using it.
issue510
David Wilson 6 years ago
parent 6592598999
commit 5ef94eb3e2

@ -2122,8 +2122,8 @@ class Waker(BasicStream):
broker_shutdown_msg = ( broker_shutdown_msg = (
"An attempt was made to enqueue a message with a Broker that has " "An attempt was made to enqueue a message with a Broker that has "
"already begun shutting down. It is likely your program called " "already exitted. It is likely your program called Broker.shutdown() "
"Broker.shutdown() too early." "too early."
) )
def defer(self, func, *args, **kwargs): def defer(self, func, *args, **kwargs):
@ -2138,7 +2138,7 @@ class Waker(BasicStream):
if threading.currentThread().ident == self.broker_ident: if threading.currentThread().ident == self.broker_ident:
_vv and IOLOG.debug('%r.defer() [immediate]', self) _vv and IOLOG.debug('%r.defer() [immediate]', self)
return func(*args, **kwargs) return func(*args, **kwargs)
if not self._broker._alive: if self._broker._exitted:
raise Error(self.broker_shutdown_msg) raise Error(self.broker_shutdown_msg)
_vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd) _vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd)
@ -2564,6 +2564,7 @@ class Broker(object):
def __init__(self, poller_class=None): def __init__(self, poller_class=None):
self._alive = True self._alive = True
self._exitted = False
self._waker = Waker(self) self._waker = Waker(self)
#: Arrange for `func(\*args, \**kwargs)` to be executed on the broker #: Arrange for `func(\*args, \**kwargs)` to be executed on the broker
#: thread, or immediately if the current thread is the broker thread. #: thread, or immediately if the current thread is the broker thread.
@ -2724,6 +2725,7 @@ class Broker(object):
except Exception: except Exception:
LOG.exception('_broker_main() crashed') LOG.exception('_broker_main() crashed')
self._exitted = True
self._broker_exit() self._broker_exit()
fire(self, 'exit') fire(self, 'exit')

Loading…
Cancel
Save