diff --git a/docs/changelog.rst b/docs/changelog.rst index 9741ca5c..6b4c2015 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -26,6 +26,7 @@ v0.3.3.dev0 * :gh:issue:`920` Support Ansible :ans:conn:`~podman` connection plugin * :gh:issue:`836` :func:`mitogen.utils.with_router` decorator preserves the docstring in addition to the name. * :gh:issue:`936` :ans:mod:`fetch` no longer emits `[DEPRECATION WARNING]: The '_remote_checksum()' method is deprecated.` +* :gh:pull:`683`: Previously broken :mod:`mitogen.fakessh` functionality is restored v0.3.2 (2022-01-12) diff --git a/mitogen/core.py b/mitogen/core.py index bee722e6..3e24407b 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1699,7 +1699,7 @@ class Stream(object): self.protocol = protocol self.protocol.stream = self - def accept(self, rfp, wfp): + def accept(self, rfp, wfp, cloexec=True): """ Attach a pair of file objects to :attr:`receive_side` and :attr:`transmit_side`, after wrapping them in :class:`Side` instances. @@ -1715,8 +1715,8 @@ class Stream(object): :param file wfp: The file object to transmit to. """ - self.receive_side = Side(self, rfp) - self.transmit_side = Side(self, wfp) + self.receive_side = Side(self, rfp, cloexec=cloexec) + self.transmit_side = Side(self, wfp, cloexec=cloexec) def __repr__(self): return "" % (self.name, id(self) & 0xffff,) @@ -3804,7 +3804,7 @@ class ExternalContext(object): self.config = config def _on_broker_exit(self): - if not self.config['profiling']: + if not self.config['profiling'] and not hasattr(mitogen, "exit_status"): os.kill(os.getpid(), signal.SIGTERM) def _on_shutdown_msg(self, msg): @@ -3859,9 +3859,11 @@ class ExternalContext(object): in_fd = self.config.get('in_fd', 100) in_fp = os.fdopen(os.dup(in_fd), 'rb', 0) + out_fp = os.fdopen(os.dup(self.config.get('out_fd', 1)), 'wb', 0) + # Avoid closing in_fd until after duplicating out_fd in case + # (in_fd == out_fd) are the same bidirectional socket fd os.close(in_fd) - out_fp = os.fdopen(os.dup(self.config.get('out_fd', 1)), 'wb', 0) self.stream = MitogenProtocol.build_stream( self.router, parent_id, diff --git a/mitogen/fakessh.py b/mitogen/fakessh.py index 2d660248..a02e8444 100644 --- a/mitogen/fakessh.py +++ b/mitogen/fakessh.py @@ -65,7 +65,7 @@ Sequence: ` is read by fakessh context, a. sets up :py:class:`IoPump` for stdio, registers - stdin_handle for local context. + control_handle and stdin_handle for local context. b. Enqueues :py:data:`CALL_FUNCTION ` for :py:func:`_start_slave` invoked in target context, @@ -78,18 +78,30 @@ Sequence: 5. :py:func:`_fakessh_main` receives control/stdin handles from from :py:func:`_start_slave`, - a. registers remote's stdin_handle with local :py:class:`IoPump`. - b. sends `("start", local_stdin_handle)` to remote's control_handle + a. registers remote's control_handle and stdin_handle with local + :py:class:`IoPump`. + b. sends `("start", ())` to remote's control_handle to start receiving + stdout from remote subprocess c. registers local :py:class:`IoPump` with - :py:class:`mitogen.core.Broker`. - d. loops waiting for `local stdout closed && remote stdout closed` - - 6. :py:func:`_start_slave` control channel receives `("start", stdin_handle)`, - - a. registers remote's stdin_handle with local :py:class:`IoPump` - b. registers local :py:class:`IoPump` with - :py:class:`mitogen.core.Broker`. - c. loops waiting for `local stdout closed && remote stdout closed` + :py:class:`mitogen.core.Broker` to start sending stdin to remote + subprocess + d. forwards _on_stdin data to stdout with IoPump.write and IoPump.close + e. loops waiting for `("exit", status)` control message from slave + and for pending writes to stdout to complete. + + 6. :py:func:`_start_slave` control channel receives `("start", ())`, + + a. registers local :py:class:`IoPump` with + :py:class:`mitogen.core.Broker` to start receiving and forwarding + subprocess stdout + b. forwards _on_stdin data to subprocess stdin with IoPump.write and + IoPump.close + c. shuts down and sends `("exit", status)` control message to master + after reaching EOF from subprocess stdout + + "stdin" handle and handler naming is a little misleading because they are + used to forard stdin data from the master to the slave, but stdout data from + the slave to the master """ import getopt @@ -117,7 +129,33 @@ _mitogen = None class IoPump(mitogen.core.Protocol): - _output_buf = '' + """ + Raw data protocol that transmits and receives in two directions: + + - Forwarding data from protocol receive api to IoPump 'receive' and + 'disconnect' listeners + - Forwarding data from IoPump.write() and IoPump.close() calls to protocol + transmit api + + Overrides default protocol on_disconnect and on_shutdown methods, only + closing the receive side when an on_disconnect EOF is reached, and only + closing the transmit side when close() is called or on_shutdown termination + is forced. This way when EOF is reached for receiving data, outgoing data is + still transmitted in full without being truncated, and vice versa. + + Back pressure is implemented in the receive direction ('receive' listeners + can block) but no back pressure exists in transmit direction (IoPump.write + and IoPump.close calls never block), so writing data too fast can use an + unbounded amount of memory. + + The lack of back pressure for writes should not normally be problem when + IoPump is used by fakessh, because the data should be coming in from a slow + remote source and being transmitted to a fast local process. But there could + be cases where the local process is too slow (maybe writing to a slow disk) + and memory usage gets out of hand. In this case some kind of blocking or + rate limiting may need to be implemented for IoPump.write. + """ + _output_buf = b'' _closed = False def __init__(self, broker): @@ -125,77 +163,84 @@ class IoPump(mitogen.core.Protocol): def write(self, s): self._output_buf += s - self._broker._start_transmit(self) + self._broker._start_transmit(self.stream) def close(self): self._closed = True # If local process hasn't exitted yet, ensure its write buffer is # drained before lazily triggering disconnect in on_transmit. - if self.transmit_side.fp.fileno() is not None: - self._broker._start_transmit(self) + if not self.stream.transmit_side.closed: + self._broker._start_transmit(self.stream) - def on_shutdown(self, stream, broker): + def on_shutdown(self, broker): self.close() + super().on_shutdown(broker) - def on_transmit(self, stream, broker): - written = self.transmit_side.write(self._output_buf) + def on_transmit(self, broker): + written = self.stream.transmit_side.write(self._output_buf) IOLOG.debug('%r.on_transmit() -> len %r', self, written) - if written is None: - self.on_disconnect(broker) - else: - self._output_buf = self._output_buf[written:] - + self._output_buf = self._output_buf[written:] if not self._output_buf: - broker._stop_transmit(self) + broker._stop_transmit(self.stream) if self._closed: - self.on_disconnect(broker) + self.stream.transmit_side.close() + mitogen.core.fire(self, 'write_done') - def on_receive(self, stream, broker): - s = stream.receive_side.read() + def on_receive(self, broker, s): IOLOG.debug('%r.on_receive() -> len %r', self, len(s)) - if s: - mitogen.core.fire(self, 'receive', s) - else: - self.on_disconnect(broker) + mitogen.core.fire(self, 'receive', s) + + def on_disconnect(self, broker): + broker.stop_receive(self.stream) + self.stream.receive_side.close() + mitogen.core.fire(self, 'disconnect') def __repr__(self): return 'IoPump(%r, %r)' % ( - self.receive_side.fp.fileno(), - self.transmit_side.fp.fileno(), + self.stream.receive_side.fp.fileno(), + self.stream.transmit_side.fp.fileno(), ) class Process(object): """ - Manages the lifetime and pipe connections of the SSH command running in the - slave. + Process manager responsible for forwarding data simultaneously in two + directions: + + - From incoming self.stdin_handle data messages to file descriptor output + via IoPump.write() and IoPump.close() calls + - From input file descriptor IoPump 'receive' events to outgoing self.stdin + data messages + + "stdin" naming is a little misleading because the stdin handle and handler + are used to forward both stdin and stdout data, not just stdin data. """ - def __init__(self, router, stdin, stdout, proc=None): + def __init__(self, router): self.router = router - self.stdin = stdin - self.stdout = stdout - self.proc = proc self.control_handle = router.add_handler(self._on_control) self.stdin_handle = router.add_handler(self._on_stdin) - self.pump = IoPump.build_stream(router.broker) - self.pump.accept(stdin, stdout) - self.stdin = None - self.control = None - self.wake_event = threading.Event() - - mitogen.core.listen(self.pump, 'disconnect', self._on_pump_disconnect) - mitogen.core.listen(self.pump, 'receive', self._on_pump_receive) - if proc: - pmon = mitogen.parent.ProcessMonitor.instance() - pmon.add(proc.pid, self._on_proc_exit) + def start(self, dest, control_handle, stdin_handle, in_fd, out_fd, proc=None): + self.control = mitogen.core.Sender(dest, control_handle) + self.stdin = mitogen.core.Sender(dest, stdin_handle) + self.pump = IoPump.build_stream(self.router.broker) + mitogen.core.listen(self.pump.protocol, 'receive', self._on_pump_receive) + mitogen.core.listen(self.pump.protocol, 'disconnect', self._on_pump_disconnect) + mitogen.core.listen(self.pump.protocol, 'write_done', self._on_pump_write_done) + self.pump.accept(in_fd, out_fd, cloexec=proc is not None) + self.proc = proc + if self.proc is None: + self.exit_status = None + self.wake_event = threading.Event() + self.control.send(('start', ())) # start remote forwarding of process output + self.router.broker.start_receive(self.pump) # start local forwarding of process input def __repr__(self): - return 'Process(%r, %r)' % (self.stdin, self.stdout) + return 'Process(%r)' % (self.pump) def _on_proc_exit(self, status): LOG.debug('%r._on_proc_exit(%r)', self, status) - self.control.put(('exit', status)) + self.control.send(('exit', status)) def _on_stdin(self, msg): if msg.is_dead: @@ -212,6 +257,9 @@ class Process(object): command, arg = msg.unpickle(throw=False) LOG.debug('%r._on_control(%r, %s)', self, command, arg) + if isinstance(command, bytes): + command = command.decode() + func = getattr(self, '_on_%s' % (command,), None) if func: return func(msg, arg) @@ -219,35 +267,60 @@ class Process(object): LOG.warning('%r: unknown command %r', self, command) def _on_start(self, msg, arg): - dest = mitogen.core.Context(self.router, msg.src_id) - self.control = mitogen.core.Sender(dest, arg[0]) - self.stdin = mitogen.core.Sender(dest, arg[1]) + # Triggered in fakessh slave process when fakessh master has sent + # 'start' command and is ready to receive stdout data. Handle by calling + # the broker to start receiving and forwarding stdout. + assert self.proc is not None self.router.broker.start_receive(self.pump) def _on_exit(self, msg, arg): + # Triggered in fakessh master process when fakessh slave has sent 'exit' + # command with subprocess exit code. In this case pump.transit_side is + # forwarding remote subprocess output to stdout. If the transmit side is + # closed, all data has been written successfully and there's nothing + # left to do except wake and exit. But if the transmit side is still + # open, it means writes are still pending, and the fakessh master needs + # to wait for _on_pump_write_done event before exiting. + assert self.proc is None LOG.debug('on_exit: proc = %r', self.proc) - if self.proc: - self.proc.terminate() - else: - self.router.broker.shutdown() + self.exit_status = arg + if self.pump.transmit_side.closed: + self.wake_event.set() def _on_pump_receive(self, s): + # Triggered in fakessh master process when stdin data is received and + # needs to be forwarded, and in fakessh slave process when subprocess + # stdout data is received and needs to be forwarded IOLOG.info('%r._on_pump_receive(len %d)', self, len(s)) - self.stdin.put(s) + self.stdin.send(s) def _on_pump_disconnect(self): + # Triggered in fakessh master process when stdin EOF is received, and in + # fakessh slave process when subprocess stdout EOF is received. In the + # slave case this is a signal to call waitpid and send the 'exit' + # command and status code to the fakessh master LOG.debug('%r._on_pump_disconnect()', self) mitogen.core.fire(self, 'disconnect') self.stdin.close() - self.wake_event.set() - - def start_master(self, stdin, control): - self.stdin = stdin - self.control = control - control.put(('start', (self.control_handle, self.stdin_handle))) - self.router.broker.start_receive(self.pump) + if self.proc is not None: + status = self.proc.wait() + self._on_proc_exit(status) + + def _on_pump_write_done(self): + # Triggered in fakessh master process when a write of subprocess output + # data to stdout finishes, and in the fakessh slave process when a write + # of input data to subprocess stdin finishes. This requires triggering + # the wake event in the master process if waking was previously delayed + # due to a pending write. + LOG.debug('%r._on_write_done()', self) + if self.proc is None and self.exit_status is not None: + # Exit + self.wake_event.set() def wait(self): + # Called in fakessh master process to wait for wake event and subprocess + # exit code + assert self.proc is None while not self.wake_event.isSet(): # Timeout is used so that sleep is interruptible, as blocking # variants of libc thread operations cannot be interrupted e.g. via @@ -257,7 +330,7 @@ class Process(object): @mitogen.core.takes_router -def _start_slave(src_id, cmdline, router): +def _start_slave(src_id, cmdline, control_handle, stdin_handle, router): """ This runs in the target context, it is invoked by _fakessh_main running in the fakessh context immediately after startup. It starts the slave process @@ -276,8 +349,9 @@ def _start_slave(src_id, cmdline, router): stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) - - process = Process(router, proc.stdin, proc.stdout, proc) + process = Process(router) + dest = mitogen.core.Context(router, src_id) + process.start(dest, control_handle, stdin_handle, proc.stdout, proc.stdin, proc=proc) return process.control_handle, process.stdin_handle @@ -347,21 +421,18 @@ def _fakessh_main(dest_context_id, econtext): # Even though SSH receives an argument vector, it still cats the vector # together before sending to the server, the server just uses /bin/sh -c to # run the command. We must remain puke-for-puke compatible. + process = Process(econtext.router) control_handle, stdin_handle = dest.call(_start_slave, - mitogen.context_id, ' '.join(args)) + mitogen.context_id, ' '.join(args), + process.control_handle, process.stdin_handle) LOG.debug('_fakessh_main: received control_handle=%r, stdin_handle=%r', control_handle, stdin_handle) - process = Process(econtext.router, - stdin=os.fdopen(1, 'w+b', 0), - stdout=os.fdopen(0, 'r+b', 0)) - process.start_master( - stdin=mitogen.core.Sender(dest, stdin_handle), - control=mitogen.core.Sender(dest, control_handle), - ) + process.start(dest, control_handle, stdin_handle, os.fdopen(0, 'r+b', 0), os.fdopen(1, 'w+b', 0)) process.wait() - process.control.put(('exit', None)) + mitogen.exit_status = process.exit_status + econtext.router.broker.shutdown() def _get_econtext_config(context, sock2): @@ -379,6 +450,7 @@ def _get_econtext_config(context, sock2): 'profiling': getattr(context.router, 'profiling', False), 'unidirectional': getattr(context.router, 'unidirectional', False), 'setup_stdio': False, + 'send_ec2': False, 'version': mitogen.__version__, } @@ -418,11 +490,14 @@ def run(dest, router, args, deadline=None, econtext=None): fakessh.name = u'fakessh.%d' % (context_id,) sock1, sock2 = socket.socketpair() + sock1.set_inheritable(True) + sock2.set_inheritable(True) - stream = mitogen.core.Stream(router, context_id) + stream = mitogen.core.MitogenProtocol.build_stream(router, context_id, mitogen.context_id) stream.name = u'fakessh' stream.accept(sock1, sock1) router.register(fakessh, stream) + router.route_monitor.notice_stream(stream) # Held in socket buffer until process is booted. fakessh.call_async(_fakessh_main, dest.context_id) @@ -436,8 +511,9 @@ def run(dest, router, args, deadline=None, econtext=None): fp.write(inspect.getsource(mitogen.core)) fp.write('\n') fp.write('ExternalContext(%r).main()\n' % ( - _get_econtext_config(econtext, sock2), + _get_econtext_config(fakessh, sock2), )) + fp.write('sys.exit(mitogen.exit_status)\n') finally: fp.close() @@ -449,7 +525,7 @@ def run(dest, router, args, deadline=None, econtext=None): 'SSH_PATH': ssh_path, }) - proc = subprocess.Popen(args, env=env) + proc = subprocess.Popen(args, env=env, close_fds=False) return proc.wait() finally: shutil.rmtree(tmp_path) diff --git a/tests/fakessh_test.py b/tests/fakessh_test.py index 52321495..b15ae9ef 100644 --- a/tests/fakessh_test.py +++ b/tests/fakessh_test.py @@ -8,7 +8,6 @@ import testlib class RsyncTest(testlib.DockerMixin, testlib.TestCase): - @unittest.skip('broken') def test_rsync_from_master(self): context = self.docker_ssh_any() @@ -22,7 +21,7 @@ class RsyncTest(testlib.DockerMixin, testlib.TestCase): self.assertEqual(return_code, 0) self.assertTrue(context.call(os.path.exists, '/tmp/data')) - self.assertTrue(context.call(os.path.exists, '/tmp/data/simple_pkg/a.py')) + self.assertTrue(context.call(os.path.exists, '/tmp/data/importer/simple_pkg/a.py')) @unittest.skip('broken') def test_rsync_between_direct_children(self):