Get mitogen.fakessh module working again

Fixes include

- Setting the cloexec flag on pipe files, using set_inheritable on sockets,
  and close_fds=False on subprocess.Popen to work around file descriptors no
  longer being inheritable by default since Python 3.4 (PEP 446); see the
  sketch after this list

- Adding a mitogen.exit_status variable and avoiding the os.kill call so the
  fake 'ssh' script can exit cleanly with the correct status code

- Fixing a broken os.dup call in ExternalContext._setup_master when the input
  and output streams share the same descriptor

- Updating the fakessh module to do the necessary Python 3 string/byte
  conversions, and to use the updated mitogen Protocol, Stream, and Router APIs

- Simplifying the fakessh startup sequence so there are no unnecessary
  differences between the ways control and data handles are passed, and the
  ways master and slave processes are initialized

- Fixing shutdown race conditions where subprocess exit handling or
  stdin EOF handling could result in a truncated stdout stream

- Updating and adding a lot of docstrings and comments

- Adding Process.proc is None / is not None assertions to make clear which
  parts of the fakessh.Process code are specific to the slave process, and
  which parts are specific to the master process

- Re-enabling a unit test case and updating an outdated file path so it passes
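The first bullet refers to the PEP 446 behaviour: since Python 3.4, descriptors
are created non-inheritable, so a child process only sees them if they are
explicitly marked inheritable and subprocess is told not to close them. A
minimal standalone illustration (not mitogen code; names and the pipe are just
for the demo):

    # Demonstrates the Python 3.4+ default this change works around.
    import os
    import subprocess
    import sys

    r, w = os.pipe()
    print(os.get_inheritable(r))        # False on Python 3.4+

    os.set_inheritable(r, True)         # opt the read end back in
    child = subprocess.Popen(
        [sys.executable, '-c',
         'import os, sys; print(os.read(int(sys.argv[1]), 5))', str(r)],
        close_fds=False,                # keep inherited fds open in the child
    )
    os.write(w, b'hello')
    os.close(w)
    child.wait()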
pr683
Alex Willmer
parent 89c0cc94d1
commit 70a6d0d7dc

@@ -26,6 +26,7 @@ v0.3.3.dev0
 * :gh:issue:`920` Support Ansible :ans:conn:`~podman` connection plugin
 * :gh:issue:`836` :func:`mitogen.utils.with_router` decorator preserves the docstring in addition to the name.
 * :gh:issue:`936` :ans:mod:`fetch` no longer emits `[DEPRECATION WARNING]: The '_remote_checksum()' method is deprecated.`
+* :gh:pull:`683`: Previously broken :mod:`mitogen.fakessh` functionality is restored
 
 v0.3.2 (2022-01-12)

@@ -1699,7 +1699,7 @@ class Stream(object):
         self.protocol = protocol
         self.protocol.stream = self
 
-    def accept(self, rfp, wfp):
+    def accept(self, rfp, wfp, cloexec=True):
         """
         Attach a pair of file objects to :attr:`receive_side` and
         :attr:`transmit_side`, after wrapping them in :class:`Side` instances.
@@ -1715,8 +1715,8 @@
         :param file wfp:
             The file object to transmit to.
         """
-        self.receive_side = Side(self, rfp)
-        self.transmit_side = Side(self, wfp)
+        self.receive_side = Side(self, rfp, cloexec=cloexec)
+        self.transmit_side = Side(self, wfp, cloexec=cloexec)
 
     def __repr__(self):
         return "<Stream %s #%04x>" % (self.name, id(self) & 0xffff,)
@@ -3804,7 +3804,7 @@ class ExternalContext(object):
         self.config = config
 
     def _on_broker_exit(self):
-        if not self.config['profiling']:
+        if not self.config['profiling'] and not hasattr(mitogen, "exit_status"):
             os.kill(os.getpid(), signal.SIGTERM)
 
     def _on_shutdown_msg(self, msg):
@@ -3859,9 +3859,11 @@ class ExternalContext(object):
         in_fd = self.config.get('in_fd', 100)
         in_fp = os.fdopen(os.dup(in_fd), 'rb', 0)
+        out_fp = os.fdopen(os.dup(self.config.get('out_fd', 1)), 'wb', 0)
+        # Avoid closing in_fd until after duplicating out_fd in case
+        # (in_fd == out_fd) are the same bidirectional socket fd
         os.close(in_fd)
-        out_fp = os.fdopen(os.dup(self.config.get('out_fd', 1)), 'wb', 0)
         self.stream = MitogenProtocol.build_stream(
             self.router,
             parent_id,
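The reordering above matters when in_fd and out_fd name the same bidirectional
socket, as they do for the fakessh bootstrap. A small standalone demonstration
of why duplicating must happen before closing (the socketpair and variable
names are illustrative only):

    import os
    import socket

    parent_sock, child_sock = socket.socketpair()
    in_fd = out_fd = child_sock.detach()          # one fd, both directions

    in_fp = os.fdopen(os.dup(in_fd), 'rb', 0)
    out_fp = os.fdopen(os.dup(out_fd), 'wb', 0)   # dup first ...
    os.close(in_fd)                               # ... then close the original

    try:
        os.dup(out_fd)                            # the old ordering did this
    except OSError as exc:
        print('dup after close fails:', exc)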

@@ -65,7 +65,7 @@ Sequence:
    <mitogen.core.CALL_FUNCTION>` is read by fakessh context,
 
    a. sets up :py:class:`IoPump` for stdio, registers
-      stdin_handle for local context.
+      control_handle and stdin_handle for local context.
    b. Enqueues :py:data:`CALL_FUNCTION <mitogen.core.CALL_FUNCTION>` for
       :py:func:`_start_slave` invoked in target context,
@@ -78,18 +78,30 @@
 5. :py:func:`_fakessh_main` receives control/stdin handles from
    :py:func:`_start_slave`,
 
-   a. registers remote's stdin_handle with local :py:class:`IoPump`.
-   b. sends `("start", local_stdin_handle)` to remote's control_handle
-   c. registers local :py:class:`IoPump` with
-      :py:class:`mitogen.core.Broker`.
-   d. loops waiting for `local stdout closed && remote stdout closed`
-
-6. :py:func:`_start_slave` control channel receives `("start", stdin_handle)`,
-
-   a. registers remote's stdin_handle with local :py:class:`IoPump`
-   b. registers local :py:class:`IoPump` with
-      :py:class:`mitogen.core.Broker`.
-   c. loops waiting for `local stdout closed && remote stdout closed`
+   a. registers remote's control_handle and stdin_handle with local
+      :py:class:`IoPump`.
+   b. sends `("start", ())` to remote's control_handle to start receiving
+      stdout from remote subprocess
+   c. registers local :py:class:`IoPump` with
+      :py:class:`mitogen.core.Broker` to start sending stdin to remote
+      subprocess
+   d. forwards _on_stdin data to stdout with IoPump.write and IoPump.close
+   e. loops waiting for `("exit", status)` control message from slave
+      and for pending writes to stdout to complete.
+
+6. :py:func:`_start_slave` control channel receives `("start", ())`,
+
+   a. registers local :py:class:`IoPump` with
+      :py:class:`mitogen.core.Broker` to start receiving and forwarding
+      subprocess stdout
+   b. forwards _on_stdin data to subprocess stdin with IoPump.write and
+      IoPump.close
+   c. shuts down and sends `("exit", status)` control message to master
+      after reaching EOF from subprocess stdout
+
+"stdin" handle and handler naming is a little misleading because they are
+used to forward stdin data from the master to the slave, but stdout data from
+the slave to the master
 """
 
 import getopt
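A toy model of the start/exit handshake described in steps 5 and 6 above,
using queues and a thread in place of mitogen Senders, handles and contexts
(all names here are illustrative, not mitogen APIs):

    import queue
    import threading

    to_slave = queue.Queue()    # stands in for the slave's control_handle
    to_master = queue.Queue()   # stands in for the master's control_handle

    def slave():
        cmd, arg = to_slave.get()           # 6. wait for ('start', ())
        assert cmd == 'start'
        # ... forward subprocess stdout until EOF, then report the status
        to_master.put(('exit', 0))          # 6c. tell the master we are done

    t = threading.Thread(target=slave)
    t.start()

    to_slave.put(('start', ()))             # 5b. master asks slave to begin
    cmd, status = to_master.get()           # 5e. master waits for ('exit', status)
    t.join()
    print('remote command exited with', status)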
@@ -117,7 +129,33 @@ _mitogen = None
 
 class IoPump(mitogen.core.Protocol):
-    _output_buf = ''
+    """
+    Raw data protocol that transmits and receives in two directions:
+
+    - Forwarding data from protocol receive api to IoPump 'receive' and
+      'disconnect' listeners
+    - Forwarding data from IoPump.write() and IoPump.close() calls to protocol
+      transmit api
+
+    Overrides default protocol on_disconnect and on_shutdown methods, only
+    closing the receive side when an on_disconnect EOF is reached, and only
+    closing the transmit side when close() is called or on_shutdown termination
+    is forced. This way when EOF is reached for receiving data, outgoing data is
+    still transmitted in full without being truncated, and vice versa.
+
+    Back pressure is implemented in the receive direction ('receive' listeners
+    can block) but no back pressure exists in the transmit direction (IoPump.write
+    and IoPump.close calls never block), so writing data too fast can use an
+    unbounded amount of memory.
+
+    The lack of back pressure for writes should not normally be a problem when
+    IoPump is used by fakessh, because the data should be coming in from a slow
+    remote source and being transmitted to a fast local process. But there could
+    be cases where the local process is too slow (maybe writing to a slow disk)
+    and memory usage gets out of hand. In this case some kind of blocking or
+    rate limiting may need to be implemented for IoPump.write.
+    """
+    _output_buf = b''
     _closed = False
 
     def __init__(self, broker):
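The docstring's caveat about missing write-side back pressure could, if it
ever becomes a problem, be addressed by blocking writers above a high-water
mark. A purely hypothetical sketch, not part of mitogen, assuming a pump-like
object exposing write() and some form of drain notification:

    import threading

    class BoundedWriter(object):
        def __init__(self, pump, high_water=1 << 20):
            self.pump = pump                    # object exposing write()/close()
            self.high_water = high_water
            self.pending = 0
            self.cond = threading.Condition()

        def write(self, s):
            with self.cond:
                while self.pending >= self.high_water:
                    self.cond.wait()            # block producer until drained
                self.pending += len(s)
            self.pump.write(s)

        def on_write_done(self, written):
            # would be wired to a drain notification; the drained byte count
            # is an assumption here, the real 'write_done' event carries none
            with self.cond:
                self.pending = max(0, self.pending - written)
                self.cond.notify_all()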
@@ -125,77 +163,84 @@ class IoPump(mitogen.core.Protocol):
     def write(self, s):
         self._output_buf += s
-        self._broker._start_transmit(self)
+        self._broker._start_transmit(self.stream)
 
     def close(self):
         self._closed = True
         # If local process hasn't exitted yet, ensure its write buffer is
         # drained before lazily triggering disconnect in on_transmit.
-        if self.transmit_side.fp.fileno() is not None:
-            self._broker._start_transmit(self)
+        if not self.stream.transmit_side.closed:
+            self._broker._start_transmit(self.stream)
 
-    def on_shutdown(self, stream, broker):
+    def on_shutdown(self, broker):
         self.close()
+        super().on_shutdown(broker)
 
-    def on_transmit(self, stream, broker):
-        written = self.transmit_side.write(self._output_buf)
+    def on_transmit(self, broker):
+        written = self.stream.transmit_side.write(self._output_buf)
         IOLOG.debug('%r.on_transmit() -> len %r', self, written)
-        if written is None:
-            self.on_disconnect(broker)
-        else:
-            self._output_buf = self._output_buf[written:]
+        self._output_buf = self._output_buf[written:]
 
         if not self._output_buf:
-            broker._stop_transmit(self)
+            broker._stop_transmit(self.stream)
             if self._closed:
-                self.on_disconnect(broker)
+                self.stream.transmit_side.close()
+            mitogen.core.fire(self, 'write_done')
 
-    def on_receive(self, stream, broker):
-        s = stream.receive_side.read()
+    def on_receive(self, broker, s):
         IOLOG.debug('%r.on_receive() -> len %r', self, len(s))
-        if s:
-            mitogen.core.fire(self, 'receive', s)
-        else:
-            self.on_disconnect(broker)
+        mitogen.core.fire(self, 'receive', s)
+
+    def on_disconnect(self, broker):
+        broker.stop_receive(self.stream)
+        self.stream.receive_side.close()
+        mitogen.core.fire(self, 'disconnect')
 
     def __repr__(self):
         return 'IoPump(%r, %r)' % (
-            self.receive_side.fp.fileno(),
-            self.transmit_side.fp.fileno(),
+            self.stream.receive_side.fp.fileno(),
+            self.stream.transmit_side.fp.fileno(),
         )
 
 
 class Process(object):
     """
-    Manages the lifetime and pipe connections of the SSH command running in the
-    slave.
+    Process manager responsible for forwarding data simultaneously in two
+    directions:
+
+    - From incoming self.stdin_handle data messages to file descriptor output
+      via IoPump.write() and IoPump.close() calls
+    - From input file descriptor IoPump 'receive' events to outgoing self.stdin
+      data messages
+
+    "stdin" naming is a little misleading because the stdin handle and handler
+    are used to forward both stdin and stdout data, not just stdin data.
     """
-    def __init__(self, router, stdin, stdout, proc=None):
+    def __init__(self, router):
         self.router = router
-        self.stdin = stdin
-        self.stdout = stdout
-        self.proc = proc
         self.control_handle = router.add_handler(self._on_control)
         self.stdin_handle = router.add_handler(self._on_stdin)
-        self.pump = IoPump.build_stream(router.broker)
-        self.pump.accept(stdin, stdout)
-        self.stdin = None
-        self.control = None
-        self.wake_event = threading.Event()
-
-        mitogen.core.listen(self.pump, 'disconnect', self._on_pump_disconnect)
-        mitogen.core.listen(self.pump, 'receive', self._on_pump_receive)
 
-        if proc:
-            pmon = mitogen.parent.ProcessMonitor.instance()
-            pmon.add(proc.pid, self._on_proc_exit)
+    def start(self, dest, control_handle, stdin_handle, in_fd, out_fd, proc=None):
+        self.control = mitogen.core.Sender(dest, control_handle)
+        self.stdin = mitogen.core.Sender(dest, stdin_handle)
+
+        self.pump = IoPump.build_stream(self.router.broker)
+        mitogen.core.listen(self.pump.protocol, 'receive', self._on_pump_receive)
+        mitogen.core.listen(self.pump.protocol, 'disconnect', self._on_pump_disconnect)
+        mitogen.core.listen(self.pump.protocol, 'write_done', self._on_pump_write_done)
+        self.pump.accept(in_fd, out_fd, cloexec=proc is not None)
+
+        self.proc = proc
+        if self.proc is None:
+            self.exit_status = None
+            self.wake_event = threading.Event()
+            self.control.send(('start', ()))  # start remote forwarding of process output
+            self.router.broker.start_receive(self.pump)  # start local forwarding of process input
 
     def __repr__(self):
-        return 'Process(%r, %r)' % (self.stdin, self.stdout)
+        return 'Process(%r)' % (self.pump)
 
     def _on_proc_exit(self, status):
         LOG.debug('%r._on_proc_exit(%r)', self, status)
-        self.control.put(('exit', status))
+        self.control.send(('exit', status))
 
     def _on_stdin(self, msg):
         if msg.is_dead:
@@ -212,6 +257,9 @@ class Process(object):
 
         command, arg = msg.unpickle(throw=False)
         LOG.debug('%r._on_control(%r, %s)', self, command, arg)
+        if isinstance(command, bytes):
+            command = command.decode()
+
         func = getattr(self, '_on_%s' % (command,), None)
         if func:
             return func(msg, arg)
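The decode added above is one of the Python 3 string/byte fixes from the
commit message: a command name that arrives as bytes would otherwise never
match a str-keyed handler. A quick standalone demonstration:

    class Demo(object):
        def _on_start(self, msg, arg):
            return 'started'

    d = Demo()
    print(getattr(d, '_on_%s' % ('start',), None))     # bound method
    print(getattr(d, '_on_%s' % (b'start',), None))    # None: "_on_b'start'"

    command = b'start'
    if isinstance(command, bytes):
        command = command.decode()
    print(getattr(d, '_on_%s' % (command,), None))     # bound method again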
@@ -219,35 +267,60 @@
         LOG.warning('%r: unknown command %r', self, command)
 
     def _on_start(self, msg, arg):
-        dest = mitogen.core.Context(self.router, msg.src_id)
-        self.control = mitogen.core.Sender(dest, arg[0])
-        self.stdin = mitogen.core.Sender(dest, arg[1])
+        # Triggered in fakessh slave process when fakessh master has sent
+        # 'start' command and is ready to receive stdout data. Handle by calling
+        # the broker to start receiving and forwarding stdout.
+        assert self.proc is not None
         self.router.broker.start_receive(self.pump)
 
     def _on_exit(self, msg, arg):
+        # Triggered in fakessh master process when fakessh slave has sent 'exit'
+        # command with subprocess exit code. In this case pump.transmit_side is
+        # forwarding remote subprocess output to stdout. If the transmit side is
+        # closed, all data has been written successfully and there's nothing
+        # left to do except wake and exit. But if the transmit side is still
+        # open, it means writes are still pending, and the fakessh master needs
+        # to wait for the _on_pump_write_done event before exiting.
+        assert self.proc is None
         LOG.debug('on_exit: proc = %r', self.proc)
-        if self.proc:
-            self.proc.terminate()
-        else:
-            self.router.broker.shutdown()
+        self.exit_status = arg
+        if self.pump.transmit_side.closed:
+            self.wake_event.set()
 
     def _on_pump_receive(self, s):
+        # Triggered in fakessh master process when stdin data is received and
+        # needs to be forwarded, and in fakessh slave process when subprocess
+        # stdout data is received and needs to be forwarded
         IOLOG.info('%r._on_pump_receive(len %d)', self, len(s))
-        self.stdin.put(s)
+        self.stdin.send(s)
 
     def _on_pump_disconnect(self):
+        # Triggered in fakessh master process when stdin EOF is received, and in
+        # fakessh slave process when subprocess stdout EOF is received. In the
+        # slave case this is a signal to call waitpid and send the 'exit'
+        # command and status code to the fakessh master
        LOG.debug('%r._on_pump_disconnect()', self)
        mitogen.core.fire(self, 'disconnect')
        self.stdin.close()
-        self.wake_event.set()
-
-    def start_master(self, stdin, control):
-        self.stdin = stdin
-        self.control = control
-        control.put(('start', (self.control_handle, self.stdin_handle)))
-        self.router.broker.start_receive(self.pump)
+        if self.proc is not None:
+            status = self.proc.wait()
+            self._on_proc_exit(status)
+
+    def _on_pump_write_done(self):
+        # Triggered in fakessh master process when a write of subprocess output
+        # data to stdout finishes, and in the fakessh slave process when a write
+        # of input data to subprocess stdin finishes. This requires triggering
+        # the wake event in the master process if waking was previously delayed
+        # due to a pending write.
+        LOG.debug('%r._on_write_done()', self)
+        if self.proc is None and self.exit_status is not None:
+            # Exit
+            self.wake_event.set()
 
     def wait(self):
+        # Called in fakessh master process to wait for wake event and subprocess
+        # exit code
+        assert self.proc is None
         while not self.wake_event.isSet():
             # Timeout is used so that sleep is interruptible, as blocking
             # variants of libc thread operations cannot be interrupted e.g. via
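The comment retained above describes the usual polling pattern: waiting on the
event with a timeout returns control to the loop periodically instead of
blocking indefinitely inside libc. A standalone sketch of the same pattern:

    import threading

    def interruptible_wait(event, poll=0.5):
        while not event.is_set():
            event.wait(poll)        # returns at least every `poll` seconds

    done = threading.Event()
    threading.Timer(0.1, done.set).start()
    interruptible_wait(done)
    print('woken')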
@@ -257,7 +330,7 @@
 
 @mitogen.core.takes_router
-def _start_slave(src_id, cmdline, router):
+def _start_slave(src_id, cmdline, control_handle, stdin_handle, router):
     """
     This runs in the target context, it is invoked by _fakessh_main running in
     the fakessh context immediately after startup. It starts the slave process
@@ -276,8 +349,9 @@ def _start_slave(src_id, cmdline, router):
         stdin=subprocess.PIPE,
         stdout=subprocess.PIPE,
     )
 
-    process = Process(router, proc.stdin, proc.stdout, proc)
+    process = Process(router)
+    dest = mitogen.core.Context(router, src_id)
+    process.start(dest, control_handle, stdin_handle, proc.stdout, proc.stdin, proc=proc)
 
     return process.control_handle, process.stdin_handle
@@ -347,21 +421,18 @@ def _fakessh_main(dest_context_id, econtext):
     # Even though SSH receives an argument vector, it still cats the vector
     # together before sending to the server, the server just uses /bin/sh -c to
     # run the command. We must remain puke-for-puke compatible.
+    process = Process(econtext.router)
     control_handle, stdin_handle = dest.call(_start_slave,
-        mitogen.context_id, ' '.join(args))
+        mitogen.context_id, ' '.join(args),
+        process.control_handle, process.stdin_handle)
 
     LOG.debug('_fakessh_main: received control_handle=%r, stdin_handle=%r',
               control_handle, stdin_handle)
 
-    process = Process(econtext.router,
-        stdin=os.fdopen(1, 'w+b', 0),
-        stdout=os.fdopen(0, 'r+b', 0))
-    process.start_master(
-        stdin=mitogen.core.Sender(dest, stdin_handle),
-        control=mitogen.core.Sender(dest, control_handle),
-    )
+    process.start(dest, control_handle, stdin_handle, os.fdopen(0, 'r+b', 0), os.fdopen(1, 'w+b', 0))
     process.wait()
-    process.control.put(('exit', None))
+    mitogen.exit_status = process.exit_status
+    econtext.router.broker.shutdown()
 
 
 def _get_econtext_config(context, sock2):
@@ -379,6 +450,7 @@ def _get_econtext_config(context, sock2):
         'profiling': getattr(context.router, 'profiling', False),
         'unidirectional': getattr(context.router, 'unidirectional', False),
         'setup_stdio': False,
+        'send_ec2': False,
         'version': mitogen.__version__,
     }
@@ -418,11 +490,14 @@ def run(dest, router, args, deadline=None, econtext=None):
     fakessh.name = u'fakessh.%d' % (context_id,)
 
     sock1, sock2 = socket.socketpair()
+    sock1.set_inheritable(True)
+    sock2.set_inheritable(True)
 
-    stream = mitogen.core.Stream(router, context_id)
+    stream = mitogen.core.MitogenProtocol.build_stream(router, context_id, mitogen.context_id)
     stream.name = u'fakessh'
     stream.accept(sock1, sock1)
     router.register(fakessh, stream)
+    router.route_monitor.notice_stream(stream)
 
     # Held in socket buffer until process is booted.
     fakessh.call_async(_fakessh_main, dest.context_id)
@@ -436,8 +511,9 @@ def run(dest, router, args, deadline=None, econtext=None):
             fp.write(inspect.getsource(mitogen.core))
             fp.write('\n')
             fp.write('ExternalContext(%r).main()\n' % (
-                _get_econtext_config(econtext, sock2),
+                _get_econtext_config(fakessh, sock2),
             ))
+            fp.write('sys.exit(mitogen.exit_status)\n')
         finally:
             fp.close()
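With the extra fp.write() above, the generated fake 'ssh' script ends in
sys.exit(mitogen.exit_status), and the _on_broker_exit() change earlier lets
control reach that line instead of the process being killed. A standalone
sketch of that exit path, using stand-ins rather than the real mitogen package:

    import sys
    import types

    mitogen = types.SimpleNamespace()          # stand-in for the mitogen package

    def fakessh_main_finished(status):
        mitogen.exit_status = status           # what _fakessh_main() now records

    def on_broker_exit():
        # old behaviour: hard-kill the process unless a status was recorded
        if not hasattr(mitogen, 'exit_status'):
            raise SystemExit('would call os.kill() here')

    fakessh_main_finished(3)
    on_broker_exit()                           # no kill: exit_status is present
    sys.exit(mitogen.exit_status)              # last line of the generated script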
@@ -449,7 +525,7 @@ def run(dest, router, args, deadline=None, econtext=None):
             'SSH_PATH': ssh_path,
         })
 
-        proc = subprocess.Popen(args, env=env)
+        proc = subprocess.Popen(args, env=env, close_fds=False)
         return proc.wait()
     finally:
         shutil.rmtree(tmp_path)

@@ -8,7 +8,6 @@ import testlib
 
 class RsyncTest(testlib.DockerMixin, testlib.TestCase):
 
-    @unittest.skip('broken')
     def test_rsync_from_master(self):
         context = self.docker_ssh_any()
@@ -22,7 +21,7 @@ class RsyncTest(testlib.DockerMixin, testlib.TestCase):
         self.assertEqual(return_code, 0)
         self.assertTrue(context.call(os.path.exists, '/tmp/data'))
-        self.assertTrue(context.call(os.path.exists, '/tmp/data/simple_pkg/a.py'))
+        self.assertTrue(context.call(os.path.exists, '/tmp/data/importer/simple_pkg/a.py'))
 
     @unittest.skip('broken')
     def test_rsync_between_direct_children(self):
