ssh: better OpenSSH 7.5+ permission denied handling

The user@host prefix in new-style OpenSSH messages unfortunately takes
the host part from ~/.ssh/config and friends. There is no way to know
which hostname will appear in this string without parsing the OpenSSH
config, nor which username will appear.

Instead just regex it.

Add SSH stub modes to print the new/old errors and add some simple
tests.

This extends the work done in b9112a9cbb
issue260
David Wilson 6 years ago
parent b527ff0b66
commit 16ca111ebd

@ -31,6 +31,7 @@ Functionality to allow establishing new slave contexts over an SSH connection.
"""
import logging
import re
import time
try:
@ -46,10 +47,16 @@ LOG = logging.getLogger('mitogen')
# sshpass uses 'assword' because it doesn't lowercase the input.
PASSWORD_PROMPT = b('password')
PERMDENIED_PROMPT = b('permission denied')
HOSTKEY_REQ_PROMPT = b('are you sure you want to continue connecting (yes/no)?')
HOSTKEY_FAIL = b('host key verification failed.')
# [user@host: ] permission denied
PERMDENIED_RE = re.compile(
('(?:[^@]+@[^:]+: )?' # Absent in OpenSSH <7.5
'Permission denied').encode(),
re.I
)
DEBUG_PREFIXES = (b('debug1:'), b('debug2:'), b('debug3:'))
@ -289,11 +296,7 @@ class Stream(mitogen.parent.Stream):
self._host_key_prompt()
elif HOSTKEY_FAIL in buf.lower():
raise HostKeyError(self.hostkey_failed_msg)
elif buf.lower().startswith((
PERMDENIED_PROMPT,
b("%s@%s: " % (self.username, self.hostname))
+ PERMDENIED_PROMPT,
)):
elif PERMDENIED_RE.match(buf):
# issue #271: work around conflict with user shell reporting
# 'permission denied' e.g. during chdir($HOME) by only matching
# it at the start of the line.

@ -15,6 +15,10 @@ Are you sure you want to continue connecting (yes/no)?
HOST_KEY_STRICT_MSG = """Host key verification failed.\n"""
PERMDENIED_CLASSIC_MSG = 'Permission denied (publickey,password)\n'
PERMDENIED_75_MSG = 'chicken@nandos.com: permission denied (publickey,password)\n'
def tty(msg):
fp = open('/dev/tty', 'wb', 0)
@ -37,13 +41,23 @@ def confirm(msg):
fp.close()
if os.getenv('FAKESSH_MODE') == 'ask':
mode = os.getenv('FAKESSH_MODE')
if mode == 'ask':
assert 'y\n' == confirm(HOST_KEY_ASK_MSG)
if os.getenv('FAKESSH_MODE') == 'strict':
elif mode == 'strict':
stderr(HOST_KEY_STRICT_MSG)
sys.exit(255)
elif mode == 'permdenied_classic':
stderr(PERMDENIED_CLASSIC_MSG)
sys.exit(255)
elif mode == 'permdenied_75':
stderr(PERMDENIED_75_MSG)
sys.exit(255)
#
# Set an env var if stderr was a TTY to make ssh_test tests easier to write.

@ -124,9 +124,10 @@ class BannerTest(testlib.DockerMixin, unittest2.TestCase):
self.assertEquals(name, context.name)
class RequirePtyTest(testlib.DockerMixin, testlib.TestCase):
stream_class = mitogen.ssh.Stream
class FakeSshMixin(testlib.RouterMixin):
"""
Mix-in that provides :meth:`fake_ssh` executing the stub 'ssh.py'.
"""
def fake_ssh(self, FAKESSH_MODE=None, **kwargs):
os.environ['FAKESSH_MODE'] = str(FAKESSH_MODE)
try:
@ -139,6 +140,20 @@ class RequirePtyTest(testlib.DockerMixin, testlib.TestCase):
finally:
del os.environ['FAKESSH_MODE']
class PermissionDeniedTest(FakeSshMixin, testlib.TestCase):
def test_classic_prompt(self):
self.assertRaises(mitogen.ssh.PasswordError,
lambda: self.fake_ssh(FAKESSH_MODE='permdenied_classic'))
def test_openssh_75_prompt(self):
self.assertRaises(mitogen.ssh.PasswordError,
lambda: self.fake_ssh(FAKESSH_MODE='permdenied_75'))
class RequirePtyTest(FakeSshMixin, testlib.TestCase):
stream_class = mitogen.ssh.Stream
def test_check_host_keys_accept(self):
# required=true, host_key_checking=accept
context = self.fake_ssh(FAKESSH_MODE='ask', check_host_keys='accept')

Loading…
Cancel
Save