ssh: fix check_host_keys="accept" and test; closes #411

Add real accept/enforce tests.
issue260
David Wilson 6 years ago
parent cf50b572f6
commit 1eae594e32

@ -51,6 +51,11 @@ Core Library
signal the connection has broken, even when one participant is not a parent signal the connection has broken, even when one participant is not a parent
of the other. of the other.
* `#411 <https://github.com/dw/mitogen/issues/411>`_: the SSH method typed
"``y``" rather than the requisite "``yes``" when `check_host_keys="accept"`
was configured. This would lead to connection timeouts due to the hung
response.
* `16ca111e <https://github.com/dw/mitogen/commit/16ca111e>`_: handle OpenSSH * `16ca111e <https://github.com/dw/mitogen/commit/16ca111e>`_: handle OpenSSH
7.5 permission denied prompts when ``~/.ssh/config`` rewrites are present. 7.5 permission denied prompts when ``~/.ssh/config`` rewrites are present.

@ -265,7 +265,7 @@ class Stream(mitogen.parent.Stream):
def _host_key_prompt(self): def _host_key_prompt(self):
if self.check_host_keys == 'accept': if self.check_host_keys == 'accept':
LOG.debug('%r: accepting host key', self) LOG.debug('%r: accepting host key', self)
self.tty_stream.transmit_side.write(b('y\n')) self.tty_stream.transmit_side.write(b('yes\n'))
return return
# _host_key_prompt() should never be reached with ignore or enforce # _host_key_prompt() should never be reached with ignore or enforce

@ -19,7 +19,6 @@ PERMDENIED_CLASSIC_MSG = 'Permission denied (publickey,password)\n'
PERMDENIED_75_MSG = 'chicken@nandos.com: permission denied (publickey,password)\n' PERMDENIED_75_MSG = 'chicken@nandos.com: permission denied (publickey,password)\n'
def tty(msg): def tty(msg):
fp = open('/dev/tty', 'wb', 0) fp = open('/dev/tty', 'wb', 0)
fp.write(msg.encode()) fp.write(msg.encode())
@ -41,10 +40,10 @@ def confirm(msg):
fp.close() fp.close()
mode = os.getenv('FAKESSH_MODE') mode = os.getenv('STUBSSH_MODE')
if mode == 'ask': if mode == 'ask':
assert 'y\n' == confirm(HOST_KEY_ASK_MSG) assert 'yes\n' == confirm(HOST_KEY_ASK_MSG)
elif mode == 'strict': elif mode == 'strict':
stderr(HOST_KEY_STRICT_MSG) stderr(HOST_KEY_STRICT_MSG)

