diff --git a/mitogen/core.py b/mitogen/core.py index 2a0b9035..31b8e9a2 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1015,8 +1015,8 @@ class Broker(object): def __init__(self): self._alive = True self._queue = Queue.Queue() - self._readers = set() - self._writers = set() + self._readers = [] + self._writers = [] self._waker = Waker(self) self.start_receive(self._waker) self._thread = threading.Thread( @@ -1033,23 +1033,33 @@ class Broker(object): self._queue.put((func, args, kwargs)) self._waker.wake() + def _list_discard(self, lst, value): + try: + lst.remove(value) + except ValueError: + pass + + def _list_add(self, lst, value): + if value not in lst: + lst.append(value) + def start_receive(self, stream): IOLOG.debug('%r.start_receive(%r)', self, stream) assert stream.receive_side and stream.receive_side.fd is not None - self.defer(self._readers.add, stream.receive_side) + self.defer(self._list_add, self._readers, stream.receive_side) def stop_receive(self, stream): IOLOG.debug('%r.stop_receive(%r)', self, stream) - self.defer(self._readers.discard, stream.receive_side) + self.defer(self._list_discard, self._readers, stream.receive_side) def start_transmit(self, stream): IOLOG.debug('%r.start_transmit(%r)', self, stream) assert stream.transmit_side and stream.transmit_side.fd is not None - self.defer(self._writers.add, stream.transmit_side) + self.defer(self._list_add, self._writers, stream.transmit_side) def stop_transmit(self, stream): IOLOG.debug('%r.stop_transmit(%r)', self, stream) - self.defer(self._writers.discard, stream.transmit_side) + self.defer(self._list_discard, self._writers, stream.transmit_side) def _call(self, stream, func): try: @@ -1096,7 +1106,7 @@ class Broker(object): self._run_defer() fire(self, 'shutdown') - for side in self._readers | self._writers: + for side in set(self._readers).union(self._writers): self._call(side.stream, side.stream.on_shutdown) deadline = time.time() + self.shutdown_timeout @@ -1109,7 +1119,7 @@ class Broker(object): 'more child processes still connected to ' 'our stdout/stderr pipes.', self) - for side in self._readers | self._writers: + for side in set(self._readers).union(self._writers): LOG.error('_broker_main() force disconnecting %r', side) side.stream.on_disconnect(self) except Exception: