Split up parent and master modules

Knocks 4kb off network footprint for a proxy connection.
pull/79/head
David Wilson 7 years ago
parent a8ad096d06
commit 4d31300dd0

@ -30,7 +30,6 @@ import inspect
import logging import logging
import os import os
import shutil import shutil
import signal
import socket import socket
import subprocess import subprocess
import sys import sys
@ -39,6 +38,7 @@ import threading
import mitogen.core import mitogen.core
import mitogen.master import mitogen.master
import mitogen.parent
from mitogen.core import LOG, IOLOG from mitogen.core import LOG, IOLOG
@ -123,7 +123,7 @@ class Process(object):
mitogen.core.listen(self.pump, 'receive', self._on_pump_receive) mitogen.core.listen(self.pump, 'receive', self._on_pump_receive)
if proc: if proc:
pmon = mitogen.master.ProcessMonitor.instance() pmon = mitogen.parent.ProcessMonitor.instance()
pmon.add(proc.pid, self._on_proc_exit) pmon.add(proc.pid, self._on_proc_exit)
def __repr__(self): def __repr__(self):
@ -313,7 +313,7 @@ def _fakessh_main(dest_context_id, econtext):
@mitogen.core.takes_router @mitogen.core.takes_router
def run(dest, router, args, deadline=None, econtext=None): def run(dest, router, args, deadline=None, econtext=None):
if econtext is not None: if econtext is not None:
mitogen.master.upgrade_router(econtext) mitogen.parent.upgrade_router(econtext)
context_id = router.allocate_id() context_id = router.allocate_id()
fakessh = mitogen.master.Context(router, context_id) fakessh = mitogen.master.Context(router, context_id)

