issue #406: clean up DiagLogStream handling and connect() failure.

When Stream.connect() fails, have it just use on_disconnect(). Now there
is a single disconnect cleanup path.

Remove cutpasted DiagLogStream setup/destruction, and move it into the
base class (temporarily), and only manage the lifetime of its underlying
FD via Side.close().  This cures another EBADF failure.
issue260
David Wilson 6 years ago
parent e01c8f2891
commit 802de6a8d5

@ -45,10 +45,6 @@ class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child) create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False child_is_immediate_subprocess = False
#: Once connected, points to the corresponding DiagLogStream, allowing it
#: to be disconnected at the same time this stream is being torn down.
tty_stream = None
username = 'root' username = 'root'
password = None password = None
doas_path = 'doas' doas_path = 'doas'
@ -75,10 +71,6 @@ class Stream(mitogen.parent.Stream):
super(Stream, self).connect() super(Stream, self).connect()
self.name = u'doas.' + mitogen.core.to_text(self.username) self.name = u'doas.' + mitogen.core.to_text(self.username)
def on_disconnect(self, broker):
self.tty_stream.on_disconnect(broker)
super(Stream, self).on_disconnect(broker)
def get_boot_command(self): def get_boot_command(self):
bits = [self.doas_path, '-u', self.username, '--'] bits = [self.doas_path, '-u', self.username, '--']
bits = bits + super(Stream, self).get_boot_command() bits = bits + super(Stream, self).get_boot_command()
@ -88,15 +80,13 @@ class Stream(mitogen.parent.Stream):
password_incorrect_msg = 'doas password is incorrect' password_incorrect_msg = 'doas password is incorrect'
password_required_msg = 'doas password is required' password_required_msg = 'doas password is required'
def _connect_bootstrap(self, extra_fd): def _connect_bootstrap(self):
self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self)
password_sent = False
it = mitogen.parent.iter_read( it = mitogen.parent.iter_read(
fds=[self.receive_side.fd, extra_fd], fds=[self.receive_side.fd, self.diag_stream.receive_side.fd],
deadline=self.connect_deadline, deadline=self.connect_deadline,
) )
password_sent = False
for buf in it: for buf in it:
LOG.debug('%r: received %r', self, buf) LOG.debug('%r: received %r', self, buf)
if buf.endswith(self.EC0_MARKER): if buf.endswith(self.EC0_MARKER):
@ -111,7 +101,7 @@ class Stream(mitogen.parent.Stream):
if password_sent: if password_sent:
raise PasswordError(self.password_incorrect_msg) raise PasswordError(self.password_incorrect_msg)
LOG.debug('sending password') LOG.debug('sending password')
self.tty_stream.transmit_side.write( self.diag_stream.transmit_side.write(
mitogen.core.to_text(self.password + '\n').encode('utf-8') mitogen.core.to_text(self.password + '\n').encode('utf-8')
) )
password_sent = True password_sent = True

@ -188,6 +188,6 @@ class Stream(mitogen.parent.Stream):
# Don't trigger atexit handlers, they were copied from the parent. # Don't trigger atexit handlers, they were copied from the parent.
os._exit(0) os._exit(0)
def _connect_bootstrap(self, extra_fd): def _connect_bootstrap(self):
# None required. # None required.
pass pass

