You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mitogen/tests/ssh_test.py

248 lines
8.3 KiB
Python

import os
import tempfile
import mitogen.ssh
import mitogen.utils
import testlib
import plain_old_module
class StubSshMixin(testlib.RouterMixin):
"""
Mix-in that provides :meth:`stub_ssh` executing the stub 'ssh.py'.
"""
def stub_ssh(self, STUBSSH_MODE=None, **kwargs):
os.environ['STUBSSH_MODE'] = str(STUBSSH_MODE)
try:
return self.router.ssh(
hostname='hostname',
username='mitogen__has_sudo',
ssh_path=testlib.data_path('stubs/stub-ssh.py'),
**kwargs
)
finally:
del os.environ['STUBSSH_MODE']
class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
def test_okay(self):
context = self.router.ssh(
hostname='hostname',
username='mitogen__has_sudo',
ssh_path=testlib.data_path('stubs/stub-ssh.py'),
)
#context.call(mitogen.utils.log_to_file, '/tmp/log')
#context.call(mitogen.utils.disable_site_packages)
self.assertEqual(3, context.call(plain_old_module.add, 1, 2))
class SshMixin(testlib.DockerMixin):
def test_debug_decoding(self):
# ensure filter_debug_logs() decodes the logged string.
capture = testlib.LogCapturer()
capture.start()
try:
context = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
ssh_debug_level=3,
)
finally:
s = capture.stop()
expect = "%s: debug1: Reading configuration data" % (context.name,)
self.assertIn(expect, s)
def test_bash_permission_denied(self):
# issue #271: only match Permission Denied at start of line.
context = self.docker_ssh(
username='mitogen__permdenied',
password='permdenied_password',
ssh_debug_level=3,
)
def test_stream_name(self):
context = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
)
name = 'ssh.%s:%s' % (
self.dockerized_ssh.host,
self.dockerized_ssh.port,
)
self.assertEqual(name, context.name)
def test_via_stream_name(self):
context = self.docker_ssh(
username='mitogen__has_sudo_nopw',
password='has_sudo_nopw_password',
)
sudo = self.router.sudo(via=context)
name = 'ssh.%s:%s.sudo.root' % (
self.dockerized_ssh.host,
self.dockerized_ssh.port,
)
self.assertEqual(name, sudo.name)
def test_password_required(self):
e = self.assertRaises(mitogen.ssh.PasswordError,
lambda: self.docker_ssh(
username='mitogen__has_sudo',
)
)
self.assertEqual(e.args[0], mitogen.ssh.password_required_msg)
def test_password_incorrect(self):
e = self.assertRaises(mitogen.ssh.PasswordError,
lambda: self.docker_ssh(
username='mitogen__has_sudo',
password='badpw',
)
)
self.assertEqual(e.args[0], mitogen.ssh.password_incorrect_msg)
def test_password_specified(self):
context = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
)
self.assertEqual(
'i-am-mitogen-test-docker-image\n',
context.call(plain_old_module.get_sentinel_value),
)
def test_pubkey_required(self):
e = self.assertRaises(mitogen.ssh.PasswordError,
lambda: self.docker_ssh(
username='mitogen__has_sudo_pubkey',
)
)
self.assertEqual(e.args[0], mitogen.ssh.password_required_msg)
def test_pubkey_specified(self):
context = self.docker_ssh(
username='mitogen__has_sudo_pubkey',
identity_file=testlib.data_path('docker/mitogen__has_sudo_pubkey.key'),
)
self.assertEqual(
'i-am-mitogen-test-docker-image\n',
context.call(plain_old_module.get_sentinel_value),
)
def test_enforce_unknown_host_key(self):
fp = tempfile.NamedTemporaryFile()
ssh_args = self.docker_ssh_default_kwargs.get('ssh_args', [])
try:
e = self.assertRaises(mitogen.ssh.HostKeyError,
lambda: self.docker_ssh(
username='mitogen__has_sudo_pubkey',
password='has_sudo_password',
ssh_args=ssh_args + ['-o', 'UserKnownHostsFile %s' % fp.name],
check_host_keys='enforce',
)
)
self.assertEqual(e.args[0], mitogen.ssh.hostkey_failed_msg)
finally:
fp.close()
def test_accept_enforce_host_keys(self):
fp = tempfile.NamedTemporaryFile()
ssh_args = self.docker_ssh_default_kwargs.get('ssh_args', [])
try:
context = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
ssh_args=ssh_args + ['-o', 'UserKnownHostsFile %s' % fp.name],
check_host_keys='accept',
)
context.shutdown(wait=True)
fp.seek(0)
# Lame test, but we're about to use enforce mode anyway, which
# verifies the file contents.
self.assertGreater(len(fp.read()), 0)
context = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
ssh_args=ssh_args + ['-o', 'UserKnownHostsFile %s' % fp.name],
check_host_keys='enforce',
)
context.shutdown(wait=True)
finally:
fp.close()
for distro_spec in testlib.DISTRO_SPECS.split():
dockerized_ssh = testlib.DockerizedSshDaemon(distro_spec)
klass_name = 'SshTest%s' % (dockerized_ssh.distro.capitalize(),)
klass = type(
klass_name,
(SshMixin, testlib.TestCase),
{'dockerized_ssh': dockerized_ssh},
)
globals()[klass_name] = klass
class BannerMixin(testlib.DockerMixin):
# Verify the ability to disambiguate random spam appearing in the SSHd's
# login banner from a legitimate password prompt.
def test_verbose_enabled(self):
context = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
ssh_debug_level=3,
)
name = 'ssh.%s:%s' % (
self.dockerized_ssh.host,
self.dockerized_ssh.port,
)
self.assertEqual(name, context.name)
context.shutdown(wait=True)
for distro_spec in testlib.DISTRO_SPECS.split():
dockerized_ssh = testlib.DockerizedSshDaemon(distro_spec)
klass_name = 'BannerTest%s' % (dockerized_ssh.distro.capitalize(),)
klass = type(
klass_name,
(BannerMixin, testlib.TestCase),
{'dockerized_ssh': dockerized_ssh},
)
globals()[klass_name] = klass
class StubPermissionDeniedTest(StubSshMixin, testlib.TestCase):
def test_classic_prompt(self):
self.assertRaises(mitogen.ssh.PasswordError,
lambda: self.stub_ssh(STUBSSH_MODE='permdenied_classic'))
def test_openssh_75_prompt(self):
self.assertRaises(mitogen.ssh.PasswordError,
lambda: self.stub_ssh(STUBSSH_MODE='permdenied_75'))
class StubCheckHostKeysTest(StubSshMixin, testlib.TestCase):
def test_check_host_keys_accept(self):
# required=true, host_key_checking=accept
context = self.stub_ssh(STUBSSH_MODE='ask', check_host_keys='accept')
self.assertEqual('1', context.call(os.getenv, 'STDERR_WAS_TTY'))
def test_check_host_keys_enforce(self):
# required=false, host_key_checking=enforce
context = self.stub_ssh(check_host_keys='enforce')
self.assertEqual(None, context.call(os.getenv, 'STDERR_WAS_TTY'))
def test_check_host_keys_ignore(self):
# required=false, host_key_checking=ignore
context = self.stub_ssh(check_host_keys='ignore')
self.assertEqual(None, context.call(os.getenv, 'STDERR_WAS_TTY'))
def test_password_present(self):
# required=true, password is not None
context = self.stub_ssh(check_host_keys='ignore', password='willick')
self.assertEqual('1', context.call(os.getenv, 'STDERR_WAS_TTY'))