core: add ALLOCATE_ID message for fakessh.

wip-fakessh-exit-status
David Wilson 7 years ago
parent e796487cca
commit 4720eb1c55

@ -271,6 +271,14 @@ Masters listen on the following handles:
module named ``fullname``, and writes the source along with some metadata
back to the handle ``reply_to``. If lookup fails, ``None`` is sent instead.
.. data:: mitogen.core.ALLOCATE_ID
Replies to any message sent to it with a newly allocated unique context ID,
to allow slaves to safely start their own contexts. In future this is
likely to be replaced by 32-bit context IDs and random allocation, with an
improved ``ADD_ROUTE`` message sent upstream rather than downstream that
generates NACKs if any ancestor already knows the ID.
Slaves listen on the following handles:

@ -27,6 +27,7 @@ GET_MODULE = 100
CALL_FUNCTION = 101
FORWARD_LOG = 102
ADD_ROUTE = 103
ALLOCATE_ID = 104
CHUNK_SIZE = 16384
@ -867,6 +868,11 @@ class Router(object):
return handle
def allocate_id(self):
master = Context(self, 0)
reply = master.send_await(Message(dst_id=0, handle=ALLOCATE_ID))
return reply.unpickle()
def on_shutdown(self, broker):
"""Called during :py:meth:`Broker.shutdown`, informs callbacks
registered with :py:meth:`add_handle_cb` the connection is dead."""

@ -21,6 +21,7 @@ import socket
import sys
import termios
import textwrap
import threading
import time
import types
import zlib
@ -753,13 +754,43 @@ def _proxy_connect(econtext, name, context_id, method_name, kwargs):
return context.name
class Router(mitogen.core.Router):
context_id_counter = itertools.count(1)
class IdAllocator(object):
def __init__(self, router):
self.router = router
self.next_id = 1
self.lock = threading.Lock()
router.add_handler(self.on_allocate_id, mitogen.core.ALLOCATE_ID)
def __repr__(self):
return 'IdAllocator(%r)' % (self.router,)
def allocate(self):
self.lock.acquire()
try:
id_ = self.next_id
self.next_id += 1
return id_
finally:
self.lock.release()
def on_allocate_id(self, msg):
id_ = self.allocate()
LOG.debug('%r: allocating ID %d to context %r', id_, msg.src_id)
self.router.route(
mitogen.core.Message.pickled(
id_,
dst_id=msg.src_id,
handle=msg.reply_to,
)
)
class Router(mitogen.core.Router):
debug = False
def __init__(self, *args, **kwargs):
super(Router, self).__init__(*args, **kwargs)
self.id_allocator = IdAllocator(self)
self.responder = ModuleResponder(self)
self.log_forwarder = LogForwarder(self)
@ -778,6 +809,9 @@ class Router(mitogen.core.Router):
self.broker.shutdown()
self.broker.join()
def allocate_id(self):
return self.id_allocator.allocate()
def context_by_id(self, context_id):
return self._context_by_id.get(context_id)
@ -807,11 +841,11 @@ class Router(mitogen.core.Router):
via = kwargs.pop('via', None)
if via is not None:
return self.proxy_connect(via, method_name, name=name, **kwargs)
context_id = self.context_id_counter.next()
context_id = self.allocate_id()
return self._connect(context_id, klass, name=name, **kwargs)
def proxy_connect(self, via_context, method_name, name=None, **kwargs):
context_id = self.context_id_counter.next()
context_id = self.allocate_id()
# Must be added prior to _proxy_connect() to avoid a race.
self.add_route(context_id, via_context.context_id)
name = via_context.call_with_deadline(None, True,

@ -0,0 +1,8 @@
import mitogen.core
@mitogen.core.takes_router
def allocate_an_id(router):
return router.allocate_id()

@ -0,0 +1,12 @@
import unittest
import testlib
import id_allocation
class SlaveTest(testlib.RouterMixin, unittest.TestCase):
def test_slave_allocates_id(self):
context = self.router.local()
id_ = context.call(id_allocation.allocate_an_id)
assert id_ == (self.router.id_allocator.next_id - 1)
Loading…
Cancel
Save