trying to support special python interpreters

pull/658/head
Steven Robertson 5 years ago
parent 99c5cece3a
commit defa5ef853

@ -327,6 +327,7 @@ def popen(**kwargs):
is invoked in the child. is invoked in the child.
""" """
real_preexec_fn = kwargs.pop('preexec_fn', None) real_preexec_fn = kwargs.pop('preexec_fn', None)
def preexec_fn(): def preexec_fn():
if _preexec_hook: if _preexec_hook:
_preexec_hook() _preexec_hook()
@ -420,7 +421,7 @@ def _acquire_controlling_tty():
def _linux_broken_devpts_openpty(): def _linux_broken_devpts_openpty():
""" """
#462: On broken Linux hosts with mismatched configuration (e.g. old # 462: On broken Linux hosts with mismatched configuration (e.g. old
/etc/fstab template installed), /dev/pts may be mounted without the gid= /etc/fstab template installed), /dev/pts may be mounted without the gid=
mount option, causing new slave devices to be created with the group ID of mount option, causing new slave devices to be created with the group ID of
the calling process. This upsets glibc, whose openpty() is required by the calling process. This upsets glibc, whose openpty() is required by
@ -451,7 +452,7 @@ def _linux_broken_devpts_openpty():
# TTY. Otherwise when we close the FD we get killed by the kernel, and # TTY. Otherwise when we close the FD we get killed by the kernel, and
# the child we spawn that should really attach to it will get EPERM # the child we spawn that should really attach to it will get EPERM
# during _acquire_controlling_tty(). # during _acquire_controlling_tty().
slave_fd = os.open(pty_name, os.O_RDWR|os.O_NOCTTY) slave_fd = os.open(pty_name, os.O_RDWR | os.O_NOCTTY)
return master_fd, slave_fd return master_fd, slave_fd
except OSError: except OSError:
if master_fd is not None: if master_fd is not None:
@ -691,6 +692,7 @@ class PartialZlib(object):
A full compression costs ~6ms on a modern machine, this method costs ~35 A full compression costs ~6ms on a modern machine, this method costs ~35
usec. usec.
""" """
def __init__(self, s): def __init__(self, s):
self.s = s self.s = s
if sys.version_info > (2, 5): if sys.version_info > (2, 5):
@ -840,6 +842,7 @@ class Argv(object):
""" """
Wrapper to defer argv formatting when debug logging is disabled. Wrapper to defer argv formatting when debug logging is disabled.
""" """
def __init__(self, argv): def __init__(self, argv):
self.argv = argv self.argv = argv
@ -866,6 +869,7 @@ class CallSpec(object):
""" """
Wrapper to defer call argument formatting when debug logging is disabled. Wrapper to defer call argument formatting when debug logging is disabled.
""" """
def __init__(self, func, args, kwargs): def __init__(self, func, args, kwargs):
self.func = func self.func = func
self.args = args self.args = args
@ -875,8 +879,8 @@ class CallSpec(object):
bits = [self.func.__module__] bits = [self.func.__module__]
if inspect.ismethod(self.func): if inspect.ismethod(self.func):
im_self = getattr(self.func, IM_SELF_ATTR) im_self = getattr(self.func, IM_SELF_ATTR)
bits.append(getattr(im_self, '__name__', None) or bits.append(getattr(im_self, '__name__', None)
getattr(type(im_self), '__name__', None)) or getattr(type(im_self), '__name__', None))
bits.append(self.func.__name__) bits.append(self.func.__name__)
return u'.'.join(bits) return u'.'.join(bits)
@ -914,13 +918,13 @@ class PollPoller(mitogen.core.Poller):
# TODO: no proof we dont need writemask too # TODO: no proof we dont need writemask too
_readmask = ( _readmask = (
getattr(select, 'POLLIN', 0) | getattr(select, 'POLLIN', 0)
getattr(select, 'POLLHUP', 0) | getattr(select, 'POLLHUP', 0)
) )
def _update(self, fd): def _update(self, fd):
mask = (((fd in self._rfds) and self._readmask) | mask = (((fd in self._rfds) and self._readmask)
((fd in self._wfds) and select.POLLOUT)) | ((fd in self._wfds) and select.POLLOUT))
if mask: if mask:
self._pollobj.register(fd, mask) self._pollobj.register(fd, mask)
else: else:
@ -1043,8 +1047,8 @@ class EpollPoller(mitogen.core.Poller):
def _control(self, fd): def _control(self, fd):
mitogen.core._vv and IOLOG.debug('%r._control(%r)', self, fd) mitogen.core._vv and IOLOG.debug('%r._control(%r)', self, fd)
mask = (((fd in self._rfds) and select.EPOLLIN) | mask = (((fd in self._rfds) and select.EPOLLIN)
((fd in self._wfds) and select.EPOLLOUT)) | ((fd in self._wfds) and select.EPOLLOUT))
if mask: if mask:
if fd in self._registered_fds: if fd in self._registered_fds:
self._epoll.modify(fd, mask) self._epoll.modify(fd, mask)
@ -1077,8 +1081,8 @@ class EpollPoller(mitogen.core.Poller):
self._wfds.pop(fd, None) self._wfds.pop(fd, None)
self._control(fd) self._control(fd)
_inmask = (getattr(select, 'EPOLLIN', 0) | _inmask = (getattr(select, 'EPOLLIN', 0)
getattr(select, 'EPOLLHUP', 0)) | getattr(select, 'EPOLLHUP', 0))
def _poll(self, timeout): def _poll(self, timeout):
the_timeout = -1 the_timeout = -1
@ -1249,6 +1253,7 @@ class LogProtocol(LineLoggingProtocolMixin, mitogen.core.DelimitedProtocol):
LogProtocol takes over this FD and creates log messages for anything LogProtocol takes over this FD and creates log messages for anything
written to it. written to it.
""" """
def on_line_received(self, line): def on_line_received(self, line):
""" """
Read a line, decode it as UTF-8, and log it. Read a line, decode it as UTF-8, and log it.
@ -1262,6 +1267,7 @@ class MitogenProtocol(mitogen.core.MitogenProtocol):
Extend core.MitogenProtocol to cause SHUTDOWN to be sent to the child Extend core.MitogenProtocol to cause SHUTDOWN to be sent to the child
during graceful shutdown. during graceful shutdown.
""" """
def on_shutdown(self, broker): def on_shutdown(self, broker):
""" """
Respond to the broker's request for the stream to shut down by sending Respond to the broker's request for the stream to shut down by sending
@ -1424,43 +1430,63 @@ class Connection(object):
# "[1234 refs]" during exit. # "[1234 refs]" during exit.
@staticmethod @staticmethod
def _first_stage(): def _first_stage():
R,W=os.pipe() R, W = os.pipe()
r,w=os.pipe() r, w = os.pipe()
if os.fork(): if os.fork():
os.dup2(0,100) os.dup2(0, 100)
os.dup2(R,0) os.dup2(R, 0)
os.dup2(r,101) os.dup2(r, 101)
os.close(R) os.close(R)
os.close(r) os.close(r)
os.close(W) os.close(W)
os.close(w) os.close(w)
if sys.platform == 'darwin' and sys.executable == '/usr/bin/python': if sys.platform == 'darwin' and sys.executable == '/usr/bin/python':
sys.executable += sys.version[:3] sys.executable += sys.version[:3]
os.environ['ARGV0']=sys.executable os.environ['ARGV0'] = sys.executable
os.execl(sys.executable,sys.executable+'(mitogen:CONTEXT_NAME)') os.execl(sys.executable, sys.executable + '(mitogen:CONTEXT_NAME)')
os.write(1,'MITO000\n'.encode()) os.write(1, 'MITO000\n'.encode())
C=_(os.fdopen(0,'rb').read(PREAMBLE_COMPRESSED_LEN),'zip') C = _(os.fdopen(0, 'rb').read(PREAMBLE_COMPRESSED_LEN), 'zip')
fp=os.fdopen(W,'wb',0) fp = os.fdopen(W, 'wb', 0)
fp.write(C) fp.write(C)
fp.close() fp.close()
fp=os.fdopen(w,'wb',0) fp = os.fdopen(w, 'wb', 0)
fp.write(C) fp.write(C)
fp.close() fp.close()
os.write(1,'MITO001\n'.encode()) os.write(1, 'MITO001\n'.encode())
os.close(2) os.close(2)
def get_python_argv(self): def get_python_cmd(self, encoded):
""" """
Return the initial argument vector elements necessary to invoke Python, Return the command necessary to invoke Python,
by returning a 1-element list containing :attr:`python_path` if it is a by returning a 1-element list containing :attr:`python_path` + codecs
string, or simply returning it if it is already a list.
This allows emulation of existing tools where the Python invocation may This allows emulation of existing tools where the Python invocation may
be set to e.g. `['/usr/bin/env', 'python']`. be set to e.g. `['/usr/bin/env', 'python']` or
`['source', '/opt/rh/rh-python36/enable, '&&', 'python']`
"""
if isinstance(self.options.python_path, list):
python_path = " ".join(self.options.python_path)
else:
pthon_path = self.options.python_path
# quoting the entire command necessary to invoke python supports
# complex python_paths
return ["'" + python_path + ' -c '
+ '"\'import codecs,os,sys;_=codecs.decode;'
'exec(_(_("%s".encode(),"base64"),"zip"))\'"' % (encoded.decode(),),
"'"]
""" """
return self.get_python_argv() + [
- '-c',
- 'import codecs,os,sys;_=codecs.decode;'
- 'exec(_(_("%s".encode(),"base64"),"zip"))' % (encoded.decode(),)
- ]
if isinstance(self.options.python_path, list): if isinstance(self.options.python_path, list):
return self.options.python_path - return self.options.python_path
return [self.options.python_path] - return [self.options.python_path]
"""
def get_boot_command(self): def get_boot_command(self):
source = inspect.getsource(self._first_stage) source = inspect.getsource(self._first_stage)
@ -1477,11 +1503,8 @@ class Connection(object):
# codecs.decode() requires a bytes object. Since we must be compatible # codecs.decode() requires a bytes object. Since we must be compatible
# with 2.4 (no bytes literal), an extra .encode() either returns the # with 2.4 (no bytes literal), an extra .encode() either returns the
# same str (2.x) or an equivalent bytes (3.x). # same str (2.x) or an equivalent bytes (3.x).
return self.get_python_argv() + [ # return self.get_python_cmd(encoded)
'-c', return self.get_python_cmd(encoded)
'import codecs,os,sys;_=codecs.decode;'
'exec(_(_("%s".encode(),"base64"),"zip"))' % (encoded.decode(),)
]
def get_econtext_config(self): def get_econtext_config(self):
assert self.options.max_message_size is not None assert self.options.max_message_size is not None
@ -1502,7 +1525,7 @@ class Connection(object):
def get_preamble(self): def get_preamble(self):
suffix = ( suffix = (
'\nExternalContext(%r).main()\n' %\ '\nExternalContext(%r).main()\n' %
(self.get_econtext_config(),) (self.get_econtext_config(),)
) )
partial = get_core_source_partial() partial = get_core_source_partial()
@ -1609,13 +1632,13 @@ class Connection(object):
if self._reaper: if self._reaper:
return return
# kill: # Avoid killing so child has chance to write cProfile data
self._reaper = Reaper( self._reaper = Reaper(
broker=self._router.broker, broker=self._router.broker,
proc=self.proc, proc=self.proc,
kill=not ( kill=not (
(self.detached and self.child_is_immediate_subprocess) or (self.detached and self.child_is_immediate_subprocess)
# Avoid killing so child has chance to write cProfile data or self._router.profiling
self._router.profiling
), ),
# Don't delay shutdown waiting for a detached child, since the # Don't delay shutdown waiting for a detached child, since the
# detached child may expect to live indefinitely after its parent # detached child may expect to live indefinitely after its parent
@ -1710,6 +1733,7 @@ class ChildIdAllocator(object):
Allocate new context IDs from a block of unique context IDs allocated by Allocate new context IDs from a block of unique context IDs allocated by
the master process. the master process.
""" """
def __init__(self, router): def __init__(self, router):
self.router = router self.router = router
self.lock = threading.Lock() self.lock = threading.Lock()
@ -1808,6 +1832,7 @@ class CallChain(object):
# chain.reset() automatically invoked. # chain.reset() automatically invoked.
""" """
def __init__(self, context, pipelined=False): def __init__(self, context, pipelined=False):
self.context = context self.context = context
if pipelined: if pipelined:
@ -1995,9 +2020,9 @@ class Context(mitogen.core.Context):
def __eq__(self, other): def __eq__(self, other):
return ( return (
isinstance(other, mitogen.core.Context) and isinstance(other, mitogen.core.Context)
(other.context_id == self.context_id) and and (other.context_id == self.context_id)
(other.router == self.router) and (other.router == self.router)
) )
def __hash__(self): def __hash__(self):
@ -2082,6 +2107,7 @@ class RouteMonitor(object):
:data:`None` in the master process, or reference to the parent context :data:`None` in the master process, or reference to the parent context
we should propagate route updates towards. we should propagate route updates towards.
""" """
def __init__(self, router, parent=None): def __init__(self, router, parent=None):
self.router = router self.router = router
self.parent = parent self.parent = parent
@ -2166,8 +2192,8 @@ class RouteMonitor(object):
""" """
for stream in self.router.get_streams(): for stream in self.router.get_streams():
if target_id in stream.protocol.egress_ids and ( if target_id in stream.protocol.egress_ids and (
(self.parent is None) or (self.parent is None)
(self.parent.context_id != stream.protocol.remote_id) or (self.parent.context_id != stream.protocol.remote_id)
): ):
self._send_one(stream, mitogen.core.DEL_ROUTE, target_id, None) self._send_one(stream, mitogen.core.DEL_ROUTE, target_id, None)
@ -2338,6 +2364,7 @@ class Router(mitogen.core.Router):
l = mitogen.core.Latch() l = mitogen.core.Latch()
mitogen.core.listen(stream, 'disconnect', l.put) mitogen.core.listen(stream, 'disconnect', l.put)
def disconnect(): def disconnect():
LOG.debug('Starting disconnect of %r', stream) LOG.debug('Starting disconnect of %r', stream)
stream.on_disconnect(self.broker) stream.on_disconnect(self.broker)
@ -2668,6 +2695,7 @@ class PopenProcess(Process):
:param subprocess.Popen proc: :param subprocess.Popen proc:
The subprocess. The subprocess.
""" """
def __init__(self, proc, stdin, stdout, stderr=None): def __init__(self, proc, stdin, stdout, stderr=None):
super(PopenProcess, self).__init__(proc.pid, stdin, stdout, stderr) super(PopenProcess, self).__init__(proc.pid, stdin, stdout, stderr)
#: The subprocess. #: The subprocess.
@ -2683,6 +2711,7 @@ class ModuleForwarder(object):
forwarding the request to our parent context, or satisfying the request forwarding the request to our parent context, or satisfying the request
from our local Importer cache. from our local Importer cache.
""" """
def __init__(self, router, parent_context, importer): def __init__(self, router, parent_context, importer):
self.router = router self.router = router
self.parent_context = parent_context self.parent_context = parent_context

Loading…
Cancel
Save