diff --git a/mitogen/parent.py b/mitogen/parent.py index 1c3e1874..7af73454 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -327,6 +327,7 @@ def popen(**kwargs): is invoked in the child. """ real_preexec_fn = kwargs.pop('preexec_fn', None) + def preexec_fn(): if _preexec_hook: _preexec_hook() @@ -420,7 +421,7 @@ def _acquire_controlling_tty(): def _linux_broken_devpts_openpty(): """ - #462: On broken Linux hosts with mismatched configuration (e.g. old + # 462: On broken Linux hosts with mismatched configuration (e.g. old /etc/fstab template installed), /dev/pts may be mounted without the gid= mount option, causing new slave devices to be created with the group ID of the calling process. This upsets glibc, whose openpty() is required by @@ -451,7 +452,7 @@ def _linux_broken_devpts_openpty(): # TTY. Otherwise when we close the FD we get killed by the kernel, and # the child we spawn that should really attach to it will get EPERM # during _acquire_controlling_tty(). - slave_fd = os.open(pty_name, os.O_RDWR|os.O_NOCTTY) + slave_fd = os.open(pty_name, os.O_RDWR | os.O_NOCTTY) return master_fd, slave_fd except OSError: if master_fd is not None: @@ -639,7 +640,7 @@ class TimerList(object): def get_timeout(self): """ Return the floating point seconds until the next event is due. - + :returns: Floating point delay, or 0.0, or :data:`None` if no events are scheduled. @@ -691,6 +692,7 @@ class PartialZlib(object): A full compression costs ~6ms on a modern machine, this method costs ~35 usec. """ + def __init__(self, s): self.s = s if sys.version_info > (2, 5): @@ -840,6 +842,7 @@ class Argv(object): """ Wrapper to defer argv formatting when debug logging is disabled. """ + def __init__(self, argv): self.argv = argv @@ -866,6 +869,7 @@ class CallSpec(object): """ Wrapper to defer call argument formatting when debug logging is disabled. """ + def __init__(self, func, args, kwargs): self.func = func self.args = args @@ -875,8 +879,8 @@ class CallSpec(object): bits = [self.func.__module__] if inspect.ismethod(self.func): im_self = getattr(self.func, IM_SELF_ATTR) - bits.append(getattr(im_self, '__name__', None) or - getattr(type(im_self), '__name__', None)) + bits.append(getattr(im_self, '__name__', None) + or getattr(type(im_self), '__name__', None)) bits.append(self.func.__name__) return u'.'.join(bits) @@ -914,13 +918,13 @@ class PollPoller(mitogen.core.Poller): # TODO: no proof we dont need writemask too _readmask = ( - getattr(select, 'POLLIN', 0) | - getattr(select, 'POLLHUP', 0) + getattr(select, 'POLLIN', 0) + | getattr(select, 'POLLHUP', 0) ) def _update(self, fd): - mask = (((fd in self._rfds) and self._readmask) | - ((fd in self._wfds) and select.POLLOUT)) + mask = (((fd in self._rfds) and self._readmask) + | ((fd in self._wfds) and select.POLLOUT)) if mask: self._pollobj.register(fd, mask) else: @@ -1043,8 +1047,8 @@ class EpollPoller(mitogen.core.Poller): def _control(self, fd): mitogen.core._vv and IOLOG.debug('%r._control(%r)', self, fd) - mask = (((fd in self._rfds) and select.EPOLLIN) | - ((fd in self._wfds) and select.EPOLLOUT)) + mask = (((fd in self._rfds) and select.EPOLLIN) + | ((fd in self._wfds) and select.EPOLLOUT)) if mask: if fd in self._registered_fds: self._epoll.modify(fd, mask) @@ -1077,8 +1081,8 @@ class EpollPoller(mitogen.core.Poller): self._wfds.pop(fd, None) self._control(fd) - _inmask = (getattr(select, 'EPOLLIN', 0) | - getattr(select, 'EPOLLHUP', 0)) + _inmask = (getattr(select, 'EPOLLIN', 0) + | getattr(select, 'EPOLLHUP', 0)) def _poll(self, timeout): the_timeout = -1 @@ -1249,6 +1253,7 @@ class LogProtocol(LineLoggingProtocolMixin, mitogen.core.DelimitedProtocol): LogProtocol takes over this FD and creates log messages for anything written to it. """ + def on_line_received(self, line): """ Read a line, decode it as UTF-8, and log it. @@ -1262,6 +1267,7 @@ class MitogenProtocol(mitogen.core.MitogenProtocol): Extend core.MitogenProtocol to cause SHUTDOWN to be sent to the child during graceful shutdown. """ + def on_shutdown(self, broker): """ Respond to the broker's request for the stream to shut down by sending @@ -1424,43 +1430,63 @@ class Connection(object): # "[1234 refs]" during exit. @staticmethod def _first_stage(): - R,W=os.pipe() - r,w=os.pipe() + R, W = os.pipe() + r, w = os.pipe() if os.fork(): - os.dup2(0,100) - os.dup2(R,0) - os.dup2(r,101) + os.dup2(0, 100) + os.dup2(R, 0) + os.dup2(r, 101) os.close(R) os.close(r) os.close(W) os.close(w) if sys.platform == 'darwin' and sys.executable == '/usr/bin/python': sys.executable += sys.version[:3] - os.environ['ARGV0']=sys.executable - os.execl(sys.executable,sys.executable+'(mitogen:CONTEXT_NAME)') - os.write(1,'MITO000\n'.encode()) - C=_(os.fdopen(0,'rb').read(PREAMBLE_COMPRESSED_LEN),'zip') - fp=os.fdopen(W,'wb',0) + os.environ['ARGV0'] = sys.executable + os.execl(sys.executable, sys.executable + '(mitogen:CONTEXT_NAME)') + os.write(1, 'MITO000\n'.encode()) + C = _(os.fdopen(0, 'rb').read(PREAMBLE_COMPRESSED_LEN), 'zip') + fp = os.fdopen(W, 'wb', 0) fp.write(C) fp.close() - fp=os.fdopen(w,'wb',0) + fp = os.fdopen(w, 'wb', 0) fp.write(C) fp.close() - os.write(1,'MITO001\n'.encode()) + os.write(1, 'MITO001\n'.encode()) os.close(2) - def get_python_argv(self): + def get_python_cmd(self, encoded): """ - Return the initial argument vector elements necessary to invoke Python, - by returning a 1-element list containing :attr:`python_path` if it is a - string, or simply returning it if it is already a list. + Return the command necessary to invoke Python, + by returning a 1-element list containing :attr:`python_path` + codecs This allows emulation of existing tools where the Python invocation may - be set to e.g. `['/usr/bin/env', 'python']`. + be set to e.g. `['/usr/bin/env', 'python']` or + `['source', '/opt/rh/rh-python36/enable, '&&', 'python']` """ if isinstance(self.options.python_path, list): - return self.options.python_path - return [self.options.python_path] + python_path = " ".join(self.options.python_path) + else: + pthon_path = self.options.python_path + + # quoting the entire command necessary to invoke python supports + # complex python_paths + return ["'" + python_path + ' -c ' + + '"\'import codecs,os,sys;_=codecs.decode;' + 'exec(_(_("%s".encode(),"base64"),"zip"))\'"' % (encoded.decode(),), + "'"] + + """ + return self.get_python_argv() + [ +- '-c', +- 'import codecs,os,sys;_=codecs.decode;' +- 'exec(_(_("%s".encode(),"base64"),"zip"))' % (encoded.decode(),) +- ] + + if isinstance(self.options.python_path, list): +- return self.options.python_path +- return [self.options.python_path] + """ def get_boot_command(self): source = inspect.getsource(self._first_stage) @@ -1477,11 +1503,8 @@ class Connection(object): # codecs.decode() requires a bytes object. Since we must be compatible # with 2.4 (no bytes literal), an extra .encode() either returns the # same str (2.x) or an equivalent bytes (3.x). - return self.get_python_argv() + [ - '-c', - 'import codecs,os,sys;_=codecs.decode;' - 'exec(_(_("%s".encode(),"base64"),"zip"))' % (encoded.decode(),) - ] + # return self.get_python_cmd(encoded) + return self.get_python_cmd(encoded) def get_econtext_config(self): assert self.options.max_message_size is not None @@ -1502,7 +1525,7 @@ class Connection(object): def get_preamble(self): suffix = ( - '\nExternalContext(%r).main()\n' %\ + '\nExternalContext(%r).main()\n' % (self.get_econtext_config(),) ) partial = get_core_source_partial() @@ -1609,13 +1632,13 @@ class Connection(object): if self._reaper: return + # kill: # Avoid killing so child has chance to write cProfile data self._reaper = Reaper( broker=self._router.broker, proc=self.proc, kill=not ( - (self.detached and self.child_is_immediate_subprocess) or - # Avoid killing so child has chance to write cProfile data - self._router.profiling + (self.detached and self.child_is_immediate_subprocess) + or self._router.profiling ), # Don't delay shutdown waiting for a detached child, since the # detached child may expect to live indefinitely after its parent @@ -1710,6 +1733,7 @@ class ChildIdAllocator(object): Allocate new context IDs from a block of unique context IDs allocated by the master process. """ + def __init__(self, router): self.router = router self.lock = threading.Lock() @@ -1808,6 +1832,7 @@ class CallChain(object): # chain.reset() automatically invoked. """ + def __init__(self, context, pipelined=False): self.context = context if pipelined: @@ -1995,9 +2020,9 @@ class Context(mitogen.core.Context): def __eq__(self, other): return ( - isinstance(other, mitogen.core.Context) and - (other.context_id == self.context_id) and - (other.router == self.router) + isinstance(other, mitogen.core.Context) + and (other.context_id == self.context_id) + and (other.router == self.router) ) def __hash__(self): @@ -2082,6 +2107,7 @@ class RouteMonitor(object): :data:`None` in the master process, or reference to the parent context we should propagate route updates towards. """ + def __init__(self, router, parent=None): self.router = router self.parent = parent @@ -2166,9 +2192,9 @@ class RouteMonitor(object): """ for stream in self.router.get_streams(): if target_id in stream.protocol.egress_ids and ( - (self.parent is None) or - (self.parent.context_id != stream.protocol.remote_id) - ): + (self.parent is None) + or (self.parent.context_id != stream.protocol.remote_id) + ): self._send_one(stream, mitogen.core.DEL_ROUTE, target_id, None) def notice_stream(self, stream): @@ -2338,6 +2364,7 @@ class Router(mitogen.core.Router): l = mitogen.core.Latch() mitogen.core.listen(stream, 'disconnect', l.put) + def disconnect(): LOG.debug('Starting disconnect of %r', stream) stream.on_disconnect(self.broker) @@ -2447,7 +2474,7 @@ class Router(mitogen.core.Router): name=name, method_name=method_name, kwargs=mitogen.core.Kwargs(kwargs), - ) + ) if resp['msg'] is not None: raise mitogen.core.StreamError(resp['msg']) @@ -2668,6 +2695,7 @@ class PopenProcess(Process): :param subprocess.Popen proc: The subprocess. """ + def __init__(self, proc, stdin, stdout, stderr=None): super(PopenProcess, self).__init__(proc.pid, stdin, stdout, stderr) #: The subprocess. @@ -2683,6 +2711,7 @@ class ModuleForwarder(object): forwarding the request to our parent context, or satisfying the request from our local Importer cache. """ + def __init__(self, router, parent_context, importer): self.router = router self.parent_context = parent_context