ssh connection avoid parsiing own debug (#76732)

- Avoids false positives on become strings being echoed back 
   by ssh cli itself
  - added test for debug lines
  - also simplified some of existing test code
pull/76760/head
Brian Coca 2 years ago committed by GitHub
parent 9142be2f6c
commit 0ff80a15ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,2 @@
bugfixes:
- ssh connection avoid parsing ssh cli debug lines as they can match expected output at high verbosities.

@ -388,6 +388,7 @@ b_NOT_SSH_ERRORS = (b'Traceback (most recent call last):', # Python-2.6 when th
)
SSHPASS_AVAILABLE = None
SSH_DEBUG = re.compile(r'^debug\d+: .*')
class AnsibleControlPersistBrokenPipeError(AnsibleError):
@ -837,7 +838,10 @@ class Connection(ConnectionBase):
suppress_output = False
# display.debug("Examining line (source=%s, state=%s): '%s'" % (source, state, display_line))
if self.become.expect_prompt() and self.become.check_password_prompt(b_line):
if SSH_DEBUG.match(display_line):
# skip lines from ssh debug output to avoid false matches
pass
elif self.become.expect_prompt() and self.become.check_password_prompt(b_line):
display.debug(u"become_prompt: (source=%s, state=%s): '%s'" % (source, state, display_line))
self._flags['become_prompt'] = True
suppress_output = True

@ -102,6 +102,7 @@ class TestConnectionBaseClass(unittest.TestCase):
def test_plugins_connection_ssh__examine_output(self):
pc = PlayContext()
new_stdin = StringIO()
become_success_token = b'BECOME-SUCCESS-abcdefghijklmnopqrstuvxyz'
conn = connection_loader.get('ssh', pc, new_stdin)
conn.set_become_plugin(become_loader.get('sudo'))
@ -112,24 +113,16 @@ class TestConnectionBaseClass(unittest.TestCase):
conn.become.check_missing_password = MagicMock()
def _check_password_prompt(line):
if b'foo' in line:
return True
return False
return b'foo' in line
def _check_become_success(line):
if b'BECOME-SUCCESS-abcdefghijklmnopqrstuvxyz' in line:
return True
return False
return become_success_token in line
def _check_incorrect_password(line):
if b'incorrect password' in line:
return True
return False
return b'incorrect password' in line
def _check_missing_password(line):
if b'bad password' in line:
return True
return False
return b'bad password' in line
# test examining output for prompt
conn._flags = dict(
@ -172,9 +165,9 @@ class TestConnectionBaseClass(unittest.TestCase):
pc.prompt = False
conn.become.prompt = False
pc.success_key = u'BECOME-SUCCESS-abcdefghijklmnopqrstuvxyz'
conn.become.success = u'BECOME-SUCCESS-abcdefghijklmnopqrstuvxyz'
output, unprocessed = conn._examine_output(u'source', u'state', b'line 1\nline 2\nBECOME-SUCCESS-abcdefghijklmnopqrstuvxyz\nline 3\n', False)
pc.success_key = str(become_success_token)
conn.become.success = str(become_success_token)
output, unprocessed = conn._examine_output(u'source', u'state', b'line 1\nline 2\n%s\nline 3\n' % become_success_token, False)
self.assertEqual(output, b'line 1\nline 2\nline 3\n')
self.assertEqual(unprocessed, b'')
self.assertFalse(conn._flags['become_prompt'])
@ -182,6 +175,23 @@ class TestConnectionBaseClass(unittest.TestCase):
self.assertFalse(conn._flags['become_error'])
self.assertFalse(conn._flags['become_nopasswd_error'])
# test we dont detect become success from ssh debug: lines
conn._flags = dict(
become_prompt=False,
become_success=False,
become_error=False,
become_nopasswd_error=False,
)
pc.prompt = False
conn.become.prompt = True
pc.success_key = str(become_success_token)
conn.become.success = str(become_success_token)
output, unprocessed = conn._examine_output(u'source', u'state', b'line 1\nline 2\ndebug1: %s\nline 3\n' % become_success_token, False)
self.assertEqual(output, b'line 1\nline 2\ndebug1: %s\nline 3\n' % become_success_token)
self.assertEqual(unprocessed, b'')
self.assertFalse(conn._flags['become_success'])
# test examining output for become failure
conn._flags = dict(
become_prompt=False,

Loading…
Cancel
Save