add IAM role assumption to aws_ec2 inventory (#41637)

* add IAM role assumption to aws_ec2 inventory
* Ensure inventory._options has necessary option keys populated since the plugin docs parser isn't accessible to unit tests yet
pull/58027/head
Rob 5 years ago committed by Jill R
parent 751134ff17
commit feae35ab7e

@ -26,6 +26,9 @@ DOCUMENTATION = '''
description: Token that ensures this is a source file for the plugin.
required: True
choices: ['aws_ec2']
iam_role_arn:
description: The ARN of the IAM role to assume to perform the inventory lookup. You should still provide AWS
credentials with enough privilege to perform the AssumeRole action.
regions:
description:
- A list of regions in which to describe EC2 instances.
@ -263,6 +266,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
self.aws_secret_access_key = None
self.aws_access_key_id = None
self.aws_security_token = None
self.iam_role_arn = None
def _compile_values(self, obj, attr):
'''
@ -334,6 +338,26 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
raise AnsibleError("Insufficient credentials found: %s" % to_native(e))
return connection
def _boto3_assume_role(self, credentials, region):
"""
Assume an IAM role passed by iam_role_arn parameter
:return: a dict containing the credentials of the assumed role
"""
iam_role_arn = self.iam_role_arn
try:
sts_connection = boto3.session.Session(profile_name=self.boto_profile).client('sts', region, **credentials)
sts_session = sts_connection.assume_role(RoleArn=iam_role_arn, RoleSessionName='ansible_aws_ec2_dynamic_inventory')
return dict(
aws_access_key_id=sts_session['Credentials']['AccessKeyId'],
aws_secret_access_key=sts_session['Credentials']['SecretAccessKey'],
aws_session_token=sts_session['Credentials']['SessionToken']
)
except botocore.exceptions.ClientError as e:
raise AnsibleError("Unable to assume IAM role: %s" % to_native(e))
def _boto3_conn(self, regions):
'''
:param regions: A list of regions to create a boto3 client
@ -342,6 +366,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
'''
credentials = self._get_credentials()
iam_role_arn = self.iam_role_arn
if not regions:
try:
@ -364,6 +389,20 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
for region in regions:
connection = self._get_connection(credentials, region)
try:
if iam_role_arn is not None:
assumed_credentials = self._boto3_assume_role(credentials, region)
else:
assumed_credentials = credentials
connection = boto3.session.Session(profile_name=self.boto_profile).client('ec2', region, **assumed_credentials)
except (botocore.exceptions.ProfileNotFound, botocore.exceptions.PartialCredentialsError) as e:
if self.boto_profile:
try:
connection = boto3.session.Session(profile_name=self.boto_profile).client('ec2', region)
except (botocore.exceptions.ProfileNotFound, botocore.exceptions.PartialCredentialsError) as e:
raise AnsibleError("Insufficient credentials found: %s" % to_native(e))
else:
raise AnsibleError("Insufficient credentials found: %s" % to_native(e))
yield connection, region
def _get_instances_by_region(self, regions, filters, strict_permissions):
@ -533,6 +572,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
self.aws_access_key_id = self.get_option('aws_access_key')
self.aws_secret_access_key = self.get_option('aws_secret_key')
self.aws_security_token = self.get_option('aws_security_token')
self.iam_role_arn = self.get_option('iam_role_arn')
if not self.boto_profile and not (self.aws_access_key_id and self.aws_secret_access_key):
session = botocore.session.get_session()

@ -132,11 +132,12 @@ def test_boto3_conn(inventory):
inventory._options = {"aws_profile": "first_precedence",
"aws_access_key": "test_access_key",
"aws_secret_key": "test_secret_key",
"aws_security_token": "test_security_token"}
"aws_security_token": "test_security_token",
"iam_role_arn": None}
inventory._set_credentials()
with pytest.raises(AnsibleError) as error_message:
for connection, region in inventory._boto3_conn(regions=['us-east-1']):
assert error_message == "Insufficient credentials found."
assert "Insufficient credentials found" in error_message
def test_get_hostname_default(inventory):
@ -154,13 +155,15 @@ def test_set_credentials(inventory):
inventory._options = {'aws_access_key': 'test_access_key',
'aws_secret_key': 'test_secret_key',
'aws_security_token': 'test_security_token',
'aws_profile': 'test_profile'}
'aws_profile': 'test_profile',
'iam_role_arn': 'arn:aws:iam::112233445566:role/test-role'}
inventory._set_credentials()
assert inventory.boto_profile == "test_profile"
assert inventory.aws_access_key_id == "test_access_key"
assert inventory.aws_secret_access_key == "test_secret_key"
assert inventory.aws_security_token == "test_security_token"
assert inventory.iam_role_arn == "arn:aws:iam::112233445566:role/test-role"
def test_insufficient_credentials(inventory):
@ -168,11 +171,12 @@ def test_insufficient_credentials(inventory):
'aws_access_key': None,
'aws_secret_key': None,
'aws_security_token': None,
'aws_profile': None
'aws_profile': None,
'iam_role_arn': None
}
with pytest.raises(AnsibleError) as error_message:
inventory._set_credentials()
assert "Insufficient boto credentials found" in error_message
assert "Insufficient credentials found" in error_message
def test_verify_file_bad_config(inventory):

Loading…
Cancel
Save