@ -939,6 +939,33 @@ class Stream(mitogen.core.Stream):
#: ExternalContext.main(). #: ExternalContext.main().
max_message_size = None max_message_size = None
#: If :attr:`create_child` supplied a diag_fd, references the corresponding
#: :class:`DiagLogStream`, allowing it to be disconnected when this stream
#: is disconnected. Set to :data:`None` if no `diag_fd` was present.
diag_stream = None
#: Function with the semantics of :func:`create_child` used to create the
#: child process.
create_child = staticmethod(create_child)
#: Dictionary of extra kwargs passed to :attr:`create_child`.
create_child_args = {}
#: :data:`True` if the remote has indicated that it intends to detach, and
#: should not be killed on disconnect.
detached = False
#: If :data:`True`, indicates the child should not be killed during
#: graceful detachment, as it the actual process implementing the child
#: context. In all other cases, the subprocess is SSH, sudo, or a similar
#: tool that should be reminded to quit during disconnection.
child_is_immediate_subprocess = True
#: Prefix given to default names generated by :meth:`connect`.
name_prefix = u'local'
_reaped = False
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(Stream, self).__init__(*args, **kwargs) super(Stream, self).__init__(*args, **kwargs)
self.sent_modules = set(['mitogen', 'mitogen.core']) self.sent_modules = set(['mitogen', 'mitogen.core'])
@ -976,15 +1003,6 @@ class Stream(mitogen.core.Stream):
) )
) )
#: If :data:`True`, indicates the subprocess managed by us should not be
#: killed during graceful detachment, as it the actual process implementing
#: the child context. In all other cases, the subprocess is SSH, sudo, or a
#: similar tool that should be reminded to quit during disconnection.
child_is_immediate_subprocess = True
detached = False
_reaped = False
def _reap_child(self): def _reap_child(self):
""" """
Reap the child process during disconnection. Reap the child process during disconnection.
@ -1024,8 +1042,10 @@ class Stream(mitogen.core.Stream):
raise raise
def on_disconnect(self, broker): def on_disconnect(self, broker):
self._reap_child()
super(Stream, self).on_disconnect(broker) super(Stream, self).on_disconnect(broker)
if self.diag_stream is not None:
self.diag_stream.on_disconnect(broker)
self._reap_child()
# Minimised, gzipped, base64'd and passed to 'python -c'. It forks, dups # Minimised, gzipped, base64'd and passed to 'python -c'. It forks, dups
# file descriptor 0 as 100, creates a pipe, then execs a new interpreter # file descriptor 0 as 100, creates a pipe, then execs a new interpreter
@ -1129,10 +1149,6 @@ class Stream(mitogen.core.Stream):
) )
return zlib.compress(source.encode('utf-8'), 9) return zlib.compress(source.encode('utf-8'), 9)
create_child = staticmethod(create_child)
create_child_args = {}
name_prefix = u'local'
def start_child(self): def start_child(self):
args = self.get_boot_command() args = self.get_boot_command()
try: try:
@ -1154,26 +1170,28 @@ class Stream(mitogen.core.Stream):
def connect(self): def connect(self):
LOG.debug('%r.connect()', self) LOG.debug('%r.connect()', self)
self.pid, fd, extra_fd = self.start_child() self.pid, fd, diag_fd = self.start_child()
self.name = u'%s.%s' % (self.name_prefix, self.pid) self.name = u'%s.%s' % (self.name_prefix, self.pid)
self.receive_side = mitogen.core.Side(self, fd) self.receive_side = mitogen.core.Side(self, fd)
self.transmit_side = mitogen.core.Side(self, os.dup(fd)) self.transmit_side = mitogen.core.Side(self, os.dup(fd))
LOG.debug('%r.connect(): child process stdin/stdout=%r', if diag_fd is not None:
self, self.receive_side.fd) self.diag_stream = DiagLogStream(diag_fd, self)
else:
self.diag_stream = None
LOG.debug('%r.connect(): stdin=%r, stdout=%r, diag=%r',
self, self.receive_side.fd, self.transmit_side.fd,
self.diag_stream and self.diag_stream.receive_side.fd)
try: try:
self._connect_bootstrap(extra_fd) self._connect_bootstrap()
except EofError: except EofError:
self.receive_side.close() self.on_disconnect(self._router.broker)
self.transmit_side.close()
e = sys.exc_info()[1] e = sys.exc_info()[1]
self._adorn_eof_error(e) self._adorn_eof_error(e)
raise raise
except Exception: except Exception:
self.receive_side.close() self.on_disconnect(self._router.broker)
self.transmit_side.close()
if extra_fd is not None:
os.close(extra_fd)
self._reap_child() self._reap_child()
raise raise
@ -1188,8 +1206,10 @@ class Stream(mitogen.core.Stream):
write_all(self.transmit_side.fd, self.get_preamble()) write_all(self.transmit_side.fd, self.get_preamble())
discard_until(self.receive_side.fd, self.EC1_MARKER, discard_until(self.receive_side.fd, self.EC1_MARKER,
self.connect_deadline) self.connect_deadline)
if self.diag_stream:
self._router.broker.start_receive(self.diag_stream)
def _connect_bootstrap(self, extra_fd): def _connect_bootstrap(self):
discard_until(self.receive_side.fd, self.EC0_MARKER, discard_until(self.receive_side.fd, self.EC0_MARKER,
self.connect_deadline) self.connect_deadline)
self._ec0_received() self._ec0_received()

