diff --git a/CHANGELOG.md b/CHANGELOG.md index 10f963f56c1..4ec27297667 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,7 @@ Ansible Changes By Release * 'service' tasks can now use async again, we had lost this capability when changed into an action plugin. * made any_errors_fatal inheritable from play to task and all other objects in between. * many small performance improvements in inventory and variable handling and in task execution. +* Added a retry class to the ec2_asg module since customers were running into throttling errors (AWSRetry is a solution for modules using boto3 which isn't applicable here). ### Deprecations * Specifying --tags (or --skip-tags) multiple times on the command line diff --git a/lib/ansible/modules/cloud/amazon/ec2_asg.py b/lib/ansible/modules/cloud/amazon/ec2_asg.py index a396d3f5cd4..7b8e91ca6d0 100644 --- a/lib/ansible/modules/cloud/amazon/ec2_asg.py +++ b/lib/ansible/modules/cloud/amazon/ec2_asg.py @@ -230,11 +230,12 @@ EXAMPLES = ''' desired_capacity: 5 region: us-east-1 ''' - +import re import time import logging as log import traceback +from ansible.module_utils.cloud import CloudRetry from ansible.module_utils.basic import * from ansible.module_utils.ec2 import * log.getLogger('boto').setLevel(log.CRITICAL) @@ -256,6 +257,100 @@ ASG_ATTRIBUTES = ('availability_zones', 'default_cooldown', 'desired_capacity', INSTANCE_ATTRIBUTES = ('instance_id', 'health_status', 'lifecycle_state', 'launch_config_name') + +def _boto_exception_maybe(): + """ + Allow for boto exceptions to be retried. + """ + if HAS_BOTO: + return BotoServerError + return type(None) + + +class ASGRetry(CloudRetry): + """ A retry class for ec2_asg while this module isn't using boto3 """ + base_class = _boto_exception_maybe() + + @staticmethod + def status_code_from_exception(error): + return error.error_code + + @staticmethod + def found(response_code): + retry_on = ['RequestLimitExceeded', 'Unavailable', 'ServiceUnavailable', + 'InternalFailure', 'InternalError', 'Throttling'] + not_found = re.compile(r'^\w+.NotFound') + if response_code in retry_on or not_found.search(response_code): + return True + else: + return False + + +@ASGRetry.backoff(tries=10, delay=10, backoff=1.5) +def get_all_groups(asg_connection, group_name): + return asg_connection.get_all_groups(names=[group_name]) + + +@ASGRetry.backoff(tries=10, delay=10, backoff=1.5) +def create_auto_scaling_group(asg_connection, ag): + asg_connection.create_auto_scaling_group(ag) + + +@ASGRetry.backoff(tries=10, delay=10, backoff=1.5) +def get_all_launch_configurations(asg_connection, launch_config_name): + return asg_connection.get_all_launch_configurations(names=[launch_config_name]) + + +@ASGRetry.backoff(tries=10, delay=10, backoff=1.5) +def delete_tags(asg_connection, dead_tags): + asg_connection.delete_tags(dead_tags) + + +@ASGRetry.backoff(tries=10, delay=10, backoff=1.5) +def create_or_update_tags(asg_connection, asg_tags): + asg_connection.create_or_update_tags(asg_tags) + + +@ASGRetry.backoff(tries=10, delay=10, backoff=1.5) +def terminate_instance(asg_connection, instance_id, decrement_capacity): + asg_connection.terminate_instance(instance_id, decrement_capacity=decrement_capacity) + + +@ASGRetry.backoff(tries=10, delay=10, backoff=1.5) +def update(as_group): + as_group.update() + + +@ASGRetry.backoff(tries=10, delay=10, backoff=1.5) +def put_notification_configuration(as_group, notification_topic, notification_types): + as_group.put_notification_configuration(notification_topic, notification_types) + + +@ASGRetry.backoff(tries=10, delay=10, backoff=1.5) +def delete_notification_configuration(as_group, notification_topic): + as_group.delete_notification_configuration(notification_topic) + + +@ASGRetry.backoff(tries=10, delay=10, backoff=1.5) +def delete(as_group): + as_group.delete() + + +@ASGRetry.backoff(tries=10, delay=10, backoff=1.5) +def elb_describe_instance_health(elb_connection, lb, instances=None): + return elb_connection.describe_instance_health(lb, instances=instances) + + +@ASGRetry.backoff(tries=10, delay=10, backoff=1.5) +def elb_deregister_instances(elb_connection, lb, instance_id): + elb_connection.deregister_instances(lb, instance_id) + + +@ASGRetry.backoff(tries=10, delay=10, backoff=1.5) +def ec2_get_all_zones(ec2_connection): + return ec2_connection.get_all_zones() + + def enforce_required_arguments(module): ''' As many arguments are not required for autoscale group deletion they cannot be mandatory arguments for the module, so we enforce @@ -319,7 +414,7 @@ def get_properties(autoscaling_group): def elb_dreg(asg_connection, module, group_name, instance_id): region, ec2_url, aws_connect_params = get_aws_connection_info(module) - as_group = asg_connection.get_all_groups(names=[group_name])[0] + as_group = get_all_groups(asg_connection, group_name)[0] wait_timeout = module.params.get('wait_timeout') props = get_properties(as_group) count = 1 @@ -332,14 +427,14 @@ def elb_dreg(asg_connection, module, group_name, instance_id): return for lb in as_group.load_balancers: - elb_connection.deregister_instances(lb, instance_id) + elb_deregister_instances(elb_connection, lb, instance_id) log.debug("De-registering {0} from ELB {1}".format(instance_id, lb)) wait_timeout = time.time() + wait_timeout while wait_timeout > time.time() and count > 0: count = 0 for lb in as_group.load_balancers: - lb_instances = elb_connection.describe_instance_health(lb) + lb_instances = elb_describe_instance_health(elb_connection, lb) for i in lb_instances: if i.instance_id == instance_id and i.state == "InService": count += 1 @@ -353,7 +448,7 @@ def elb_dreg(asg_connection, module, group_name, instance_id): def elb_healthy(asg_connection, elb_connection, module, group_name): healthy_instances = set() - as_group = asg_connection.get_all_groups(names=[group_name])[0] + as_group = get_all_groups(asg_connection, group_name)[0] props = get_properties(as_group) # get healthy, inservice instances from ASG instances = [] @@ -366,7 +461,7 @@ def elb_healthy(asg_connection, elb_connection, module, group_name): # we catch a race condition that sometimes happens if the instance exists in the ASG # but has not yet show up in the ELB try: - lb_instances = elb_connection.describe_instance_health(lb, instances=instances) + lb_instances = elb_describe_instance_health(elb_connection, lb, instances) except boto.exception.BotoServerError as e: if e.error_code == 'InvalidInstance': return None @@ -386,7 +481,7 @@ def wait_for_elb(asg_connection, module, group_name): # if the health_check_type is ELB, we want to query the ELBs directly for instance # status as to avoid health_check_grace period that is awarded to ASG instances - as_group = asg_connection.get_all_groups(names=[group_name])[0] + as_group = get_all_groups(asg_connection, group_name)[0] if as_group.load_balancers and as_group.health_check_type == 'ELB': log.debug("Waiting for ELB to consider instances healthy.") @@ -444,7 +539,7 @@ def create_autoscaling_group(connection, module): health_check_type = module.params.get('health_check_type') default_cooldown = module.params.get('default_cooldown') wait_for_instances = module.params.get('wait_for_instances') - as_groups = connection.get_all_groups(names=[group_name]) + as_groups = get_all_groups(connection, group_name) wait_timeout = module.params.get('wait_timeout') termination_policies = module.params.get('termination_policies') notification_topic = module.params.get('notification_topic') @@ -470,9 +565,9 @@ def create_autoscaling_group(connection, module): if not as_groups: if not vpc_zone_identifier and not availability_zones: - availability_zones = module.params['availability_zones'] = [zone.name for zone in ec2_connection.get_all_zones()] + availability_zones = module.params['availability_zones'] = [zone.name for zone in ec2_get_all_zones(ec2_connection)] enforce_required_arguments(module) - launch_configs = connection.get_all_launch_configurations(names=[launch_config_name]) + launch_configs = get_all_launch_configurations(connection, launch_config_name) if len(launch_configs) == 0: module.fail_json(msg="No launch config found with name %s" % launch_config_name) ag = AutoScalingGroup( @@ -493,7 +588,7 @@ def create_autoscaling_group(connection, module): termination_policies=termination_policies) try: - connection.create_auto_scaling_group(ag) + create_auto_scaling_group(connection, ag) suspend_processes(ag, module) if wait_for_instances: wait_for_new_inst(module, connection, group_name, wait_timeout, desired_capacity, 'viable_instances') @@ -502,7 +597,7 @@ def create_autoscaling_group(connection, module): if notification_topic: ag.put_notification_configuration(notification_topic, notification_types) - as_group = connection.get_all_groups(names=[group_name])[0] + as_group = get_all_groups(connection, group_name)[0] asg_properties = get_properties(as_group) changed = True return(changed, asg_properties) @@ -551,11 +646,11 @@ def create_autoscaling_group(connection, module): dead_tags.append(tag) if dead_tags != []: - connection.delete_tags(dead_tags) + delete_tags(connection, dead_tags) if have_tags != want_tags: changed = True - connection.create_or_update_tags(asg_tags) + create_or_update_tags(connection, asg_tags) # handle loadbalancers separately because None != [] load_balancers = module.params.get('load_balancers') or [] @@ -565,13 +660,13 @@ def create_autoscaling_group(connection, module): if changed: try: - as_group.update() + update(as_group) except BotoServerError as e: module.fail_json(msg="Failed to update Autoscaling Group: %s" % str(e), exception=traceback.format_exc()) if notification_topic: try: - as_group.put_notification_configuration(notification_topic, notification_types) + put_notification_configuration(as_group, notification_topic, notification_types) except BotoServerError as e: module.fail_json(msg="Failed to update Autoscaling Group notifications: %s" % str(e), exception=traceback.format_exc()) @@ -579,7 +674,7 @@ def create_autoscaling_group(connection, module): wait_for_new_inst(module, connection, group_name, wait_timeout, desired_capacity, 'viable_instances') wait_for_elb(connection, module, group_name) try: - as_group = connection.get_all_groups(names=[group_name])[0] + as_group = get_all_groups(connection, group_name)[0] asg_properties = get_properties(as_group) except BotoServerError as e: module.fail_json(msg="Failed to read existing Autoscaling Groups: %s" % str(e), exception=traceback.format_exc()) @@ -588,35 +683,27 @@ def create_autoscaling_group(connection, module): def delete_autoscaling_group(connection, module): group_name = module.params.get('name') - notification_topic = module.params.get('notification_topic') - wait_for_instances = module.params.get('wait_for_instances') - - if notification_topic: - ag.delete_notification_configuration(notification_topic) - - groups = connection.get_all_groups(names=[group_name]) + groups = get_all_groups(connection, group_name) if groups: group = groups[0] - - if not wait_for_instances: - group.delete(True) - return True - + notification_topic = module.params.get('notification_topic') + if notification_topic: + delete_notification_configuration(group, notification_topic) group.max_size = 0 group.min_size = 0 group.desired_capacity = 0 - group.update() + update(group) instances = True while instances: - tmp_groups = connection.get_all_groups(names=[group_name]) + tmp_groups = get_all_groups(connection, group_name) if tmp_groups: tmp_group = tmp_groups[0] if not tmp_group.instances: instances = False time.sleep(10) - group.delete() - while len(connection.get_all_groups(names=[group_name])): + delete(group) + while len(get_all_groups(connection, group_name)): time.sleep(5) return True @@ -633,7 +720,7 @@ def update_size(group, max_size, min_size, dc): group.max_size = max_size group.min_size = min_size group.desired_capacity = dc - group.update() + update(group) def replace(connection, module): batch_size = module.params.get('replace_batch_size') @@ -645,7 +732,7 @@ def replace(connection, module): lc_check = module.params.get('lc_check') replace_instances = module.params.get('replace_instances') - as_group = connection.get_all_groups(names=[group_name])[0] + as_group = get_all_groups(connection, group_name)[0] wait_for_new_inst(module, connection, group_name, wait_timeout, as_group.min_size, 'viable_instances') props = get_properties(as_group) instances = props['instances'] @@ -668,7 +755,7 @@ def replace(connection, module): if num_new_inst_needed == 0 and old_instances: log.debug("No new instances needed, but old instances are present. Removing old instances") terminate_batch(connection, module, old_instances, instances, True) - as_group = connection.get_all_groups(names=[group_name])[0] + as_group = get_all_groups(connection, group_name)[0] props = get_properties(as_group) changed = True return(changed, props) @@ -685,11 +772,11 @@ def replace(connection, module): # set temporary settings and wait for them to be reached # This should get overwritten if the number of instances left is less than the batch size. - as_group = connection.get_all_groups(names=[group_name])[0] + as_group = get_all_groups(connection, group_name)[0] update_size(as_group, max_size + batch_size, min_size + batch_size, desired_capacity + batch_size) wait_for_new_inst(module, connection, group_name, wait_timeout, as_group.min_size, 'viable_instances') wait_for_elb(connection, module, group_name) - as_group = connection.get_all_groups(names=[group_name])[0] + as_group = get_all_groups(connection, group_name)[0] props = get_properties(as_group) instances = props['instances'] if replace_instances: @@ -701,12 +788,12 @@ def replace(connection, module): wait_for_term_inst(connection, module, term_instances) wait_for_new_inst(module, connection, group_name, wait_timeout, desired_size, 'viable_instances') wait_for_elb(connection, module, group_name) - as_group = connection.get_all_groups(names=[group_name])[0] + as_group = get_all_groups(connection, group_name)[0] if break_early: log.debug("breaking loop") break update_size(as_group, max_size, min_size, desired_capacity) - as_group = connection.get_all_groups(names=[group_name])[0] + as_group = get_all_groups(connection, group_name)[0] asg_properties = get_properties(as_group) log.debug("Rolling update complete.") changed=True @@ -763,7 +850,7 @@ def terminate_batch(connection, module, replace_instances, initial_instances, le decrement_capacity = False break_loop = False - as_group = connection.get_all_groups(names=[group_name])[0] + as_group = get_all_groups(connection, group_name)[0] props = get_properties(as_group) desired_size = as_group.min_size @@ -783,7 +870,7 @@ def terminate_batch(connection, module, replace_instances, initial_instances, le decrement_capacity = True if as_group.min_size != min_size: as_group.min_size = min_size - as_group.update() + update(as_group) log.debug("Updating minimum size back to original of {0}".format(min_size)) #if are some leftover old instances, but we are already at capacity with new ones # we don't want to decrement capacity @@ -805,7 +892,7 @@ def terminate_batch(connection, module, replace_instances, initial_instances, le for instance_id in instances_to_terminate: elb_dreg(connection, module, group_name, instance_id) log.debug("terminating instance: {0}".format(instance_id)) - connection.terminate_instance(instance_id, decrement_capacity=decrement_capacity) + terminate_instance(connection, instance_id, decrement_capacity) # we wait to make sure the machines we marked as Unhealthy are # no longer in the list @@ -819,14 +906,14 @@ def wait_for_term_inst(connection, module, term_instances): wait_timeout = module.params.get('wait_timeout') group_name = module.params.get('name') lc_check = module.params.get('lc_check') - as_group = connection.get_all_groups(names=[group_name])[0] + as_group = get_all_groups(connection, group_name)[0] props = get_properties(as_group) count = 1 wait_timeout = time.time() + wait_timeout while wait_timeout > time.time() and count > 0: log.debug("waiting for instances to terminate") count = 0 - as_group = connection.get_all_groups(names=[group_name])[0] + as_group = get_all_groups(connection, group_name)[0] props = get_properties(as_group) instance_facts = props['instance_facts'] instances = ( i for i in instance_facts if i in term_instances) @@ -846,7 +933,7 @@ def wait_for_term_inst(connection, module, term_instances): def wait_for_new_inst(module, connection, group_name, wait_timeout, desired_size, prop): # make sure we have the latest stats after that last loop. - as_group = connection.get_all_groups(names=[group_name])[0] + as_group = get_all_groups(connection, group_name)[0] props = get_properties(as_group) log.debug("Waiting for {0} = {1}, currently {2}".format(prop, desired_size, props[prop])) # now we make sure that we have enough instances in a viable state @@ -854,7 +941,7 @@ def wait_for_new_inst(module, connection, group_name, wait_timeout, desired_size while wait_timeout > time.time() and desired_size > props[prop]: log.debug("Waiting for {0} = {1}, currently {2}".format(prop, desired_size, props[prop])) time.sleep(10) - as_group = connection.get_all_groups(names=[group_name])[0] + as_group = get_all_groups(connection, group_name)[0] props = get_properties(as_group) if wait_timeout <= time.time(): # waiting took too long