From 4720eb1c555b1232ebf77d96033625098c84e09a Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 18 Sep 2017 15:37:21 +0530 Subject: [PATCH] core: add ALLOCATE_ID message for fakessh. --- docs/howitworks.rst | 8 +++++++ mitogen/core.py | 6 ++++++ mitogen/master.py | 42 +++++++++++++++++++++++++++++++++---- tests/data/id_allocation.py | 8 +++++++ tests/id_allocation_test.py | 12 +++++++++++ 5 files changed, 72 insertions(+), 4 deletions(-) create mode 100644 tests/data/id_allocation.py create mode 100644 tests/id_allocation_test.py diff --git a/docs/howitworks.rst b/docs/howitworks.rst index 71a1c778..8d60149d 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -271,6 +271,14 @@ Masters listen on the following handles: module named ``fullname``, and writes the source along with some metadata back to the handle ``reply_to``. If lookup fails, ``None`` is sent instead. +.. data:: mitogen.core.ALLOCATE_ID + + Replies to any message sent to it with a newly allocated unique context ID, + to allow slaves to safely start their own contexts. In future this is + likely to be replaced by 32-bit context IDs and random allocation, with an + improved ``ADD_ROUTE`` message sent upstream rather than downstream that + generates NACKs if any ancestor already knows the ID. + Slaves listen on the following handles: diff --git a/mitogen/core.py b/mitogen/core.py index 02d23c55..c2c5fcb1 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -27,6 +27,7 @@ GET_MODULE = 100 CALL_FUNCTION = 101 FORWARD_LOG = 102 ADD_ROUTE = 103 +ALLOCATE_ID = 104 CHUNK_SIZE = 16384 @@ -867,6 +868,11 @@ class Router(object): return handle + def allocate_id(self): + master = Context(self, 0) + reply = master.send_await(Message(dst_id=0, handle=ALLOCATE_ID)) + return reply.unpickle() + def on_shutdown(self, broker): """Called during :py:meth:`Broker.shutdown`, informs callbacks registered with :py:meth:`add_handle_cb` the connection is dead.""" diff --git a/mitogen/master.py b/mitogen/master.py index b7bf9c40..629dcdd0 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -21,6 +21,7 @@ import socket import sys import termios import textwrap +import threading import time import types import zlib @@ -753,13 +754,43 @@ def _proxy_connect(econtext, name, context_id, method_name, kwargs): return context.name -class Router(mitogen.core.Router): - context_id_counter = itertools.count(1) +class IdAllocator(object): + def __init__(self, router): + self.router = router + self.next_id = 1 + self.lock = threading.Lock() + router.add_handler(self.on_allocate_id, mitogen.core.ALLOCATE_ID) + + def __repr__(self): + return 'IdAllocator(%r)' % (self.router,) + + def allocate(self): + self.lock.acquire() + try: + id_ = self.next_id + self.next_id += 1 + return id_ + finally: + self.lock.release() + + def on_allocate_id(self, msg): + id_ = self.allocate() + LOG.debug('%r: allocating ID %d to context %r', id_, msg.src_id) + self.router.route( + mitogen.core.Message.pickled( + id_, + dst_id=msg.src_id, + handle=msg.reply_to, + ) + ) + +class Router(mitogen.core.Router): debug = False def __init__(self, *args, **kwargs): super(Router, self).__init__(*args, **kwargs) + self.id_allocator = IdAllocator(self) self.responder = ModuleResponder(self) self.log_forwarder = LogForwarder(self) @@ -778,6 +809,9 @@ class Router(mitogen.core.Router): self.broker.shutdown() self.broker.join() + def allocate_id(self): + return self.id_allocator.allocate() + def context_by_id(self, context_id): return self._context_by_id.get(context_id) @@ -807,11 +841,11 @@ class Router(mitogen.core.Router): via = kwargs.pop('via', None) if via is not None: return self.proxy_connect(via, method_name, name=name, **kwargs) - context_id = self.context_id_counter.next() + context_id = self.allocate_id() return self._connect(context_id, klass, name=name, **kwargs) def proxy_connect(self, via_context, method_name, name=None, **kwargs): - context_id = self.context_id_counter.next() + context_id = self.allocate_id() # Must be added prior to _proxy_connect() to avoid a race. self.add_route(context_id, via_context.context_id) name = via_context.call_with_deadline(None, True, diff --git a/tests/data/id_allocation.py b/tests/data/id_allocation.py new file mode 100644 index 00000000..dcfd5230 --- /dev/null +++ b/tests/data/id_allocation.py @@ -0,0 +1,8 @@ + +import mitogen.core + + +@mitogen.core.takes_router +def allocate_an_id(router): + return router.allocate_id() + diff --git a/tests/id_allocation_test.py b/tests/id_allocation_test.py new file mode 100644 index 00000000..78f66a26 --- /dev/null +++ b/tests/id_allocation_test.py @@ -0,0 +1,12 @@ + +import unittest + +import testlib +import id_allocation + + +class SlaveTest(testlib.RouterMixin, unittest.TestCase): + def test_slave_allocates_id(self): + context = self.router.local() + id_ = context.call(id_allocation.allocate_an_id) + assert id_ == (self.router.id_allocator.next_id - 1)