@ -127,10 +127,6 @@ class Stream(mitogen.parent.Stream):
#: Number of -v invocations to pass on command line. #: Number of -v invocations to pass on command line.
ssh_debug_level = 0 ssh_debug_level = 0
#: If batch_mode=False, points to the corresponding DiagLogStream, allowing
#: it to be disconnected at the same time this stream is being torn down.
tty_stream = None
#: The path to the SSH binary. #: The path to the SSH binary.
ssh_path = 'ssh' ssh_path = 'ssh'
@ -195,11 +191,6 @@ class Stream(mitogen.parent.Stream):
'stderr_pipe': True, 'stderr_pipe': True,
} }
def on_disconnect(self, broker):
if self.tty_stream is not None:
self.tty_stream.on_disconnect(broker)
super(Stream, self).on_disconnect(broker)
def get_boot_command(self): def get_boot_command(self):
bits = [self.ssh_path] bits = [self.ssh_path]
if self.ssh_debug_level: if self.ssh_debug_level:
@ -265,7 +256,7 @@ class Stream(mitogen.parent.Stream):
def _host_key_prompt(self): def _host_key_prompt(self):
if self.check_host_keys == 'accept': if self.check_host_keys == 'accept':
LOG.debug('%r: accepting host key', self) LOG.debug('%r: accepting host key', self)
self.tty_stream.transmit_side.write(b('yes\n')) self.diag_stream.transmit_side.write(b('yes\n'))
return return
# _host_key_prompt() should never be reached with ignore or enforce # _host_key_prompt() should never be reached with ignore or enforce
@ -273,16 +264,10 @@ class Stream(mitogen.parent.Stream):
# with ours. # with ours.
raise HostKeyError(self.hostkey_config_msg) raise HostKeyError(self.hostkey_config_msg)
def _ec0_received(self): def _connect_bootstrap(self):
if self.tty_stream is not None:
self._router.broker.start_receive(self.tty_stream)
return super(Stream, self)._ec0_received()
def _connect_bootstrap(self, extra_fd):
fds = [self.receive_side.fd] fds = [self.receive_side.fd]
if extra_fd is not None: if self.diag_stream is not None:
self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self) fds.append(self.diag_stream.receive_side.fd)
fds.append(extra_fd)
it = mitogen.parent.iter_read(fds=fds, deadline=self.connect_deadline) it = mitogen.parent.iter_read(fds=fds, deadline=self.connect_deadline)
@ -311,7 +296,7 @@ class Stream(mitogen.parent.Stream):
if self.password is None: if self.password is None:
raise PasswordError(self.password_required_msg) raise PasswordError(self.password_required_msg)
LOG.debug('%r: sending password', self) LOG.debug('%r: sending password', self)
self.tty_stream.transmit_side.write( self.diag_stream.transmit_side.write(
(self.password + '\n').encode() (self.password + '\n').encode()
) )
password_sent = True password_sent = True