@ -26,24 +26,15 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import dis import dis
import errno
import getpass
import imp import imp
import inspect import inspect
import itertools import itertools
import logging import logging
import os import os
import pkgutil import pkgutil
import pty
import re import re
import select
import signal
import socket
import sys import sys
import termios
import textwrap
import threading import threading
import time
import types import types
import zlib import zlib
@ -58,188 +49,18 @@ if not hasattr(pkgutil, 'find_loader'):
from mitogen.compat import pkgutil from mitogen.compat import pkgutil
import mitogen.core import mitogen.core
import mitogen.parent
from mitogen.core import LOG
LOG = logging.getLogger('mitogen')
IOLOG = logging.getLogger('mitogen.io')
RLOG = logging.getLogger('mitogen.ctx') RLOG = logging.getLogger('mitogen.ctx')
DOCSTRING_RE = re.compile(r'""".+?"""', re.M | re.S)
COMMENT_RE = re.compile(r'^[ ]*#[^\n]*$', re.M)
IOLOG_RE = re.compile(r'^[ ]*IOLOG.debug\(.+?\)$', re.M)
def minimize_source(source):
subber = lambda match: '""' + ('\n' * match.group(0).count('\n'))
source = DOCSTRING_RE.sub(subber, source)
source = COMMENT_RE.sub('', source)
return source.replace(' ', '\t')
def get_child_modules(path, fullname): def get_child_modules(path, fullname):
it = pkgutil.iter_modules([os.path.dirname(path)]) it = pkgutil.iter_modules([os.path.dirname(path)])
return ['%s.%s' % (fullname, name) for _, name, _ in it] return ['%s.%s' % (fullname, name) for _, name, _ in it]
class Argv(object):
def __init__(self, argv):
self.argv = argv
def escape(self, x):
s = '"'
for c in x:
if c in '\\$"`':
s += '\\'
s += c
s += '"'
return s
def __str__(self):
return ' '.join(map(self.escape, self.argv))
def create_child(*args):
parentfp, childfp = socket.socketpair()
pid = os.fork()
if not pid:
mitogen.core.set_block(childfp.fileno())
os.dup2(childfp.fileno(), 0)
os.dup2(childfp.fileno(), 1)
childfp.close()
parentfp.close()
os.execvp(args[0], args)
childfp.close()
# Decouple the socket from the lifetime of the Python socket object.
fd = os.dup(parentfp.fileno())
parentfp.close()
LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s',
pid, fd, os.getpid(), Argv(args))
return pid, fd
def flags(names):
"""Return the result of ORing a set of (space separated) :py:mod:`termios`
module constants together."""
return sum(getattr(termios, name) for name in names.split())
def cfmakeraw(tflags):
"""Given a list returned by :py:func:`termios.tcgetattr`, return a list
that has been modified in the same manner as the `cfmakeraw()` C library
function."""
iflag, oflag, cflag, lflag, ispeed, ospeed, cc = tflags
iflag &= ~flags('IGNBRK BRKINT PARMRK ISTRIP INLCR IGNCR ICRNL IXON')
oflag &= ~flags('OPOST IXOFF')
lflag &= ~flags('ECHO ECHOE ECHONL ICANON ISIG IEXTEN')
cflag &= ~flags('CSIZE PARENB')
cflag |= flags('CS8')
# TODO: one or more of the above bit twiddles sets or omits a necessary
# flag. Forcing these fields to zero, as shown below, gets us what we want
# on Linux/OS X, but it is possibly broken on some other OS.
iflag = 0
oflag = 0
lflag = 0
return [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
def disable_echo(fd):
old = termios.tcgetattr(fd)
new = cfmakeraw(old)
flags = (
termios.TCSAFLUSH |
getattr(termios, 'TCSASOFT', 0)
)
termios.tcsetattr(fd, flags, new)
def close_nonstandard_fds():
for fd in xrange(3, 1024):
try:
os.close(fd)
except OSError:
pass
def tty_create_child(*args):
master_fd, slave_fd = os.openpty()
disable_echo(master_fd)
disable_echo(slave_fd)
pid = os.fork()
if not pid:
mitogen.core.set_block(slave_fd)
os.dup2(slave_fd, 0)
os.dup2(slave_fd, 1)
os.dup2(slave_fd, 2)
close_nonstandard_fds()
os.setsid()
os.close(os.open(os.ttyname(1), os.O_RDWR))
os.execvp(args[0], args)
os.close(slave_fd)
LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s',
pid, master_fd, os.getpid(), Argv(args))
return pid, master_fd
def write_all(fd, s, deadline=None):
timeout = None
written = 0
while written < len(s):
if deadline is not None:
timeout = max(0, deadline - time.time())
if timeout == 0:
raise mitogen.core.TimeoutError('write timed out')
_, wfds, _ = select.select([], [fd], [], timeout)
if not wfds:
continue
n, disconnected = mitogen.core.io_op(os.write, fd, buffer(s, written))
if disconnected:
raise mitogen.core.StreamError('EOF on stream during write')
written += n
def iter_read(fd, deadline=None):
bits = []
timeout = None
while True:
if deadline is not None:
timeout = max(0, deadline - time.time())
if timeout == 0:
break
rfds, _, _ = select.select([fd], [], [], timeout)
if not rfds:
continue
s, disconnected = mitogen.core.io_op(os.read, fd, 4096)
IOLOG.debug('iter_read(%r) -> %r', fd, s)
if disconnected or not s:
raise mitogen.core.StreamError(
'EOF on stream; last 300 bytes received: %r' %
(''.join(bits)[-300:],)
)
bits.append(s)
yield s
raise mitogen.core.TimeoutError('read timed out')
def discard_until(fd, s, deadline):
for buf in iter_read(fd, deadline):
if buf.endswith(s):
return
def scan_code_imports(co, LOAD_CONST=dis.opname.index('LOAD_CONST'), def scan_code_imports(co, LOAD_CONST=dis.opname.index('LOAD_CONST'),
IMPORT_NAME=dis.opname.index('IMPORT_NAME')): IMPORT_NAME=dis.opname.index('IMPORT_NAME')):
"""Given a code object `co`, scan its bytecode yielding any """Given a code object `co`, scan its bytecode yielding any
@ -713,170 +534,6 @@ class ModuleResponder(object):
) )
class ModuleForwarder(object):
"""
Respond to GET_MODULE requests in a slave by forwarding the request to our
parent context, or satisfying the request from our local Importer cache.
"""
def __init__(self, router, parent_context, importer):
self.router = router
self.parent_context = parent_context
self.importer = importer
router.add_handler(self._on_get_module, mitogen.core.GET_MODULE)
def __repr__(self):
return 'ModuleForwarder(%r)' % (self.router,)
def _on_get_module(self, msg):
LOG.debug('%r._on_get_module(%r)', self, msg)
if msg == mitogen.core._DEAD:
return
fullname = msg.data
callback = lambda: self._on_cache_callback(msg, fullname)
self.importer._request_module(fullname, callback)
def _send_one_module(self, msg, tup):
self.router.route(
mitogen.core.Message.pickled(
tup,
dst_id=msg.src_id,
handle=mitogen.core.LOAD_MODULE,
)
)
def _on_cache_callback(self, msg, fullname):
LOG.debug('%r._on_get_module(): sending %r', self, fullname)
tup = self.importer._cache[fullname]
if tup is not None:
for related in tup[4]:
rtup = self.importer._cache[fullname]
self._send_one_module(msg, rtup)
self._send_one_module(msg, tup)
class Stream(mitogen.core.Stream):
"""
Base for streams capable of starting new slaves.
"""
#: The path to the remote Python interpreter.
python_path = 'python2.7'
#: True to cause context to write verbose /tmp/mitogen.<pid>.log.
debug = False
#: True to cause context to write /tmp/mitogen.stats.<pid>.<thread>.log.
profiling = False
def __init__(self, *args, **kwargs):
super(Stream, self).__init__(*args, **kwargs)
self.sent_modules = set(['mitogen', 'mitogen.core'])
self.sent_packages = set(['mitogen'])
def construct(self, remote_name=None, python_path=None, debug=False,
profiling=False, **kwargs):
"""Get the named context running on the local machine, creating it if
it does not exist."""
super(Stream, self).construct(**kwargs)
if python_path:
self.python_path = python_path
if remote_name is None:
remote_name = '%s@%s:%d'
remote_name %= (getpass.getuser(), socket.gethostname(), os.getpid())
self.remote_name = remote_name
self.debug = debug
self.profiling = profiling
def on_shutdown(self, broker):
"""Request the slave gracefully shut itself down."""
LOG.debug('%r closing CALL_FUNCTION channel', self)
self.send(
mitogen.core.Message(
src_id=mitogen.context_id,
dst_id=self.remote_id,
handle=mitogen.core.SHUTDOWN,
)
)
# base64'd and passed to 'python -c'. It forks, dups 0->100, creates a
# pipe, then execs a new interpreter with a custom argv. 'CONTEXT_NAME' is
# replaced with the context name. Optimized for size.
@staticmethod
def _first_stage():
import os,sys,zlib
R,W=os.pipe()
r,w=os.pipe()
if os.fork():
os.dup2(0,100)
os.dup2(R,0)
os.dup2(r,101)
for f in R,r,W,w:os.close(f)
os.environ['ARGV0']=e=sys.executable
os.execv(e,['mitogen:CONTEXT_NAME'])
os.write(1,'EC0\n')
C=zlib.decompress(sys.stdin.read(input()))
os.fdopen(W,'w',0).write(C)
os.fdopen(w,'w',0).write('%s\n'%len(C)+C)
os.write(1,'EC1\n')
sys.exit(0)
def get_boot_command(self):
source = inspect.getsource(self._first_stage)
source = textwrap.dedent('\n'.join(source.strip().split('\n')[2:]))
source = source.replace(' ', '\t')
source = source.replace('CONTEXT_NAME', self.remote_name)
encoded = source.encode('zlib').encode('base64').replace('\n', '')
# We can't use bytes.decode() in 3.x since it was restricted to always
# return unicode, so codecs.decode() is used instead. In 3.x
# codecs.decode() requires a bytes object. Since we must be compatible
# with 2.4 (no bytes literal), an extra .encode() either returns the
# same str (2.x) or an equivalent bytes (3.x).
return [
self.python_path, '-c',
'from codecs import decode as _;'
'exec(_(_("%s".encode(),"base64"),"zlib"))' % (encoded,)
]
def get_preamble(self):
parent_ids = mitogen.parent_ids[:]
parent_ids.insert(0, mitogen.context_id)
source = inspect.getsource(mitogen.core)
source += '\nExternalContext().main%r\n' % ((
parent_ids, # parent_ids
self.remote_id, # context_id
self.debug,
self.profiling,
LOG.level or logging.getLogger().level or logging.INFO,
),)
compressed = zlib.compress(minimize_source(source))
return str(len(compressed)) + '\n' + compressed
create_child = staticmethod(create_child)
def connect(self):
LOG.debug('%r.connect()', self)
pid, fd = self.create_child(*self.get_boot_command())
self.name = 'local.%s' % (pid,)
self.receive_side = mitogen.core.Side(self, fd)
self.transmit_side = mitogen.core.Side(self, os.dup(fd))
LOG.debug('%r.connect(): child process stdin/stdout=%r',
self, self.receive_side.fd)
self._connect_bootstrap()
def _ec0_received(self):
LOG.debug('%r._ec0_received()', self)
write_all(self.transmit_side.fd, self.get_preamble())
discard_until(self.receive_side.fd, 'EC1\n', time.time() + 10.0)
def _connect_bootstrap(self):
discard_until(self.receive_side.fd, 'EC0\n', time.time() + 10.0)
self._ec0_received()
class Broker(mitogen.core.Broker): class Broker(mitogen.core.Broker):
shutdown_timeout = 5.0 shutdown_timeout = 5.0
@ -919,98 +576,10 @@ class Context(mitogen.core.Context):
return self.call_async(fn, *args, **kwargs).get_data() return self.call_async(fn, *args, **kwargs).get_data()
def _local_method(): class Router(mitogen.parent.Router):
return Stream context_class = Context
def _ssh_method():
import mitogen.ssh
return mitogen.ssh.Stream
def _sudo_method():
import mitogen.sudo
return mitogen.sudo.Stream
METHOD_NAMES = {
'local': _local_method,
'ssh': _ssh_method,
'sudo': _sudo_method,
}
def upgrade_router(econtext):
if not isinstance(econtext.router, Router): # TODO
econtext.router.__class__ = Router # TODO
econtext.router.id_allocator = ChildIdAllocator(econtext.router)
LOG.debug('_proxy_connect(): constructing ModuleForwarder')
ModuleForwarder(econtext.router, econtext.parent, econtext.importer)
@mitogen.core.takes_econtext
def _proxy_connect(name, context_id, method_name, kwargs, econtext):
upgrade_router(econtext)
context = econtext.router._connect(
context_id,
METHOD_NAMES[method_name](),
name=name,
**kwargs
)
return context.name
class IdAllocator(object):
def __init__(self, router):
self.router = router
self.next_id = 1
self.lock = threading.Lock()
router.add_handler(self.on_allocate_id, mitogen.core.ALLOCATE_ID)
def __repr__(self):
return 'IdAllocator(%r)' % (self.router,)
def allocate(self):
self.lock.acquire()
try:
id_ = self.next_id
self.next_id += 1
return id_
finally:
self.lock.release()
def on_allocate_id(self, msg):
id_ = self.allocate()
requestee = self.router.context_by_id(msg.src_id)
allocated = self.router.context_by_id(id_, msg.src_id)
LOG.debug('%r: allocating %r to %r', self, allocated, requestee)
self.router.route(
mitogen.core.Message.pickled(
id_,
dst_id=msg.src_id,
handle=msg.reply_to,
)
)
LOG.debug('%r: publishing route to %r via %r', self,
allocated, requestee)
self.router.propagate_route(allocated, requestee)
class ChildIdAllocator(object):
def __init__(self, router):
self.router = router
def allocate(self):
master = Context(self.router, 0)
return master.send_await(
mitogen.core.Message(dst_id=0, handle=mitogen.core.ALLOCATE_ID)
)
class Router(mitogen.core.Router):
broker_class = Broker broker_class = Broker
debug = False debug = False
profiling = False profiling = False
def __init__(self, broker=None): def __init__(self, broker=None):
@ -1032,18 +601,6 @@ class Router(mitogen.core.Router):
self.broker.shutdown() self.broker.shutdown()
self.broker.join() self.broker.join()
def allocate_id(self):
return self.id_allocator.allocate()
def context_by_id(self, context_id, via_id=None):
context = self._context_by_id.get(context_id)
if context is None:
context = Context(self, context_id)
if via_id is not None:
context.via = self.context_by_id(via_id)
self._context_by_id[context_id] = context
return context
def local(self, **kwargs): def local(self, **kwargs):
return self.connect('local', **kwargs) return self.connect('local', **kwargs)
@ -1053,27 +610,6 @@ class Router(mitogen.core.Router):
def ssh(self, **kwargs): def ssh(self, **kwargs):
return self.connect('ssh', **kwargs) return self.connect('ssh', **kwargs)
def _connect(self, context_id, klass, name=None, **kwargs):
context = Context(self, context_id)
stream = klass(self, context.context_id, **kwargs)
if name is not None:
stream.name = name
stream.connect()
context.name = stream.name
self.register(context, stream)
return context
def connect(self, method_name, name=None, **kwargs):
klass = METHOD_NAMES[method_name]()
kwargs.setdefault('debug', self.debug)
kwargs.setdefault('profiling', self.profiling)
via = kwargs.pop('via', None)
if via is not None:
return self.proxy_connect(via, method_name, name=name, **kwargs)
context_id = self.allocate_id()
return self._connect(context_id, klass, name=name, **kwargs)
def propagate_route(self, target, via): def propagate_route(self, target, via):
self.add_route(target.context_id, via.context_id) self.add_route(target.context_id, via.context_id)
child = via child = via
@ -1090,43 +626,40 @@ class Router(mitogen.core.Router):
child = parent child = parent
parent = parent.via parent = parent.via
def proxy_connect(self, via_context, method_name, name=None, **kwargs):
context_id = self.allocate_id()
# Must be added prior to _proxy_connect() to avoid a race.
self.add_route(context_id, via_context.context_id)
name = via_context.call(_proxy_connect,
name, context_id, method_name, kwargs
)
name = '%s.%s' % (via_context.name, name)
context = Context(self, context_id, name=name) class IdAllocator(object):
context.via = via_context def __init__(self, router):
self._context_by_id[context.context_id] = context self.router = router
self.next_id = 1
self.lock = threading.Lock()
router.add_handler(self.on_allocate_id, mitogen.core.ALLOCATE_ID)
def __repr__(self):
return 'IdAllocator(%r)' % (self.router,)
def allocate(self):
self.lock.acquire()
try:
id_ = self.next_id
self.next_id += 1
return id_
finally:
self.lock.release()
self.propagate_route(context, via_context) def on_allocate_id(self, msg):
return context id_ = self.allocate()
requestee = self.router.context_by_id(msg.src_id)
allocated = self.router.context_by_id(id_, msg.src_id)
LOG.debug('%r: allocating %r to %r', self, allocated, requestee)
self.router.route(
mitogen.core.Message.pickled(
id_,
dst_id=msg.src_id,
handle=msg.reply_to,
)
)
class ProcessMonitor(object): LOG.debug('%r: publishing route to %r via %r', self,
def __init__(self): allocated, requestee)
# pid -> callback() self.router.propagate_route(allocated, requestee)
self.callback_by_pid = {}
signal.signal(signal.SIGCHLD, self._on_sigchld)
def _on_sigchld(self, _signum, _frame):
for pid, callback in self.callback_by_pid.items():
pid, status = os.waitpid(pid, os.WNOHANG)
if pid:
callback(status)
del self.callback_by_pid[pid]
def add(self, pid, callback):
self.callback_by_pid[pid] = callback
_instance = None
@classmethod
def instance(cls):
if cls._instance is None:
cls._instance = cls()
return cls._instance

