diff --git a/mitogen/core.py b/mitogen/core.py index fc473ae7..9a5e85db 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -2423,9 +2423,10 @@ class Router(object): listen(broker, 'exit', self._on_broker_exit) self._setup_logging() - #: context ID -> Stream + self._write_lock = threading.Lock() + #: context ID -> Stream; must hold _write_lock to edit or iterate self._stream_by_id = {} - #: List of contexts to notify of shutdown. + #: List of contexts to notify of shutdown; must hold _write_lock self._context_by_id = {} self._last_handle = itertools.count(1000) #: handle -> (persistent?, func(msg)) @@ -2456,21 +2457,31 @@ class Router(object): :class:`mitogen.parent.RouteMonitor` in an upgraded context. """ LOG.error('%r._on_del_route() %r', self, msg) - if not msg.is_dead: - target_id_s, _, name = bytes_partition(msg.data, b(':')) - target_id = int(target_id_s, 10) - if target_id not in self._context_by_id: - LOG.debug('DEL_ROUTE for unknown ID %r: %r', target_id, msg) - return + if msg.is_dead: + return - fire(self._context_by_id[target_id], 'disconnect') + target_id_s, _, name = bytes_partition(msg.data, b(':')) + context = self._context_by_id.get(int(target_id_s, 10)) + if context: + fire(context, 'disconnect') + else: + LOG.debug('DEL_ROUTE for unknown ID %r: %r', target_id, msg) def _on_stream_disconnect(self, stream): - for context in self._context_by_id.values(): - stream_ = self._stream_by_id.get(context.context_id) - if stream_ is stream: - del self._stream_by_id[context.context_id] - context.on_disconnect() + notify = [] + self._write_lock.acquire() + try: + for context in list(self._context_by_id.values()): + stream_ = self._stream_by_id.get(context.context_id) + if stream_ is stream: + del self._stream_by_id[context.context_id] + notify.append(context) + finally: + self._write_lock.release() + + # Happens outside lock as e.g. RouteMonitor wants the same lock. + for context in notify: + context.on_disconnect() broker_exit_msg = 'Broker has exitted' @@ -2492,14 +2503,27 @@ class Router(object): def context_by_id(self, context_id, via_id=None, create=True, name=None): """ Messy factory/lookup function to find a context by its ID, or construct - it. In future this will be replaced by a much more sensible interface. + it. This will eventually be replaced by a more sensible interface. """ context = self._context_by_id.get(context_id) - if create and not context: - context = self.context_class(self, context_id, name=name) - if via_id is not None: - context.via = self.context_by_id(via_id) - self._context_by_id[context_id] = context + if context: + return context + + if create and via_id is not None: + via = self.context_by_id(via_id) + else: + via = None + + self._write_lock.acquire() + try: + context = self._context_by_id.get(context_id) + if create and not context: + context = self.context_class(self, context_id, name=name) + context.via = via + self._context_by_id[context_id] = context + finally: + self._write_lock.release() + return context def register(self, context, stream): @@ -2509,8 +2533,13 @@ class Router(object): public while the design has not yet settled. """ _v and LOG.debug('register(%r, %r)', context, stream) - self._stream_by_id[context.context_id] = stream - self._context_by_id[context.context_id] = context + self._write_lock.acquire() + try: + self._stream_by_id[context.context_id] = stream + self._context_by_id[context.context_id] = context + finally: + self._write_lock.release() + self.broker.start_receive(stream) listen(stream, 'disconnect', lambda: self._on_stream_disconnect(stream)) @@ -2520,8 +2549,10 @@ class Router(object): `dst_id`. If a specific route for `dst_id` is not known, a reference to the parent context's stream is returned. """ - parent = self._stream_by_id.get(mitogen.parent_id) - return self._stream_by_id.get(dst_id, parent) + return ( + self._stream_by_id.get(dst_id) or + self._stream_by_id.get(mitogen.parent_id) + ) def del_handler(self, handle): """ diff --git a/mitogen/parent.py b/mitogen/parent.py index 76437ebb..fcefd460 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1688,6 +1688,10 @@ class RouteMonitor(object): child is beging upgraded in preparation to become a parent of children of its own. + By virtue of only being active while responding to messages from a handler, + RouteMonitor lives entirely on the broker thread, so its data requires no + locking. + :param Router router: Router to install handlers on. :param Context parent: @@ -1697,6 +1701,9 @@ class RouteMonitor(object): def __init__(self, router, parent=None): self.router = router self.parent = parent + #: Mapping of Stream instance to integer context IDs reachable via the + #: stream; used to cleanup routes during disconnection. + self._routes_by_stream = {} self.router.add_handler( fn=self._on_add_route, handle=mitogen.core.ADD_ROUTE, @@ -1711,9 +1718,6 @@ class RouteMonitor(object): policy=is_immediate_child, overwrite=True, ) - #: Mapping of Stream instance to integer context IDs reachable via the - #: stream; used to cleanup routes during disconnection. - self._routes_by_stream = {} def __repr__(self): return 'RouteMonitor()' @@ -1775,7 +1779,7 @@ class RouteMonitor(object): :param int target_id: ID of the connecting or disconnecting context. """ - for stream in itervalues(self.router._stream_by_id): + for stream in self.router.get_streams(): if target_id in stream.egress_ids: self._send_one(stream, mitogen.core.DEL_ROUTE, target_id, None) @@ -1918,6 +1922,16 @@ class Router(mitogen.core.Router): stream.detached = True msg.reply(None) + def get_streams(self): + """ + Return a snapshot of all streams in existence at time of call. + """ + self._write_lock.acquire() + try: + return itervalues(self._stream_by_id) + finally: + self._write_lock.release() + def add_route(self, target_id, stream): """ Arrange for messages whose `dst_id` is `target_id` to be forwarded on @@ -1929,11 +1943,12 @@ class Router(mitogen.core.Router): LOG.debug('%r.add_route(%r, %r)', self, target_id, stream) assert isinstance(target_id, int) assert isinstance(stream, Stream) + + self._write_lock.acquire() try: self._stream_by_id[target_id] = stream - except KeyError: - LOG.error('%r: cant add route to %r via %r: no such stream', - self, target_id, stream) + finally: + self._write_lock.release() def del_route(self, target_id): LOG.debug('%r.del_route(%r)', self, target_id) @@ -1942,7 +1957,11 @@ class Router(mitogen.core.Router): # 'disconnect' event on the appropriate Context instance. In that case, # we won't a matching _stream_by_id entry for the disappearing route, # so don't raise an error for a missing key here. - self._stream_by_id.pop(target_id, None) + self._write_lock.acquire() + try: + self._stream_by_id.pop(target_id, None) + finally: + self._write_lock.release() def get_module_blacklist(self): if mitogen.context_id == 0: @@ -2001,7 +2020,11 @@ class Router(mitogen.core.Router): name = u'%s.%s' % (via_context.name, resp['name']) context = self.context_class(self, resp['id'], name=name) context.via = via_context - self._context_by_id[context.context_id] = context + self._write_lock.acquire() + try: + self._context_by_id[context.context_id] = context + finally: + self._write_lock.release() return context def doas(self, **kwargs):