diff --git a/changelogs/fragments/ssh_debug_noparse.yml b/changelogs/fragments/ssh_debug_noparse.yml new file mode 100644 index 00000000000..fa4a148ea15 --- /dev/null +++ b/changelogs/fragments/ssh_debug_noparse.yml @@ -0,0 +1,2 @@ +bugfixes: + - ssh connection avoid parsing ssh cli debug lines as they can match expected output at high verbosities. diff --git a/lib/ansible/plugins/connection/ssh.py b/lib/ansible/plugins/connection/ssh.py index f888a7d9712..87d80794c23 100644 --- a/lib/ansible/plugins/connection/ssh.py +++ b/lib/ansible/plugins/connection/ssh.py @@ -388,6 +388,7 @@ b_NOT_SSH_ERRORS = (b'Traceback (most recent call last):', # Python-2.6 when th ) SSHPASS_AVAILABLE = None +SSH_DEBUG = re.compile(r'^debug\d+: .*') class AnsibleControlPersistBrokenPipeError(AnsibleError): @@ -837,7 +838,10 @@ class Connection(ConnectionBase): suppress_output = False # display.debug("Examining line (source=%s, state=%s): '%s'" % (source, state, display_line)) - if self.become.expect_prompt() and self.become.check_password_prompt(b_line): + if SSH_DEBUG.match(display_line): + # skip lines from ssh debug output to avoid false matches + pass + elif self.become.expect_prompt() and self.become.check_password_prompt(b_line): display.debug(u"become_prompt: (source=%s, state=%s): '%s'" % (source, state, display_line)) self._flags['become_prompt'] = True suppress_output = True diff --git a/test/units/plugins/connection/test_ssh.py b/test/units/plugins/connection/test_ssh.py index d693313ff01..9b3e3c9dd75 100644 --- a/test/units/plugins/connection/test_ssh.py +++ b/test/units/plugins/connection/test_ssh.py @@ -102,6 +102,7 @@ class TestConnectionBaseClass(unittest.TestCase): def test_plugins_connection_ssh__examine_output(self): pc = PlayContext() new_stdin = StringIO() + become_success_token = b'BECOME-SUCCESS-abcdefghijklmnopqrstuvxyz' conn = connection_loader.get('ssh', pc, new_stdin) conn.set_become_plugin(become_loader.get('sudo')) @@ -112,24 +113,16 @@ class TestConnectionBaseClass(unittest.TestCase): conn.become.check_missing_password = MagicMock() def _check_password_prompt(line): - if b'foo' in line: - return True - return False + return b'foo' in line def _check_become_success(line): - if b'BECOME-SUCCESS-abcdefghijklmnopqrstuvxyz' in line: - return True - return False + return become_success_token in line def _check_incorrect_password(line): - if b'incorrect password' in line: - return True - return False + return b'incorrect password' in line def _check_missing_password(line): - if b'bad password' in line: - return True - return False + return b'bad password' in line # test examining output for prompt conn._flags = dict( @@ -172,9 +165,9 @@ class TestConnectionBaseClass(unittest.TestCase): pc.prompt = False conn.become.prompt = False - pc.success_key = u'BECOME-SUCCESS-abcdefghijklmnopqrstuvxyz' - conn.become.success = u'BECOME-SUCCESS-abcdefghijklmnopqrstuvxyz' - output, unprocessed = conn._examine_output(u'source', u'state', b'line 1\nline 2\nBECOME-SUCCESS-abcdefghijklmnopqrstuvxyz\nline 3\n', False) + pc.success_key = str(become_success_token) + conn.become.success = str(become_success_token) + output, unprocessed = conn._examine_output(u'source', u'state', b'line 1\nline 2\n%s\nline 3\n' % become_success_token, False) self.assertEqual(output, b'line 1\nline 2\nline 3\n') self.assertEqual(unprocessed, b'') self.assertFalse(conn._flags['become_prompt']) @@ -182,6 +175,23 @@ class TestConnectionBaseClass(unittest.TestCase): self.assertFalse(conn._flags['become_error']) self.assertFalse(conn._flags['become_nopasswd_error']) + # test we dont detect become success from ssh debug: lines + conn._flags = dict( + become_prompt=False, + become_success=False, + become_error=False, + become_nopasswd_error=False, + ) + + pc.prompt = False + conn.become.prompt = True + pc.success_key = str(become_success_token) + conn.become.success = str(become_success_token) + output, unprocessed = conn._examine_output(u'source', u'state', b'line 1\nline 2\ndebug1: %s\nline 3\n' % become_success_token, False) + self.assertEqual(output, b'line 1\nline 2\ndebug1: %s\nline 3\n' % become_success_token) + self.assertEqual(unprocessed, b'') + self.assertFalse(conn._flags['become_success']) + # test examining output for become failure conn._flags = dict( become_prompt=False,