@ -0,0 +1,504 @@
# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import re
import logging
import os
import termios
import signal
import select
import getpass
import time
import socket
import inspect
import textwrap
import zlib
import mitogen.core
from mitogen.core import LOG, IOLOG
DOCSTRING_RE = re.compile(r'""".+?"""', re.M | re.S)
COMMENT_RE = re.compile(r'^[ ]*#[^\n]*$', re.M)
class Argv(object):
def __init__(self, argv):
self.argv = argv
def escape(self, x):
s = '"'
for c in x:
if c in '\\$"`':
s += '\\'
s += c
s += '"'
return s
def __str__(self):
return ' '.join(map(self.escape, self.argv))
def minimize_source(source):
subber = lambda match: '""' + ('\n' * match.group(0).count('\n'))
source = DOCSTRING_RE.sub(subber, source)
source = COMMENT_RE.sub('', source)
return source.replace(' ', '\t')
def flags(names):
"""Return the result of ORing a set of (space separated) :py:mod:`termios`
module constants together."""
return sum(getattr(termios, name) for name in names.split())
def cfmakeraw(tflags):
"""Given a list returned by :py:func:`termios.tcgetattr`, return a list
that has been modified in the same manner as the `cfmakeraw()` C library
function."""
iflag, oflag, cflag, lflag, ispeed, ospeed, cc = tflags
iflag &= ~flags('IGNBRK BRKINT PARMRK ISTRIP INLCR IGNCR ICRNL IXON')
oflag &= ~flags('OPOST IXOFF')
lflag &= ~flags('ECHO ECHOE ECHONL ICANON ISIG IEXTEN')
cflag &= ~flags('CSIZE PARENB')
cflag |= flags('CS8')
# TODO: one or more of the above bit twiddles sets or omits a necessary
# flag. Forcing these fields to zero, as shown below, gets us what we want
# on Linux/OS X, but it is possibly broken on some other OS.
iflag = 0
oflag = 0
lflag = 0
return [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
def disable_echo(fd):
old = termios.tcgetattr(fd)
new = cfmakeraw(old)
flags = (
termios.TCSAFLUSH |
getattr(termios, 'TCSASOFT', 0)
)
termios.tcsetattr(fd, flags, new)
def close_nonstandard_fds():
for fd in xrange(3, 1024):
try:
os.close(fd)
except OSError:
pass
def create_child(*args):
parentfp, childfp = socket.socketpair()
pid = os.fork()
if not pid:
mitogen.core.set_block(childfp.fileno())
os.dup2(childfp.fileno(), 0)
os.dup2(childfp.fileno(), 1)
childfp.close()
parentfp.close()
os.execvp(args[0], args)
childfp.close()
# Decouple the socket from the lifetime of the Python socket object.
fd = os.dup(parentfp.fileno())
parentfp.close()
LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s',
pid, fd, os.getpid(), Argv(args))
return pid, fd
def tty_create_child(*args):
master_fd, slave_fd = os.openpty()
disable_echo(master_fd)
disable_echo(slave_fd)
pid = os.fork()
if not pid:
mitogen.core.set_block(slave_fd)
os.dup2(slave_fd, 0)
os.dup2(slave_fd, 1)
os.dup2(slave_fd, 2)
close_nonstandard_fds()
os.setsid()
os.close(os.open(os.ttyname(1), os.O_RDWR))
os.execvp(args[0], args)
os.close(slave_fd)
LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s',
pid, master_fd, os.getpid(), Argv(args))
return pid, master_fd
def write_all(fd, s, deadline=None):
timeout = None
written = 0
while written < len(s):
if deadline is not None:
timeout = max(0, deadline - time.time())
if timeout == 0:
raise mitogen.core.TimeoutError('write timed out')
_, wfds, _ = select.select([], [fd], [], timeout)
if not wfds:
continue
n, disconnected = mitogen.core.io_op(os.write, fd, buffer(s, written))
if disconnected:
raise mitogen.core.StreamError('EOF on stream during write')
written += n
def iter_read(fd, deadline=None):
bits = []
timeout = None
while True:
if deadline is not None:
timeout = max(0, deadline - time.time())
if timeout == 0:
break
rfds, _, _ = select.select([fd], [], [], timeout)
if not rfds:
continue
s, disconnected = mitogen.core.io_op(os.read, fd, 4096)
IOLOG.debug('iter_read(%r) -> %r', fd, s)
if disconnected or not s:
raise mitogen.core.StreamError(
'EOF on stream; last 300 bytes received: %r' %
(''.join(bits)[-300:],)
)
bits.append(s)
yield s
raise mitogen.core.TimeoutError('read timed out')
def discard_until(fd, s, deadline):
for buf in iter_read(fd, deadline):
if buf.endswith(s):
return
def upgrade_router(econtext):
if not isinstance(econtext.router, Router): # TODO
econtext.router.__class__ = Router # TODO
econtext.router.id_allocator = ChildIdAllocator(econtext.router)
LOG.debug('_proxy_connect(): constructing ModuleForwarder')
ModuleForwarder(econtext.router, econtext.parent, econtext.importer)
def _local_method():
return mitogen.parent.Stream
def _ssh_method():
import mitogen.ssh
return mitogen.ssh.Stream
def _sudo_method():
import mitogen.sudo
return mitogen.sudo.Stream
METHOD_NAMES = {
'local': _local_method,
'ssh': _ssh_method,
'sudo': _sudo_method,
}
@mitogen.core.takes_econtext
def _proxy_connect(name, context_id, method_name, kwargs, econtext):
mitogen.parent.upgrade_router(econtext)
context = econtext.router._connect(
context_id,
METHOD_NAMES[method_name](),
name=name,
**kwargs
)
return context.name
class Stream(mitogen.core.Stream):
"""
Base for streams capable of starting new slaves.
"""
#: The path to the remote Python interpreter.
python_path = 'python2.7'
#: True to cause context to write verbose /tmp/mitogen.<pid>.log.
debug = False
#: True to cause context to write /tmp/mitogen.stats.<pid>.<thread>.log.
profiling = False
def __init__(self, *args, **kwargs):
super(Stream, self).__init__(*args, **kwargs)
self.sent_modules = set(['mitogen', 'mitogen.core'])
self.sent_packages = set(['mitogen'])
def construct(self, remote_name=None, python_path=None, debug=False,
profiling=False, **kwargs):
"""Get the named context running on the local machine, creating it if
it does not exist."""
super(Stream, self).construct(**kwargs)
if python_path:
self.python_path = python_path
if remote_name is None:
remote_name = '%s@%s:%d'
remote_name %= (getpass.getuser(), socket.gethostname(), os.getpid())
self.remote_name = remote_name
self.debug = debug
self.profiling = profiling
def on_shutdown(self, broker):
"""Request the slave gracefully shut itself down."""
LOG.debug('%r closing CALL_FUNCTION channel', self)
self.send(
mitogen.core.Message(
src_id=mitogen.context_id,
dst_id=self.remote_id,
handle=mitogen.core.SHUTDOWN,
)
)
# base64'd and passed to 'python -c'. It forks, dups 0->100, creates a
# pipe, then execs a new interpreter with a custom argv. 'CONTEXT_NAME' is
# replaced with the context name. Optimized for size.
@staticmethod
def _first_stage():
import os,sys,zlib
R,W=os.pipe()
r,w=os.pipe()
if os.fork():
os.dup2(0,100)
os.dup2(R,0)
os.dup2(r,101)
for f in R,r,W,w:os.close(f)
os.environ['ARGV0']=e=sys.executable
os.execv(e,['mitogen:CONTEXT_NAME'])
os.write(1,'EC0\n')
C=zlib.decompress(sys.stdin.read(input()))
os.fdopen(W,'w',0).write(C)
os.fdopen(w,'w',0).write('%s\n'%len(C)+C)
os.write(1,'EC1\n')
sys.exit(0)
def get_boot_command(self):
source = inspect.getsource(self._first_stage)
source = textwrap.dedent('\n'.join(source.strip().split('\n')[2:]))
source = source.replace(' ', '\t')
source = source.replace('CONTEXT_NAME', self.remote_name)
encoded = source.encode('zlib').encode('base64').replace('\n', '')
# We can't use bytes.decode() in 3.x since it was restricted to always
# return unicode, so codecs.decode() is used instead. In 3.x
# codecs.decode() requires a bytes object. Since we must be compatible
# with 2.4 (no bytes literal), an extra .encode() either returns the
# same str (2.x) or an equivalent bytes (3.x).
return [
self.python_path, '-c',
'from codecs import decode as _;'
'exec(_(_("%s".encode(),"base64"),"zlib"))' % (encoded,)
]
def get_preamble(self):
parent_ids = mitogen.parent_ids[:]
parent_ids.insert(0, mitogen.context_id)
source = inspect.getsource(mitogen.core)
source += '\nExternalContext().main%r\n' % ((
parent_ids, # parent_ids
self.remote_id, # context_id
self.debug,
self.profiling,
LOG.level or logging.getLogger().level or logging.INFO,
),)
compressed = zlib.compress(minimize_source(source))
return str(len(compressed)) + '\n' + compressed
create_child = staticmethod(create_child)
def connect(self):
LOG.debug('%r.connect()', self)
pid, fd = self.create_child(*self.get_boot_command())
self.name = 'local.%s' % (pid,)
self.receive_side = mitogen.core.Side(self, fd)
self.transmit_side = mitogen.core.Side(self, os.dup(fd))
LOG.debug('%r.connect(): child process stdin/stdout=%r',
self, self.receive_side.fd)
self._connect_bootstrap()
def _ec0_received(self):
LOG.debug('%r._ec0_received()', self)
write_all(self.transmit_side.fd, self.get_preamble())
discard_until(self.receive_side.fd, 'EC1\n', time.time() + 10.0)
def _connect_bootstrap(self):
discard_until(self.receive_side.fd, 'EC0\n', time.time() + 10.0)
self._ec0_received()
class ChildIdAllocator(object):
def __init__(self, router):
self.router = router
def allocate(self):
master = mitogen.core.Context(self.router, 0)
return master.send_await(
mitogen.core.Message(dst_id=0, handle=mitogen.core.ALLOCATE_ID)
)
class Router(mitogen.core.Router):
context_class = mitogen.core.Context
def allocate_id(self):
return self.id_allocator.allocate()
def context_by_id(self, context_id, via_id=None):
context = self._context_by_id.get(context_id)
if context is None:
context = self.context_class(self, context_id)
if via_id is not None:
context.via = self.context_by_id(via_id)
self._context_by_id[context_id] = context
return context
def _connect(self, context_id, klass, name=None, **kwargs):
context = self.context_class(self, context_id)
stream = klass(self, context.context_id, **kwargs)
if name is not None:
stream.name = name
stream.connect()
context.name = stream.name
self.register(context, stream)
return context
def connect(self, method_name, name=None, **kwargs):
klass = METHOD_NAMES[method_name]()
kwargs.setdefault('debug', self.debug)
kwargs.setdefault('profiling', self.profiling)
via = kwargs.pop('via', None)
if via is not None:
return self.proxy_connect(via, method_name, name=name, **kwargs)
context_id = self.allocate_id()
return self._connect(context_id, klass, name=name, **kwargs)
def proxy_connect(self, via_context, method_name, name=None, **kwargs):
context_id = self.allocate_id()
# Must be added prior to _proxy_connect() to avoid a race.
self.add_route(context_id, via_context.context_id)
name = via_context.call(_proxy_connect,
name, context_id, method_name, kwargs
)
name = '%s.%s' % (via_context.name, name)
context = self.context_class(self, context_id, name=name)
context.via = via_context
self._context_by_id[context.context_id] = context
self.propagate_route(context, via_context)
return context
class ProcessMonitor(object):
def __init__(self):
# pid -> callback()
self.callback_by_pid = {}
signal.signal(signal.SIGCHLD, self._on_sigchld)
def _on_sigchld(self, _signum, _frame):
for pid, callback in self.callback_by_pid.items():
pid, status = os.waitpid(pid, os.WNOHANG)
if pid:
callback(status)
del self.callback_by_pid[pid]
def add(self, pid, callback):
self.callback_by_pid[pid] = callback
_instance = None
@classmethod
def instance(cls):
if cls._instance is None:
cls._instance = cls()
return cls._instance
class ModuleForwarder(object):
"""
Respond to GET_MODULE requests in a slave by forwarding the request to our
parent context, or satisfying the request from our local Importer cache.
"""
def __init__(self, router, parent_context, importer):
self.router = router
self.parent_context = parent_context
self.importer = importer
router.add_handler(self._on_get_module, mitogen.core.GET_MODULE)
def __repr__(self):
return 'ModuleForwarder(%r)' % (self.router,)
def _on_get_module(self, msg):
LOG.debug('%r._on_get_module(%r)', self, msg)
if msg == mitogen.core._DEAD:
return
fullname = msg.data
callback = lambda: self._on_cache_callback(msg, fullname)
self.importer._request_module(fullname, callback)
def _send_one_module(self, msg, tup):
self.router.route(
mitogen.core.Message.pickled(
tup,
dst_id=msg.src_id,
handle=mitogen.core.LOAD_MODULE,
)
)
def _on_cache_callback(self, msg, fullname):
LOG.debug('%r._on_get_module(): sending %r', self, fullname)
tup = self.importer._cache[fullname]
if tup is not None:
for related in tup[4]:
rtup = self.importer._cache[fullname]
self._send_one_module(msg, rtup)
self._send_one_module(msg, tup)

