|
|
|
@ -640,7 +640,7 @@ class Context(object):
|
|
|
|
|
try:
|
|
|
|
|
msg = queue.get(True, deadline)
|
|
|
|
|
except Queue.Empty:
|
|
|
|
|
# self.broker.on_thread(self.stream.on_disconnect, self.broker)
|
|
|
|
|
# self.broker.defer(self.stream.on_disconnect, self.broker)
|
|
|
|
|
raise TimeoutError('deadline exceeded.')
|
|
|
|
|
|
|
|
|
|
if msg == _DEAD:
|
|
|
|
@ -855,7 +855,7 @@ class Router(object):
|
|
|
|
|
is destined for the local context, it is dispatched using the handles
|
|
|
|
|
registered with :py:meth:`add_handler`.
|
|
|
|
|
"""
|
|
|
|
|
self.broker.on_thread(self._route, msg)
|
|
|
|
|
self.broker.defer(self._route, msg)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Broker(object):
|
|
|
|
@ -882,7 +882,7 @@ class Broker(object):
|
|
|
|
|
name='econtext-broker')
|
|
|
|
|
self._thread.start()
|
|
|
|
|
|
|
|
|
|
def on_thread(self, func, *args, **kwargs):
|
|
|
|
|
def defer(self, func, *args, **kwargs):
|
|
|
|
|
if threading.currentThread() == self._thread:
|
|
|
|
|
func(*args, **kwargs)
|
|
|
|
|
else:
|
|
|
|
@ -895,20 +895,20 @@ class Broker(object):
|
|
|
|
|
file descriptor becomes ready for reading,
|
|
|
|
|
:py:meth:`BasicStream.on_transmit` will be called."""
|
|
|
|
|
IOLOG.debug('%r.start_receive(%r)', self, stream)
|
|
|
|
|
self.on_thread(self._readers.add, stream.receive_side)
|
|
|
|
|
self.defer(self._readers.add, stream.receive_side)
|
|
|
|
|
|
|
|
|
|
def stop_receive(self, stream):
|
|
|
|
|
IOLOG.debug('%r.stop_receive(%r)', self, stream)
|
|
|
|
|
self.on_thread(self._readers.discard, stream.receive_side)
|
|
|
|
|
self.defer(self._readers.discard, stream.receive_side)
|
|
|
|
|
|
|
|
|
|
def start_transmit(self, stream):
|
|
|
|
|
IOLOG.debug('%r.start_transmit(%r)', self, stream)
|
|
|
|
|
assert stream.transmit_side
|
|
|
|
|
self.on_thread(self._writers.add, stream.transmit_side)
|
|
|
|
|
self.defer(self._writers.add, stream.transmit_side)
|
|
|
|
|
|
|
|
|
|
def stop_transmit(self, stream):
|
|
|
|
|
IOLOG.debug('%r.stop_transmit(%r)', self, stream)
|
|
|
|
|
self.on_thread(self._writers.discard, stream.transmit_side)
|
|
|
|
|
self.defer(self._writers.discard, stream.transmit_side)
|
|
|
|
|
|
|
|
|
|
def _call(self, stream, func):
|
|
|
|
|
try:
|
|
|
|
@ -917,19 +917,19 @@ class Broker(object):
|
|
|
|
|
LOG.exception('%r crashed', stream)
|
|
|
|
|
stream.on_disconnect(self)
|
|
|
|
|
|
|
|
|
|
def _run_on_thread(self):
|
|
|
|
|
def _run_defer(self):
|
|
|
|
|
while not self._queue.empty():
|
|
|
|
|
func, args, kwargs = self._queue.get()
|
|
|
|
|
try:
|
|
|
|
|
func(*args, **kwargs)
|
|
|
|
|
except Exception:
|
|
|
|
|
LOG.exception('on_thread() crashed: %r(*%r, **%r)',
|
|
|
|
|
LOG.exception('defer() crashed: %r(*%r, **%r)',
|
|
|
|
|
func, args, kwargs)
|
|
|
|
|
self.shutdown()
|
|
|
|
|
|
|
|
|
|
def _loop_once(self, timeout=None):
|
|
|
|
|
IOLOG.debug('%r._loop_once(%r)', self, timeout)
|
|
|
|
|
self._run_on_thread()
|
|
|
|
|
self._run_defer()
|
|
|
|
|
|
|
|
|
|
#IOLOG.debug('readers = %r', self._readers)
|
|
|
|
|
#IOLOG.debug('writers = %r', self._writers)
|
|
|
|
|