Add ability to unlock 1Password vault to lookup plugins (#44923)

* Add ability to use login to 1Password vault to 1Password lookups

* Adjust unit tests

* Add changelog
pull/44929/head
Sam Doran 6 years ago committed by GitHub
parent ff654ccfe8
commit 8cd8d17980
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,2 @@
minor_changes:
- onepassword/onepassword_raw - accept subdomain and vault_password to allow Ansible to unlock 1Password vaults

@ -16,6 +16,7 @@ DOCUMENTATION = """
author: author:
- Scott Buchanan <sbuchanan@ri.pn> - Scott Buchanan <sbuchanan@ri.pn>
- Andrew Zenk <azenk@umn.edu> - Andrew Zenk <azenk@umn.edu>
- Sam Doran<sdoran@redhat.com>
version_added: "2.6" version_added: "2.6"
requirements: requirements:
- C(op) 1Password command line utility. See U(https://support.1password.com/command-line/) - C(op) 1Password command line utility. See U(https://support.1password.com/command-line/)
@ -25,7 +26,7 @@ DOCUMENTATION = """
- onepassword wraps the C(op) command line utility to fetch specific field values from 1Password - onepassword wraps the C(op) command line utility to fetch specific field values from 1Password
options: options:
_terms: _terms:
description: identifier(s) (UUID, name or domain; case-insensitive) of item(s) to retrieve description: identifier(s) (UUID, name, or subdomain; case-insensitive) of item(s) to retrieve
required: True required: True
field: field:
description: field to return from each matching item (case-insensitive) description: field to return from each matching item (case-insensitive)
@ -33,23 +34,35 @@ DOCUMENTATION = """
section: section:
description: item section containing the field to retrieve (case-insensitive); if absent will return first match from any section description: item section containing the field to retrieve (case-insensitive); if absent will return first match from any section
default: None default: None
subdomain:
description: The 1Password subdomain to authenticate against.
default: None
version_added: '2.7'
vault: vault:
description: vault containing the item to retrieve (case-insensitive); if absent will search all vaults description: vault containing the item to retrieve (case-insensitive); if absent will search all vaults
default: None default: None
vault_password:
description: The password used to unlock the specified vault.
default: None
version_added: '2.7'
""" """
EXAMPLES = """ EXAMPLES = """
- name: "retrieve password for KITT" - name: Retrieve password for KITT
debug: debug:
msg: "{{ lookup('onepassword', 'KITT') }}" var: lookup('onepassword', 'KITT')
- name: "retrieve password for Wintermute" - name: Retrieve password for Wintermute
debug: debug:
msg: "{{ lookup('onepassword', 'Tessier-Ashpool', section='Wintermute') }}" var: lookup('onepassword', 'Tessier-Ashpool', section='Wintermute')
- name: "retrieve username for HAL" - name: Retrieve username for HAL
debug: debug:
msg: "{{ lookup('onepassword', 'HAL 9000', field='username', vault='Discovery') }}" var: lookup('onepassword', 'HAL 9000', field='username', vault='Discovery')
- name: Retrieve password for HAL when not signed in to 1Password
debug:
var: lookup('onepassword', 'HAL 9000', subdomain='Discovery', vault_password='DmbslfLvasjdl')
""" """
RETURN = """ RETURN = """
@ -64,45 +77,64 @@ from subprocess import Popen, PIPE
from ansible.plugins.lookup import LookupBase from ansible.plugins.lookup import LookupBase
from ansible.errors import AnsibleLookupError from ansible.errors import AnsibleLookupError
from ansible.module_utils._text import to_bytes
class OnePass(object): class OnePass(object):
def __init__(self, path='op'): def __init__(self, path='op'):
self._cli_path = path self._cli_path = path
self._logged_in = False
self._token = None
self._subdomain = None
self._vault_password = None
@property @property
def cli_path(self): def cli_path(self):
return self._cli_path return self._cli_path
def get_token(self):
if not self._subdomain and not self._vault_password:
raise AnsibleLookupError('Both subdomain and password are required when logging in.')
args = ['signin', self._subdomain, '--output=raw']
rc, out, err = self._run(args, command_input=to_bytes(self._vault_password))
self._token = out.strip()
def assert_logged_in(self): def assert_logged_in(self):
try: try:
self._run(["get", "account"]) rc, out, err = self._run(['get', 'account'], ignore_errors=True)
if rc != 1:
self._logged_in = True
if not self._logged_in:
self.get_token()
except OSError as e: except OSError as e:
if e.errno == errno.ENOENT: if e.errno == errno.ENOENT:
raise AnsibleLookupError("1Password CLI tool not installed in path on control machine") raise AnsibleLookupError("1Password CLI tool not installed in path on control machine")
raise e raise e
except AnsibleLookupError: except AnsibleLookupError:
raise AnsibleLookupError("Not logged into 1Password: please run 'op signin' first") raise AnsibleLookupError("Not logged into 1Password: please run 'op signin' first, or provide both subdomain and vault_password.")
def get_raw(self, item_id, vault=None): def get_raw(self, item_id, vault=None):
args = ["get", "item", item_id] args = ["get", "item", item_id]
if vault is not None: if vault is not None:
args += ['--vault={0}'.format(vault)] args += ['--vault={0}'.format(vault)]
output, dummy = self._run(args) if not self._logged_in:
args += [to_bytes('--session=') + self._token]
rc, output, dummy = self._run(args)
return output return output
def get_field(self, item_id, field, section=None, vault=None): def get_field(self, item_id, field, section=None, vault=None):
output = self.get_raw(item_id, vault) output = self.get_raw(item_id, vault)
return self._parse_field(output, field, section) if output != '' else '' return self._parse_field(output, field, section) if output != '' else ''
def _run(self, args, expected_rc=0): def _run(self, args, expected_rc=0, command_input=None, ignore_errors=False):
p = Popen([self.cli_path] + args, stdout=PIPE, stderr=PIPE, stdin=PIPE) command = [self.cli_path] + args
out, err = p.communicate() p = Popen(command, stdout=PIPE, stderr=PIPE, stdin=PIPE)
out, err = p.communicate(input=command_input)
rc = p.wait() rc = p.wait()
if rc != expected_rc: if not ignore_errors and rc != expected_rc:
raise AnsibleLookupError(err) raise AnsibleLookupError(err)
return out, err return rc, out, err
def _parse_field(self, data_json, field_name, section_title=None): def _parse_field(self, data_json, field_name, section_title=None):
data = json.loads(data_json) data = json.loads(data_json)
@ -124,11 +156,13 @@ class LookupModule(LookupBase):
def run(self, terms, variables=None, **kwargs): def run(self, terms, variables=None, **kwargs):
op = OnePass() op = OnePass()
op.assert_logged_in()
field = kwargs.get('field', 'password') field = kwargs.get('field', 'password')
section = kwargs.get('section') section = kwargs.get('section')
vault = kwargs.get('vault') vault = kwargs.get('vault')
op._subdomain = kwargs.get('subdomain')
op._vault_password = kwargs.get('vault_password')
op.assert_logged_in()
values = [] values = []
for term in terms: for term in terms:

@ -16,6 +16,7 @@ DOCUMENTATION = """
author: author:
- Scott Buchanan <sbuchanan@ri.pn> - Scott Buchanan <sbuchanan@ri.pn>
- Andrew Zenk <azenk@umn.edu> - Andrew Zenk <azenk@umn.edu>
- Sam Doran <sdoran@redhat.com>
version_added: "2.6" version_added: "2.6"
requirements: requirements:
- C(op) 1Password command line utility. See U(https://support.1password.com/command-line/) - C(op) 1Password command line utility. See U(https://support.1password.com/command-line/)
@ -27,15 +28,27 @@ DOCUMENTATION = """
_terms: _terms:
description: identifier(s) (UUID, name, or domain; case-insensitive) of item(s) to retrieve description: identifier(s) (UUID, name, or domain; case-insensitive) of item(s) to retrieve
required: True required: True
subdomain:
description: The 1Password subdomain to authenticate against.
default: None
version_added: '2.7'
vault: vault:
description: vault containing the item to retrieve (case-insensitive); if absent will search all vaults description: vault containing the item to retrieve (case-insensitive); if absent will search all vaults
default: None default: None
vault_password:
description: The password used to unlock the specified vault.
default: None
version_added: '2.7'
""" """
EXAMPLES = """ EXAMPLES = """
- name: "retrieve all data about Wintermute" - name: Retrieve all data about Wintermute
debug:
var: lookup('onepassword_raw', 'Wintermute')
- name: Retrieve all data about Wintermute when not signed in to 1Password
debug: debug:
msg: "{{ lookup('onepassword_raw', 'Wintermute') }}" var: lookup('onepassword_raw', 'Wintermute', subdomain='Turing', vault_password='DmbslfLvasjdl')
""" """
RETURN = """ RETURN = """
@ -54,9 +67,11 @@ class LookupModule(LookupBase):
def run(self, terms, variables=None, **kwargs): def run(self, terms, variables=None, **kwargs):
op = OnePass() op = OnePass()
op.assert_logged_in()
vault = kwargs.get('vault') vault = kwargs.get('vault')
op._subdomain = kwargs.get('subdomain')
op._vault_password = kwargs.get('vault_password')
op.assert_logged_in()
values = [] values = []
for term in terms: for term in terms:

@ -153,7 +153,7 @@ class MockOnePass(OnePass):
if key in match_fields: if key in match_fields:
return entry['output'] return entry['output']
def _run(self, args, expected_rc=0): def _run(self, args, expected_rc=0, command_input=None, ignore_errors=False):
parser = ArgumentParser() parser = ArgumentParser()
command_parser = parser.add_subparsers(dest='command') command_parser = parser.add_subparsers(dest='command')
@ -174,7 +174,7 @@ class MockOnePass(OnePass):
if error != '': if error != '':
now = datetime.date.today() now = datetime.date.today()
error = '[LOG] {0} (ERROR) {1}'.format(now.strftime('%Y/%m/%d %H:$M:$S'), error) error = '[LOG] {0} (ERROR) {1}'.format(now.strftime('%Y/%m/%d %H:$M:$S'), error)
return output, error return rc, output, error
if args.command == 'get': if args.command == 'get':
if self._mock_logged_out: if self._mock_logged_out:
@ -233,38 +233,45 @@ class TestOnePass(unittest.TestCase):
def test_onepassword_get(self): def test_onepassword_get(self):
op = MockOnePass() op = MockOnePass()
op._logged_in = True
query_generator = get_mock_query_generator() query_generator = get_mock_query_generator()
for dummy, query, dummy, field_name, field_value in query_generator: for dummy, query, dummy, field_name, field_value in query_generator:
self.assertEqual(field_value, op.get_field(query, field_name)) self.assertEqual(field_value, op.get_field(query, field_name))
def test_onepassword_get_raw(self): def test_onepassword_get_raw(self):
op = MockOnePass() op = MockOnePass()
op._logged_in = True
for entry in MOCK_ENTRIES: for entry in MOCK_ENTRIES:
for query in entry['queries']: for query in entry['queries']:
self.assertEqual(json.dumps(entry['output']), op.get_raw(query)) self.assertEqual(json.dumps(entry['output']), op.get_raw(query))
def test_onepassword_get_not_found(self): def test_onepassword_get_not_found(self):
op = MockOnePass() op = MockOnePass()
op._logged_in = True
self.assertEqual('', op.get_field('a fake query', 'a fake field')) self.assertEqual('', op.get_field('a fake query', 'a fake field'))
def test_onepassword_get_with_section(self): def test_onepassword_get_with_section(self):
op = MockOnePass() op = MockOnePass()
op._logged_in = True
dummy, query, section_title, field_name, field_value = get_one_mock_query() dummy, query, section_title, field_name, field_value = get_one_mock_query()
self.assertEqual(field_value, op.get_field(query, field_name, section=section_title)) self.assertEqual(field_value, op.get_field(query, field_name, section=section_title))
def test_onepassword_get_with_vault(self): def test_onepassword_get_with_vault(self):
op = MockOnePass() op = MockOnePass()
op._logged_in = True
entry, query, dummy, field_name, field_value = get_one_mock_query() entry, query, dummy, field_name, field_value = get_one_mock_query()
for vault_query in [entry['vault_name'], entry['output']['vaultUuid']]: for vault_query in [entry['vault_name'], entry['output']['vaultUuid']]:
self.assertEqual(field_value, op.get_field(query, field_name, vault=vault_query)) self.assertEqual(field_value, op.get_field(query, field_name, vault=vault_query))
def test_onepassword_get_with_wrong_vault(self): def test_onepassword_get_with_wrong_vault(self):
op = MockOnePass() op = MockOnePass()
op._logged_in = True
dummy, query, dummy, field_name, dummy = get_one_mock_query() dummy, query, dummy, field_name, dummy = get_one_mock_query()
self.assertEqual('', op.get_field(query, field_name, vault='a fake vault')) self.assertEqual('', op.get_field(query, field_name, vault='a fake vault'))
def test_onepassword_get_diff_case(self): def test_onepassword_get_diff_case(self):
op = MockOnePass() op = MockOnePass()
op._logged_in = True
entry, query, section_title, field_name, field_value = get_one_mock_query() entry, query, section_title, field_name, field_value = get_one_mock_query()
self.assertEqual( self.assertEqual(
field_value, field_value,

Loading…
Cancel
Save