@ -33,7 +33,7 @@ import commands
import logging import logging
import time import time
import mitogen.master import mitogen.parent
LOG = logging.getLogger('mitogen') LOG = logging.getLogger('mitogen')
@ -46,8 +46,8 @@ class PasswordError(mitogen.core.Error):
pass pass
class Stream(mitogen.master.Stream): class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.master.tty_create_child) create_child = staticmethod(mitogen.parent.tty_create_child)
python_path = 'python2.7' python_path = 'python2.7'
#: The path to the SSH binary. #: The path to the SSH binary.
@ -103,7 +103,7 @@ class Stream(mitogen.master.Stream):
def _connect_bootstrap(self): def _connect_bootstrap(self):
password_sent = False password_sent = False
for buf in mitogen.master.iter_read(self.receive_side.fd, for buf in mitogen.parent.iter_read(self.receive_side.fd,
time.time() + 10.0): time.time() + 10.0):
LOG.debug('%r: received %r', self, buf) LOG.debug('%r: received %r', self, buf)
if buf.endswith('EC0\n'): if buf.endswith('EC0\n'):

@ -30,7 +30,7 @@ import os
import time import time
import mitogen.core import mitogen.core
import mitogen.master import mitogen.parent
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -41,8 +41,8 @@ class PasswordError(mitogen.core.Error):
pass pass
class Stream(mitogen.master.Stream): class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.master.tty_create_child) create_child = staticmethod(mitogen.parent.tty_create_child)
sudo_path = 'sudo' sudo_path = 'sudo'
password = None password = None
@ -93,7 +93,7 @@ class Stream(mitogen.master.Stream):
def _connect_bootstrap(self): def _connect_bootstrap(self):
password_sent = False password_sent = False
for buf in mitogen.master.iter_read(self.receive_side.fd, for buf in mitogen.parent.iter_read(self.receive_side.fd,
time.time() + 10.0): time.time() + 10.0):
LOG.debug('%r: received %r', self, buf) LOG.debug('%r: received %r', self, buf)
if buf.endswith('EC0\n'): if buf.endswith('EC0\n'):

@ -8,6 +8,7 @@ import zlib
import mitogen.fakessh import mitogen.fakessh
import mitogen.master import mitogen.master
import mitogen.parent
import mitogen.ssh import mitogen.ssh
import mitogen.sudo import mitogen.sudo
@ -26,9 +27,10 @@ print 'Preamble size: %s (%.2fKiB)' % (
for mod in ( for mod in (
mitogen.master, mitogen.master,
mitogen.parent,
mitogen.ssh, mitogen.ssh,
mitogen.sudo, mitogen.sudo,
mitogen.fakessh, mitogen.fakessh,
): ):
sz = len(zlib.compress(mitogen.master.minimize_source(inspect.getsource(mod)))) sz = len(zlib.compress(mitogen.parent.minimize_source(inspect.getsource(mod))))
print '%s size: %s (%.2fKiB)' % (mod.__name__, sz, sz / 1024.0) print '%s size: %s (%.2fKiB)' % (mod.__name__, sz, sz / 1024.0)

Loading…
Cancel
Save