@ -1,5 +1,6 @@
import os import os
import sys import sys
import tempfile
import mitogen import mitogen
import mitogen.ssh import mitogen.ssh
@ -11,6 +12,23 @@ import testlib
import plain_old_module import plain_old_module
class StubSshMixin(testlib.RouterMixin):
"""
Mix-in that provides :meth:`stub_ssh` executing the stub 'ssh.py'.
"""
def stub_ssh(self, STUBSSH_MODE=None, **kwargs):
os.environ['STUBSSH_MODE'] = str(STUBSSH_MODE)
try:
return self.router.ssh(
hostname='hostname',
username='mitogen__has_sudo',
ssh_path=testlib.data_path('stubs/ssh.py'),
**kwargs
)
finally:
del os.environ['STUBSSH_MODE']
class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): class ConstructorTest(testlib.RouterMixin, unittest2.TestCase):
def test_okay(self): def test_okay(self):
context = self.router.ssh( context = self.router.ssh(
@ -23,7 +41,7 @@ class ConstructorTest(testlib.RouterMixin, unittest2.TestCase):
self.assertEquals(3, context.call(plain_old_module.add, 1, 2)) self.assertEquals(3, context.call(plain_old_module.add, 1, 2))
class SshTest(testlib.DockerMixin, unittest2.TestCase): class SshTest(testlib.DockerMixin, testlib.TestCase):
stream_class = mitogen.ssh.Stream stream_class = mitogen.ssh.Stream
def test_stream_name(self): def test_stream_name(self):
@ -105,6 +123,47 @@ class SshTest(testlib.DockerMixin, unittest2.TestCase):
context.call(plain_old_module.get_sentinel_value), context.call(plain_old_module.get_sentinel_value),
) )
def test_enforce_unknown_host_key(self):
fp = tempfile.NamedTemporaryFile()
try:
e = self.assertRaises(mitogen.ssh.HostKeyError,
lambda: self.docker_ssh(
username='mitogen__has_sudo_pubkey',
password='has_sudo_password',
ssh_args=['-o', 'UserKnownHostsFile ' + fp.name],
check_host_keys='enforce',
)
)
self.assertEquals(e.args[0], mitogen.ssh.Stream.hostkey_failed_msg)
finally:
fp.close()
def test_accept_enforce_host_keys(self):
fp = tempfile.NamedTemporaryFile()
try:
context = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
ssh_args=['-o', 'UserKnownHostsFile ' + fp.name],
check_host_keys='accept',
)
context.shutdown(wait=True)
fp.seek(0)
# Lame test, but we're about to use enforce mode anyway, which
# verifies the file contents.
self.assertTrue(len(fp.read()) > 0)
context = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
ssh_args=['-o', 'UserKnownHostsFile ' + fp.name],
check_host_keys='enforce',
)
context.shutdown(wait=True)
finally:
fp.close()
class BannerTest(testlib.DockerMixin, unittest2.TestCase): class BannerTest(testlib.DockerMixin, unittest2.TestCase):
# Verify the ability to disambiguate random spam appearing in the SSHd's # Verify the ability to disambiguate random spam appearing in the SSHd's
@ -124,54 +183,37 @@ class BannerTest(testlib.DockerMixin, unittest2.TestCase):
self.assertEquals(name, context.name) self.assertEquals(name, context.name)
class FakeSshMixin(testlib.RouterMixin): class StubPermissionDeniedTest(StubSshMixin, testlib.TestCase):
"""
Mix-in that provides :meth:`fake_ssh` executing the stub 'ssh.py'.
"""
def fake_ssh(self, FAKESSH_MODE=None, **kwargs):
os.environ['FAKESSH_MODE'] = str(FAKESSH_MODE)
try:
return self.router.ssh(
hostname='hostname',
username='mitogen__has_sudo',
ssh_path=testlib.data_path('stubs/ssh.py'),
**kwargs
)
finally:
del os.environ['FAKESSH_MODE']
class PermissionDeniedTest(FakeSshMixin, testlib.TestCase):
def test_classic_prompt(self): def test_classic_prompt(self):
self.assertRaises(mitogen.ssh.PasswordError, self.assertRaises(mitogen.ssh.PasswordError,
lambda: self.fake_ssh(FAKESSH_MODE='permdenied_classic')) lambda: self.stub_ssh(STUBSSH_MODE='permdenied_classic'))
def test_openssh_75_prompt(self): def test_openssh_75_prompt(self):
self.assertRaises(mitogen.ssh.PasswordError, self.assertRaises(mitogen.ssh.PasswordError,
lambda: self.fake_ssh(FAKESSH_MODE='permdenied_75')) lambda: self.stub_ssh(STUBSSH_MODE='permdenied_75'))
class RequirePtyTest(FakeSshMixin, testlib.TestCase): class StubCheckHostKeysTest(StubSshMixin, testlib.TestCase):
stream_class = mitogen.ssh.Stream stream_class = mitogen.ssh.Stream
def test_check_host_keys_accept(self): def test_check_host_keys_accept(self):
# required=true, host_key_checking=accept # required=true, host_key_checking=accept
context = self.fake_ssh(FAKESSH_MODE='ask', check_host_keys='accept') context = self.stub_ssh(STUBSSH_MODE='ask', check_host_keys='accept')
self.assertEquals('1', context.call(os.getenv, 'STDERR_WAS_TTY')) self.assertEquals('1', context.call(os.getenv, 'STDERR_WAS_TTY'))
def test_check_host_keys_enforce(self): def test_check_host_keys_enforce(self):
# required=false, host_key_checking=enforce # required=false, host_key_checking=enforce
context = self.fake_ssh(check_host_keys='enforce') context = self.stub_ssh(check_host_keys='enforce')
self.assertEquals(None, context.call(os.getenv, 'STDERR_WAS_TTY')) self.assertEquals(None, context.call(os.getenv, 'STDERR_WAS_TTY'))
def test_check_host_keys_ignore(self): def test_check_host_keys_ignore(self):
# required=false, host_key_checking=ignore # required=false, host_key_checking=ignore
context = self.fake_ssh(check_host_keys='ignore') context = self.stub_ssh(check_host_keys='ignore')
self.assertEquals(None, context.call(os.getenv, 'STDERR_WAS_TTY')) self.assertEquals(None, context.call(os.getenv, 'STDERR_WAS_TTY'))
def test_password_present(self): def test_password_present(self):
# required=true, password is not None # required=true, password is not None
context = self.fake_ssh(check_host_keys='ignore', password='willick') context = self.stub_ssh(check_host_keys='ignore', password='willick')
self.assertEquals('1', context.call(os.getenv, 'STDERR_WAS_TTY')) self.assertEquals('1', context.call(os.getenv, 'STDERR_WAS_TTY'))

Loading…
Cancel
Save