parent: pre-cache bootstrap if possible.

When the interpreter is modern enough, use zlib.compressobj() to
pre-compress the unchanging parts of the bootstrap once, then use
compressobj.copy() to append just the context's config during stream
construction.

Before: 100 loops, best of 3: 5.81 msec per loop
After: 10000 loops, best of 3: 35.9 usec per loop

With 100 targets this is enough to knock 6 seconds off startup, at 500
targets it becomes half a minute.

Test 'program':
        python -m timeit -s '
                import mitogen.parent as p;
                import mitogen.master as m;
                r=m.Router();
                s=p.Stream(r, 0, max_message_size=1);
                r.broker.shutdown()'\
                \
                's.get_preamble()'
pull/564/head
David Wilson 5 years ago
parent e167f6373c
commit 9adc38d8ec

@ -140,7 +140,7 @@ def get_child_modules(path):
return [to_text(name) for _, name, _ in it]
def get_core_source():
def _get_core_source():
"""
Master version of parent.get_core_source().
"""
@ -150,7 +150,7 @@ def get_core_source():
if mitogen.is_master:
# TODO: find a less surprising way of installing this.
mitogen.parent.get_core_source = get_core_source
mitogen.parent._get_core_source = _get_core_source
LOAD_CONST = dis.opname.index('LOAD_CONST')
@ -823,13 +823,14 @@ class ModuleResponder(object):
def _send_load_module(self, stream, fullname):
if fullname not in stream.sent_modules:
LOG.debug('_send_load_module(%r, %r)', stream, fullname)
tup = self._build_tuple(fullname)
msg = mitogen.core.Message.pickled(
tup,
dst_id=stream.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
LOG.debug('%s: sending module %s (%.2f KiB)',
stream.name, fullname, len(msg.data) / 1024.0)
self._router._async_route(msg)
stream.sent_modules.add(fullname)
if tup[2] is not None:

@ -155,8 +155,8 @@ def get_sys_executable():
return '/usr/bin/python'
_core_source_cache = None
_core_source_lock = threading.Lock()
_core_source_partial = None
def _get_core_source():
@ -168,22 +168,24 @@ def _get_core_source():
return inspect.getsource(mitogen.core)
def get_core_source():
def get_core_source_partial():
"""
_get_core_source() is expensive, even with @lru_cache in minify.py, threads
can enter it simultaneously causing severe slowdowns.
"""
global _core_source_cache
if _core_source_cache is not None:
return _core_source_cache
global _core_source_partial
_core_source_lock.acquire()
try:
if _core_source_cache is None:
_core_source_cache = _get_core_source()
return _core_source_cache
finally:
_core_source_lock.release()
if _core_source_partial is None:
_core_source_lock.acquire()
try:
if _core_source_partial is None:
_core_source_partial = PartialZlib(
_get_core_source().encode('utf-8')
)
finally:
_core_source_lock.release()
return _core_source_partial
def get_default_remote_name():
@ -572,6 +574,26 @@ def write_all(fd, s, deadline=None):
poller.close()
class PartialZlib(object):
def __init__(self, s):
self.s = s
if sys.version_info > (2, 5):
self._compressor = zlib.compressobj(9)
self._out = self._compressor.compress(s)
self._out += self._compressor.flush(zlib.Z_SYNC_FLUSH)
else:
self._compressor = None
def append(self, s):
if self._compressor is None:
return zlib.compress(self.s + s, 9)
else:
compressor = self._compressor.copy()
out = self._out
out += compressor.compress(s)
return out + compressor.flush()
class IteratingRead(object):
def __init__(self, fds, deadline=None):
self.deadline = deadline
@ -1300,11 +1322,12 @@ class Stream(mitogen.core.Stream):
}
def get_preamble(self):
source = get_core_source()
source += '\nExternalContext(%r).main()\n' % (
self.get_econtext_config(),
suffix = (
'\nExternalContext(%r).main()\n' %\
(self.get_econtext_config(),)
)
return zlib.compress(source.encode('utf-8'), 9)
partial = get_core_source_partial()
return partial.append(suffix.encode('utf-8'))
def start_child(self):
args = self.get_boot_command()

Loading…
Cancel
Save