|
|
|
|
@ -560,31 +560,25 @@ class Broker(object):
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self._alive = True
|
|
|
|
|
self._lock = threading.RLock()
|
|
|
|
|
self._contexts = {}
|
|
|
|
|
self._readers = set()
|
|
|
|
|
self._writers = set()
|
|
|
|
|
self._waker = Waker(self)
|
|
|
|
|
|
|
|
|
|
self._thread = threading.Thread(target=self._broker_main,
|
|
|
|
|
name='econtext-broker')
|
|
|
|
|
self._thread.start()
|
|
|
|
|
|
|
|
|
|
def _update_stream(self, stream):
|
|
|
|
|
IOLOG.debug('_update_stream(%r)', stream)
|
|
|
|
|
self._lock.acquire()
|
|
|
|
|
try:
|
|
|
|
|
if stream.read_side.fd is not None:
|
|
|
|
|
self._readers.add(stream.read_side)
|
|
|
|
|
else:
|
|
|
|
|
self._readers.discard(stream.read_side)
|
|
|
|
|
|
|
|
|
|
if stream.write_side.fd is not None and stream.has_output():
|
|
|
|
|
self._writers.add(stream.write_side)
|
|
|
|
|
else:
|
|
|
|
|
self._writers.discard(stream.write_side)
|
|
|
|
|
finally:
|
|
|
|
|
self._lock.release()
|
|
|
|
|
if stream.read_side.fd is not None:
|
|
|
|
|
self._readers.add(stream.read_side)
|
|
|
|
|
else:
|
|
|
|
|
self._readers.discard(stream.read_side)
|
|
|
|
|
|
|
|
|
|
if stream.write_side.fd is not None and stream.has_output():
|
|
|
|
|
self._writers.add(stream.write_side)
|
|
|
|
|
else:
|
|
|
|
|
self._writers.discard(stream.write_side)
|
|
|
|
|
|
|
|
|
|
def update_stream(self, stream):
|
|
|
|
|
self._update_stream(stream)
|
|
|
|
|
|