pull/683/merge
commit 626a746baa by Ryan Ofsky

@ -446,6 +446,7 @@ v0.3.1 (unreleased)
* :gh:issue:`876` `python -c ...` first stage no longer contains tab characters, to reduce size
* :gh:issue:`878` Continuous Integration tests now correctly perform comparisons of 2 digit versions
* :gh:issue:`878` Kubectl connector fixed with Ansible 2.10 and above
* :gh:pull:`683` Previously broken :mod:`mitogen.fakessh` functionality is restored
v0.3.0 (2021-11-24)

@ -1877,7 +1877,7 @@ class Stream(object):
self.protocol = protocol
self.protocol.stream = self
def accept(self, rfp, wfp):
def accept(self, rfp, wfp, cloexec=True):
"""
Attach a pair of file objects to :attr:`receive_side` and
:attr:`transmit_side`, after wrapping them in :class:`Side` instances.
@ -1892,8 +1892,8 @@ class Stream(object):
:param file wfp:
The file object to transmit to.
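:param bool cloexec:
If :data:`True` (the default), set the close-on-exec flag on the wrapped
file descriptors via the :class:`Side` instances.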
"""
self.receive_side = Side(self, rfp)
self.transmit_side = Side(self, wfp)
self.receive_side = Side(self, rfp, cloexec=cloexec)
self.transmit_side = Side(self, wfp, cloexec=cloexec)
def __repr__(self):
return "<Stream %s #%04x>" % (self.name, id(self) & 0xffff,)
@ -3970,7 +3970,7 @@ class ExternalContext(object):
self.config = config
def _on_broker_exit(self):
if not self.config['profiling']:
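# mitogen.fakessh sets mitogen.exit_status and arranges for the bootstrap
# to finish via sys.exit(mitogen.exit_status), so skip the SIGTERM
# self-kill in that case.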
if not self.config['profiling'] and not hasattr(mitogen, "exit_status"):
os.kill(os.getpid(), signal.SIGTERM)
def _on_shutdown_msg(self, msg):
@ -4025,11 +4025,13 @@ class ExternalContext(object):
in_fd = self.config.get('in_fd', 100)
in_fp = os.fdopen(os.dup(in_fd), 'rb', 0)
os.close(in_fd)
out_fd = self.config.get('out_fd', pty.STDOUT_FILENO)
out_fd2 = os.dup(out_fd)
out_fp = os.fdopen(out_fd2, 'wb', 0)
# Avoid closing in_fd until after duplicating out_fd, in case in_fd and
# out_fd refer to the same bidirectional socket fd.
os.close(in_fd)
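For illustration, a minimal sketch of the ordering concern described in the comment above, assuming a single bidirectional socket so that in_fd == out_fd (sock and peer are illustrative names, not part of the patch):

    import os
    import socket

    sock, peer = socket.socketpair()
    in_fd = out_fd = sock.fileno()               # one fd serves both directions
    in_fp = os.fdopen(os.dup(in_fd), 'rb', 0)    # duplicate for reading ...
    out_fp = os.fdopen(os.dup(out_fd), 'wb', 0)  # ... and for writing, then
    os.close(in_fd)                              # close the original exactly once;
                                                 # closing it before the second dup
                                                 # would raise EBADF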
self.stream = MitogenProtocol.build_stream(
self.router,
parent_id,

@ -65,7 +65,7 @@ Sequence:
<mitogen.core.CALL_FUNCTION>` is read by fakessh context,
a. sets up :py:class:`IoPump` for stdio, registers
stdin_handle for local context.
control_handle and stdin_handle for local context.
b. Enqueues :py:data:`CALL_FUNCTION <mitogen.core.CALL_FUNCTION>` for
:py:func:`_start_slave` invoked in target context,
@ -78,18 +78,30 @@ Sequence:
5. :py:func:`_fakessh_main` receives control/stdin handles from
:py:func:`_start_slave`,
a. registers remote's stdin_handle with local :py:class:`IoPump`.
b. sends `("start", local_stdin_handle)` to remote's control_handle
a. registers remote's control_handle and stdin_handle with local
:py:class:`IoPump`.
b. sends `("start", ())` to remote's control_handle to start receiving
stdout from remote subprocess
c. registers local :py:class:`IoPump` with
:py:class:`mitogen.core.Broker`.
d. loops waiting for `local stdout closed && remote stdout closed`
6. :py:func:`_start_slave` control channel receives `("start", stdin_handle)`,
a. registers remote's stdin_handle with local :py:class:`IoPump`
b. registers local :py:class:`IoPump` with
:py:class:`mitogen.core.Broker`.
c. loops waiting for `local stdout closed && remote stdout closed`
:py:class:`mitogen.core.Broker` to start sending stdin to remote
subprocess
d. forwards _on_stdin data to stdout with IoPump.write and IoPump.close
e. loops waiting for `("exit", status)` control message from slave
and for pending writes to stdout to complete.
6. :py:func:`_start_slave` control channel receives `("start", ())`,
a. registers local :py:class:`IoPump` with
:py:class:`mitogen.core.Broker` to start receiving and forwarding
subprocess stdout
b. forwards _on_stdin data to subprocess stdin with IoPump.write and
IoPump.close
c. shuts down and sends `("exit", status)` control message to master
after reaching EOF from subprocess stdout
"stdin" handle and handler naming is a little misleading because they are
used to forard stdin data from the master to the slave, but stdout data from
the slave to the master
"""
import getopt
@ -117,7 +129,33 @@ _mitogen = None
class IoPump(mitogen.core.Protocol):
_output_buf = ''
"""
Raw data protocol that transmits and receives in two directions:
- Forwarding data from protocol receive api to IoPump 'receive' and
'disconnect' listeners
- Forwarding data from IoPump.write() and IoPump.close() calls to protocol
transmit api
Overrides default protocol on_disconnect and on_shutdown methods, only
closing the receive side when an on_disconnect EOF is reached, and only
closing the transmit side when close() is called or on_shutdown termination
is forced. This way when EOF is reached for receiving data, outgoing data is
still transmitted in full without being truncated, and vice versa.
Back pressure is implemented in the receive direction ('receive' listeners
can block) but no back pressure exists in the transmit direction (IoPump.write
and IoPump.close calls never block), so writing data too fast can use an
unbounded amount of memory.
The lack of back pressure for writes should not normally be a problem when
IoPump is used by fakessh, because the data should be coming in from a slow
remote source and being transmitted to a fast local process. But there could
be cases where the local process is too slow (maybe writing to a slow disk)
and memory usage gets out of hand. In this case some kind of blocking or
rate limiting may need to be implemented for IoPump.write. A usage sketch
follows the class definition below.
"""
_output_buf = b''
_closed = False
def __init__(self, broker):
@ -125,80 +163,87 @@ class IoPump(mitogen.core.Protocol):
def write(self, s):
self._output_buf += s
self._broker._start_transmit(self)
self._broker._start_transmit(self.stream)
def close(self):
self._closed = True
# If the local process hasn't exited yet, ensure its write buffer is
# drained before lazily triggering disconnect in on_transmit.
if self.transmit_side.fp.fileno() is not None:
self._broker._start_transmit(self)
if not self.stream.transmit_side.closed:
self._broker._start_transmit(self.stream)
def on_shutdown(self, stream, broker):
def on_shutdown(self, broker):
self.close()
super().on_shutdown(broker)
def on_transmit(self, stream, broker):
written = self.transmit_side.write(self._output_buf)
def on_transmit(self, broker):
written = self.stream.transmit_side.write(self._output_buf)
IOLOG.debug('%r.on_transmit() -> len %r', self, written)
if written is None:
self.on_disconnect(broker)
else:
self._output_buf = self._output_buf[written:]
self._output_buf = self._output_buf[written:]
if not self._output_buf:
broker._stop_transmit(self)
broker._stop_transmit(self.stream)
if self._closed:
self.on_disconnect(broker)
self.stream.transmit_side.close()
mitogen.core.fire(self, 'write_done')
def on_receive(self, stream, broker):
s = stream.receive_side.read()
def on_receive(self, broker, s):
IOLOG.debug('%r.on_receive() -> len %r', self, len(s))
if s:
mitogen.core.fire(self, 'receive', s)
else:
self.on_disconnect(broker)
mitogen.core.fire(self, 'receive', s)
def on_disconnect(self, broker):
broker.stop_receive(self.stream)
self.stream.receive_side.close()
mitogen.core.fire(self, 'disconnect')
def __repr__(self):
return 'IoPump(%r, %r)' % (
self.receive_side.fp.fileno(),
self.transmit_side.fp.fileno(),
self.stream.receive_side.fp.fileno(),
self.stream.transmit_side.fp.fileno(),
)
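For orientation, a minimal sketch of driving an IoPump stream outside fakessh, assuming a Broker and a plain pipe; the calls mirror what Process.start does below, and the pipe and lambda names are illustrative:

    import os
    import mitogen.core
    import mitogen.master
    from mitogen.fakessh import IoPump

    broker = mitogen.master.Broker()
    rd, wr = os.pipe()
    rd_fp = os.fdopen(rd, 'rb', 0)
    wr_fp = os.fdopen(wr, 'wb', 0)

    pump = IoPump.build_stream(broker)      # a Stream whose .protocol is the IoPump
    mitogen.core.set_blocking(rd_fp.fileno(), False)
    mitogen.core.set_blocking(wr_fp.fileno(), False)
    pump.accept(rd_fp, wr_fp)

    mitogen.core.listen(pump.protocol, 'receive', lambda s: print('got %d bytes' % len(s)))
    mitogen.core.listen(pump.protocol, 'disconnect', lambda: print('receive side hit EOF'))
    mitogen.core.listen(pump.protocol, 'write_done', lambda: print('write buffer flushed'))

    broker.start_receive(pump)              # begin delivering 'receive' events
    pump.protocol.write(b'outgoing data')   # buffered; never blocks
    pump.protocol.close()                   # flush, then close the transmit side
    broker.shutdown()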
class Process(object):
"""
Manages the lifetime and pipe connections of the SSH command running in the
slave.
Process manager responsible for forwarding data simultaneously in two
directions:
- From incoming self.stdin_handle data messages to file descriptor output
via IoPump.write() and IoPump.close() calls
- From input file descriptor IoPump 'receive' events to outgoing self.stdin
data messages
"stdin" naming is a little misleading because the stdin handle and handler
are used to forward both stdin and stdout data, not just stdin data.
"""
def __init__(self, router, stdin, stdout, proc=None):
def __init__(self, router):
self.router = router
self.stdin = stdin
self.stdout = stdout
self.proc = proc
self.control_handle = router.add_handler(self._on_control)
self.stdin_handle = router.add_handler(self._on_stdin)
self.pump = IoPump.build_stream(router.broker)
for fp in stdin, stdout:
def start(self, dest, control_handle, stdin_handle, in_fd, out_fd, proc=None):
self.control = mitogen.core.Sender(dest, control_handle)
self.stdin = mitogen.core.Sender(dest, stdin_handle)
self.pump = IoPump.build_stream(self.router.broker)
mitogen.core.listen(self.pump.protocol, 'receive', self._on_pump_receive)
mitogen.core.listen(self.pump.protocol, 'disconnect', self._on_pump_disconnect)
mitogen.core.listen(self.pump.protocol, 'write_done', self._on_pump_write_done)
for fp in in_fd, out_fd:
fd = fp.fileno()
mitogen.core.set_blocking(fd, False)
self.pump.accept(stdin, stdout)
self.stdin = None
self.control = None
self.wake_event = threading.Event()
mitogen.core.listen(self.pump, 'disconnect', self._on_pump_disconnect)
mitogen.core.listen(self.pump, 'receive', self._on_pump_receive)
if proc:
pmon = mitogen.parent.ProcessMonitor.instance()
pmon.add(proc.pid, self._on_proc_exit)
self.pump.accept(in_fd, out_fd, cloexec=proc is not None)
self.proc = proc
if self.proc is None:
self.exit_status = None
self.wake_event = threading.Event()
self.control.send(('start', ())) # start remote forwarding of process output
self.router.broker.start_receive(self.pump) # start local forwarding of process input
def __repr__(self):
return 'Process(%r, %r)' % (self.stdin, self.stdout)
return 'Process(%r)' % (self.pump)
def _on_proc_exit(self, status):
LOG.debug('%r._on_proc_exit(%r)', self, status)
self.control.put(('exit', status))
self.control.send(('exit', status))
def _on_stdin(self, msg):
if msg.is_dead:
@ -215,6 +260,9 @@ class Process(object):
command, arg = msg.unpickle(throw=False)
LOG.debug('%r._on_control(%r, %s)', self, command, arg)
if isinstance(command, bytes):
command = command.decode()
func = getattr(self, '_on_%s' % (command,), None)
if func:
return func(msg, arg)
@ -222,35 +270,60 @@ class Process(object):
LOG.warning('%r: unknown command %r', self, command)
def _on_start(self, msg, arg):
dest = mitogen.core.Context(self.router, msg.src_id)
self.control = mitogen.core.Sender(dest, arg[0])
self.stdin = mitogen.core.Sender(dest, arg[1])
# Triggered in fakessh slave process when fakessh master has sent
# 'start' command and is ready to receive stdout data. Handle by calling
# the broker to start receiving and forwarding stdout.
assert self.proc is not None
self.router.broker.start_receive(self.pump)
def _on_exit(self, msg, arg):
# Triggered in fakessh master process when fakessh slave has sent 'exit'
# command with subprocess exit code. In this case pump.transmit_side is
# forwarding remote subprocess output to stdout. If the transmit side is
# closed, all data has been written successfully and there's nothing
# left to do except wake and exit. But if the transmit side is still
# open, it means writes are still pending, and the fakessh master needs
# to wait for _on_pump_write_done event before exiting.
assert self.proc is None
LOG.debug('on_exit: proc = %r', self.proc)
if self.proc:
self.proc.terminate()
else:
self.router.broker.shutdown()
self.exit_status = arg
if self.pump.transmit_side.closed:
self.wake_event.set()
def _on_pump_receive(self, s):
# Triggered in fakessh master process when stdin data is received and
# needs to be forwarded, and in fakessh slave process when subprocess
# stdout data is received and needs to be forwarded
IOLOG.info('%r._on_pump_receive(len %d)', self, len(s))
self.stdin.put(s)
self.stdin.send(s)
def _on_pump_disconnect(self):
# Triggered in fakessh master process when stdin EOF is received, and in
# fakessh slave process when subprocess stdout EOF is received. In the
# slave case this is a signal to call waitpid and send the 'exit'
# command and status code to the fakessh master
LOG.debug('%r._on_pump_disconnect()', self)
mitogen.core.fire(self, 'disconnect')
self.stdin.close()
self.wake_event.set()
def start_master(self, stdin, control):
self.stdin = stdin
self.control = control
control.put(('start', (self.control_handle, self.stdin_handle)))
self.router.broker.start_receive(self.pump)
if self.proc is not None:
status = self.proc.wait()
self._on_proc_exit(status)
def _on_pump_write_done(self):
# Triggered in fakessh master process when a write of subprocess output
# data to stdout finishes, and in the fakessh slave process when a write
# of input data to subprocess stdin finishes. This requires triggering
# the wake event in the master process if waking was previously delayed
# due to a pending write.
LOG.debug('%r._on_pump_write_done()', self)
if self.proc is None and self.exit_status is not None:
# Exit
self.wake_event.set()
def wait(self):
# Called in fakessh master process to wait for wake event and subprocess
# exit code
assert self.proc is None
while not self.wake_event.isSet():
# Timeout is used so that sleep is interruptible, as blocking
# variants of libc thread operations cannot be interrupted e.g. via
@ -260,7 +333,7 @@ class Process(object):
@mitogen.core.takes_router
def _start_slave(src_id, cmdline, router):
def _start_slave(src_id, cmdline, control_handle, stdin_handle, router):
"""
This runs in the target context; it is invoked by _fakessh_main running in
the fakessh context immediately after startup. It starts the slave process
@ -279,8 +352,9 @@ def _start_slave(src_id, cmdline, router):
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
process = Process(router, proc.stdin, proc.stdout, proc)
process = Process(router)
dest = mitogen.core.Context(router, src_id)
process.start(dest, control_handle, stdin_handle, proc.stdout, proc.stdin, proc=proc)
return process.control_handle, process.stdin_handle
@ -350,22 +424,18 @@ def _fakessh_main(dest_context_id, econtext):
# Even though SSH receives an argument vector, it still cats the vector
# together before sending to the server; the server just uses /bin/sh -c to
# run the command. We must remain puke-for-puke compatible.
process = Process(econtext.router)
control_handle, stdin_handle = dest.call(_start_slave,
mitogen.context_id, ' '.join(args))
mitogen.context_id, ' '.join(args),
process.control_handle, process.stdin_handle)
LOG.debug('_fakessh_main: received control_handle=%r, stdin_handle=%r',
control_handle, stdin_handle)
process = Process(econtext.router,
stdin=os.fdopen(pty.STDOUT_FILENO, 'w+b', 0),
stdout=os.fdopen(pty.STDIN_FILENO, 'r+b', 0),
)
process.start_master(
stdin=mitogen.core.Sender(dest, stdin_handle),
control=mitogen.core.Sender(dest, control_handle),
)
process.start(dest, control_handle, stdin_handle, os.fdopen(pty.STDIN_FILENO, 'r+b', 0), os.fdopen(pty.STDOUT_FILENO, 'w+b', 0))
process.wait()
process.control.put(('exit', None))
mitogen.exit_status = process.exit_status
econtext.router.broker.shutdown()
def _get_econtext_config(context, sock2):
@ -383,6 +453,7 @@ def _get_econtext_config(context, sock2):
'profiling': getattr(context.router, 'profiling', False),
'unidirectional': getattr(context.router, 'unidirectional', False),
'setup_stdio': False,
'send_ec2': False,
'version': mitogen.__version__,
}
@ -422,12 +493,20 @@ def run(dest, router, args, deadline=None, econtext=None):
fakessh.name = u'fakessh.%d' % (context_id,)
sock1, sock2 = mitogen.core.socketpair()
stream = mitogen.core.Stream(router, context_id)
try:
# Python 3.x only
sock1.set_inheritable(True)
sock2.set_inheritable(True)
except AttributeError:
# Python 2.x socket objects are always inheritable
pass
stream = mitogen.core.MitogenProtocol.build_stream(router, context_id, mitogen.context_id)
stream.name = u'fakessh'
mitogen.core.set_blocking(sock1.fileno(), False)
stream.accept(sock1, sock1)
router.register(fakessh, stream)
router.route_monitor.notice_stream(stream)
# Held in socket buffer until process is booted.
fakessh.call_async(_fakessh_main, dest.context_id)
@ -441,8 +520,9 @@ def run(dest, router, args, deadline=None, econtext=None):
fp.write(inspect.getsource(mitogen.core))
fp.write('\n')
fp.write('ExternalContext(%r).main()\n' % (
_get_econtext_config(econtext, sock2),
_get_econtext_config(fakessh, sock2),
))
fp.write('sys.exit(mitogen.exit_status)\n')
finally:
fp.close()
@ -454,7 +534,7 @@ def run(dest, router, args, deadline=None, econtext=None):
'SSH_PATH': ssh_path,
})
proc = subprocess.Popen(args, env=env)
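# close_fds=False so the inheritable socketpair descriptor (sock2 above)
# survives exec in the spawned command and its children.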
proc = subprocess.Popen(args, env=env, close_fds=False)
return proc.wait()
finally:
shutil.rmtree(tmp_path)

@ -22,7 +22,7 @@ class RsyncTest(testlib.DockerMixin, testlib.TestCase):
self.assertEqual(return_code, 0)
self.assertTrue(context.call(os.path.exists, '/tmp/data'))
self.assertTrue(context.call(os.path.exists, '/tmp/data/simple_pkg/a.py'))
self.assertTrue(context.call(os.path.exists, '/tmp/data/importer/simple_pkg/a.py'))
def test_rsync_between_direct_children(self):
# master -> SSH -> mitogen__has_sudo_pubkey -> rsync(.ssh) -> master ->