@ -80,9 +80,6 @@ class Stream(mitogen.parent.Stream):
super(Stream, self).connect() super(Stream, self).connect()
self.name = u'su.' + mitogen.core.to_text(self.username) self.name = u'su.' + mitogen.core.to_text(self.username)
def on_disconnect(self, broker):
super(Stream, self).on_disconnect(broker)
def get_boot_command(self): def get_boot_command(self):
argv = mitogen.parent.Argv(super(Stream, self).get_boot_command()) argv = mitogen.parent.Argv(super(Stream, self).get_boot_command())
return [self.su_path, self.username, '-c', str(argv)] return [self.su_path, self.username, '-c', str(argv)]
@ -90,7 +87,7 @@ class Stream(mitogen.parent.Stream):
password_incorrect_msg = 'su password is incorrect' password_incorrect_msg = 'su password is incorrect'
password_required_msg = 'su password is required' password_required_msg = 'su password is required'
def _connect_bootstrap(self, extra_fd): def _connect_bootstrap(self):
password_sent = False password_sent = False
it = mitogen.parent.iter_read( it = mitogen.parent.iter_read(
fds=[self.receive_side.fd], fds=[self.receive_side.fd],

@ -150,10 +150,6 @@ class Stream(mitogen.parent.Stream):
super(Stream, self).connect() super(Stream, self).connect()
self.name = u'sudo.' + mitogen.core.to_text(self.username) self.name = u'sudo.' + mitogen.core.to_text(self.username)
def on_disconnect(self, broker):
self.tty_stream.on_disconnect(broker)
super(Stream, self).on_disconnect(broker)
def get_boot_command(self): def get_boot_command(self):
# Note: sudo did not introduce long-format option processing until July # Note: sudo did not introduce long-format option processing until July
# 2013, so even though we parse long-format options, supply short-form # 2013, so even though we parse long-format options, supply short-form
@ -177,12 +173,14 @@ class Stream(mitogen.parent.Stream):
password_incorrect_msg = 'sudo password is incorrect' password_incorrect_msg = 'sudo password is incorrect'
password_required_msg = 'sudo password is required' password_required_msg = 'sudo password is required'
def _connect_bootstrap(self, extra_fd): def _connect_bootstrap(self):
self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self) fds = [self.receive_side.fd]
if self.diag_stream is not None:
fds.append(self.diag_stream.receive_side.fd)
password_sent = False password_sent = False
it = mitogen.parent.iter_read( it = mitogen.parent.iter_read(
fds=[self.receive_side.fd, extra_fd], fds=fds,
deadline=self.connect_deadline, deadline=self.connect_deadline,
) )

@ -2,8 +2,13 @@
import json import json
import os import os
import subprocess
import sys import sys
os.environ['ORIGINAL_ARGV'] = json.dumps(sys.argv) os.environ['ORIGINAL_ARGV'] = json.dumps(sys.argv)
os.environ['THIS_IS_STUB_DOAS'] = '1' os.environ['THIS_IS_STUB_DOAS'] = '1'
os.execv(sys.executable, sys.argv[sys.argv.index('--') + 1:])
# This must be a child process and not exec() since Mitogen replaces its stderr
# descriptor, causing the last user of the slave PTY to close it, resulting in
# the master side indicating EIO.
subprocess.check_call(sys.argv[sys.argv.index('--') + 1:])

@ -2,8 +2,13 @@
import json import json
import os import os
import subprocess
import sys import sys
os.environ['ORIGINAL_ARGV'] = json.dumps(sys.argv) os.environ['ORIGINAL_ARGV'] = json.dumps(sys.argv)
os.environ['THIS_IS_STUB_SUDO'] = '1' os.environ['THIS_IS_STUB_SUDO'] = '1'
os.execv(sys.executable, sys.argv[sys.argv.index('--') + 1:])
# This must be a child process and not exec() since Mitogen replaces its stderr
# descriptor, causing the last user of the slave PTY to close it, resulting in
# the master side indicating EIO.
subprocess.check_call(sys.argv[sys.argv.index('--') + 1:])

Loading…
Cancel
Save