[cloud] ASGRetry - add retry on throttling and add backoff to calls in ec2_asg (#24339)

* Use AWSRetry decorator in ec2_asg

* Rebased on stable-2.3

Added tries, delay, and backoff

Added throttling to the list of errors for AWSRetry

* Create custom retry class for ec2_asg while it still uses boto

* Remove changes to ec2.py

* BotoServerError exception has an error_code attribute

* Add info to the changelog
Authored by Sloane Hertel, committed by Ryan Brown
commit 6cb0666d0a (parent 08f3b83812)
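For reference, the pattern this commit applies is a CloudRetry subclass whose backoff() decorator retries boto calls that fail with throttling-style error codes. The sketch below is illustrative only and is not the module code: it assumes an Ansible version contemporary with this change (2.3-era), where ansible.module_utils.cloud.CloudRetry and its backoff() classmethod are available, and the names DemoRetry, FakeBotoError, and flaky_describe are made up for the example.

    # Minimal sketch of the CloudRetry pattern used by this commit.
    # Assumes ansible.module_utils.cloud.CloudRetry is importable (2.3-era API);
    # FakeBotoError, DemoRetry, and flaky_describe are illustrative stand-ins.
    from ansible.module_utils.cloud import CloudRetry


    class FakeBotoError(Exception):
        """Stand-in for boto.exception.BotoServerError."""
        def __init__(self, error_code):
            super(FakeBotoError, self).__init__(error_code)
            self.error_code = error_code


    class DemoRetry(CloudRetry):
        # Only exceptions derived from base_class are considered for retry.
        base_class = FakeBotoError

        @staticmethod
        def status_code_from_exception(error):
            # BotoServerError exposes the AWS error code as .error_code
            return error.error_code

        @staticmethod
        def found(response_code, *args, **kwargs):
            # Retry only on throttling/transient codes.
            return response_code in ('Throttling', 'RequestLimitExceeded')


    state = {'calls': 0}


    @DemoRetry.backoff(tries=5, delay=1, backoff=1.5)
    def flaky_describe():
        # Fails twice with a throttling error, then succeeds.
        state['calls'] += 1
        if state['calls'] < 3:
            raise FakeBotoError('Throttling')
        return 'ok'


    print(flaky_describe())  # sleeps ~1s, then ~1.5s, then prints 'ok'

In the module itself the same idea appears below as ASGRetry, with BotoServerError as the base class and a broader list of retryable error codes.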

@@ -76,6 +76,7 @@ Ansible Changes By Release
* 'service' tasks can now use async again, we had lost this capability when changed into an action plugin.
* made any_errors_fatal inheritable from play to task and all other objects in between.
* many small performance improvements in inventory and variable handling and in task execution.
* Added a retry class to the ec2_asg module since customers were running into throttling errors (AWSRetry is a solution for modules that use boto3, so it isn't applicable here).
### Deprecations
* Specifying --tags (or --skip-tags) multiple times on the command line

@@ -230,11 +230,12 @@ EXAMPLES = '''
desired_capacity: 5
region: us-east-1
'''
import re
import time
import logging as log
import traceback
from ansible.module_utils.cloud import CloudRetry
from ansible.module_utils.basic import *
from ansible.module_utils.ec2 import *
log.getLogger('boto').setLevel(log.CRITICAL)
@@ -256,6 +257,100 @@ ASG_ATTRIBUTES = ('availability_zones', 'default_cooldown', 'desired_capacity',
INSTANCE_ATTRIBUTES = ('instance_id', 'health_status', 'lifecycle_state', 'launch_config_name')
def _boto_exception_maybe():
    """
    Allow for boto exceptions to be retried.
    """
    if HAS_BOTO:
        return BotoServerError
    return type(None)

class ASGRetry(CloudRetry):
    """ A retry class for ec2_asg while this module isn't using boto3 """
    base_class = _boto_exception_maybe()

    @staticmethod
    def status_code_from_exception(error):
        return error.error_code

    @staticmethod
    def found(response_code):
        retry_on = ['RequestLimitExceeded', 'Unavailable', 'ServiceUnavailable',
                    'InternalFailure', 'InternalError', 'Throttling']
        not_found = re.compile(r'^\w+.NotFound')
        if response_code in retry_on or not_found.search(response_code):
            return True
        else:
            return False

@ASGRetry.backoff(tries=10, delay=10, backoff=1.5)
def get_all_groups(asg_connection, group_name):
    return asg_connection.get_all_groups(names=[group_name])

@ASGRetry.backoff(tries=10, delay=10, backoff=1.5)
def create_auto_scaling_group(asg_connection, ag):
    asg_connection.create_auto_scaling_group(ag)

@ASGRetry.backoff(tries=10, delay=10, backoff=1.5)
def get_all_launch_configurations(asg_connection, launch_config_name):
    return asg_connection.get_all_launch_configurations(names=[launch_config_name])

@ASGRetry.backoff(tries=10, delay=10, backoff=1.5)
def delete_tags(asg_connection, dead_tags):
    asg_connection.delete_tags(dead_tags)

@ASGRetry.backoff(tries=10, delay=10, backoff=1.5)
def create_or_update_tags(asg_connection, asg_tags):
    asg_connection.create_or_update_tags(asg_tags)

@ASGRetry.backoff(tries=10, delay=10, backoff=1.5)
def terminate_instance(asg_connection, instance_id, decrement_capacity):
    asg_connection.terminate_instance(instance_id, decrement_capacity=decrement_capacity)

@ASGRetry.backoff(tries=10, delay=10, backoff=1.5)
def update(as_group):
    as_group.update()

@ASGRetry.backoff(tries=10, delay=10, backoff=1.5)
def put_notification_configuration(as_group, notification_topic, notification_types):
    as_group.put_notification_configuration(notification_topic, notification_types)

@ASGRetry.backoff(tries=10, delay=10, backoff=1.5)
def delete_notification_configuration(as_group, notification_topic):
    as_group.delete_notification_configuration(notification_topic)

@ASGRetry.backoff(tries=10, delay=10, backoff=1.5)
def delete(as_group):
    as_group.delete()

@ASGRetry.backoff(tries=10, delay=10, backoff=1.5)
def elb_describe_instance_health(elb_connection, lb, instances=None):
    return elb_connection.describe_instance_health(lb, instances=instances)

@ASGRetry.backoff(tries=10, delay=10, backoff=1.5)
def elb_deregister_instances(elb_connection, lb, instance_id):
    elb_connection.deregister_instances(lb, instance_id)

@ASGRetry.backoff(tries=10, delay=10, backoff=1.5)
def ec2_get_all_zones(ec2_connection):
    return ec2_connection.get_all_zones()

def enforce_required_arguments(module):
    ''' As many arguments are not required for autoscale group deletion
        they cannot be mandatory arguments for the module, so we enforce
@@ -319,7 +414,7 @@ def get_properties(autoscaling_group):
def elb_dreg(asg_connection, module, group_name, instance_id):
region, ec2_url, aws_connect_params = get_aws_connection_info(module)
as_group = asg_connection.get_all_groups(names=[group_name])[0]
as_group = get_all_groups(asg_connection, group_name)[0]
wait_timeout = module.params.get('wait_timeout')
props = get_properties(as_group)
count = 1
@@ -332,14 +427,14 @@ def elb_dreg(asg_connection, module, group_name, instance_id):
return
for lb in as_group.load_balancers:
elb_connection.deregister_instances(lb, instance_id)
elb_deregister_instances(elb_connection, lb, instance_id)
log.debug("De-registering {0} from ELB {1}".format(instance_id, lb))
wait_timeout = time.time() + wait_timeout
while wait_timeout > time.time() and count > 0:
count = 0
for lb in as_group.load_balancers:
lb_instances = elb_connection.describe_instance_health(lb)
lb_instances = elb_describe_instance_health(elb_connection, lb)
for i in lb_instances:
if i.instance_id == instance_id and i.state == "InService":
count += 1
@@ -353,7 +448,7 @@ def elb_dreg(asg_connection, module, group_name, instance_id):
def elb_healthy(asg_connection, elb_connection, module, group_name):
healthy_instances = set()
as_group = asg_connection.get_all_groups(names=[group_name])[0]
as_group = get_all_groups(asg_connection, group_name)[0]
props = get_properties(as_group)
# get healthy, inservice instances from ASG
instances = []
@@ -366,7 +461,7 @@ def elb_healthy(asg_connection, elb_connection, module, group_name):
# we catch a race condition that sometimes happens if the instance exists in the ASG
# but has not yet shown up in the ELB
try:
lb_instances = elb_connection.describe_instance_health(lb, instances=instances)
lb_instances = elb_describe_instance_health(elb_connection, lb, instances)
except boto.exception.BotoServerError as e:
if e.error_code == 'InvalidInstance':
return None
@@ -386,7 +481,7 @@ def wait_for_elb(asg_connection, module, group_name):
# if the health_check_type is ELB, we want to query the ELBs directly for instance
# status as to avoid health_check_grace period that is awarded to ASG instances
as_group = asg_connection.get_all_groups(names=[group_name])[0]
as_group = get_all_groups(asg_connection, group_name)[0]
if as_group.load_balancers and as_group.health_check_type == 'ELB':
log.debug("Waiting for ELB to consider instances healthy.")
@@ -444,7 +539,7 @@ def create_autoscaling_group(connection, module):
health_check_type = module.params.get('health_check_type')
default_cooldown = module.params.get('default_cooldown')
wait_for_instances = module.params.get('wait_for_instances')
as_groups = connection.get_all_groups(names=[group_name])
as_groups = get_all_groups(connection, group_name)
wait_timeout = module.params.get('wait_timeout')
termination_policies = module.params.get('termination_policies')
notification_topic = module.params.get('notification_topic')
@@ -470,9 +565,9 @@ def create_autoscaling_group(connection, module):
if not as_groups:
if not vpc_zone_identifier and not availability_zones:
availability_zones = module.params['availability_zones'] = [zone.name for zone in ec2_connection.get_all_zones()]
availability_zones = module.params['availability_zones'] = [zone.name for zone in ec2_get_all_zones(ec2_connection)]
enforce_required_arguments(module)
launch_configs = connection.get_all_launch_configurations(names=[launch_config_name])
launch_configs = get_all_launch_configurations(connection, launch_config_name)
if len(launch_configs) == 0:
module.fail_json(msg="No launch config found with name %s" % launch_config_name)
ag = AutoScalingGroup(
@@ -493,7 +588,7 @@ def create_autoscaling_group(connection, module):
termination_policies=termination_policies)
try:
connection.create_auto_scaling_group(ag)
create_auto_scaling_group(connection, ag)
suspend_processes(ag, module)
if wait_for_instances:
wait_for_new_inst(module, connection, group_name, wait_timeout, desired_capacity, 'viable_instances')
@@ -502,7 +597,7 @@ def create_autoscaling_group(connection, module):
if notification_topic:
ag.put_notification_configuration(notification_topic, notification_types)
as_group = connection.get_all_groups(names=[group_name])[0]
as_group = get_all_groups(connection, group_name)[0]
asg_properties = get_properties(as_group)
changed = True
return(changed, asg_properties)
@@ -551,11 +646,11 @@ def create_autoscaling_group(connection, module):
dead_tags.append(tag)
if dead_tags != []:
connection.delete_tags(dead_tags)
delete_tags(connection, dead_tags)
if have_tags != want_tags:
changed = True
connection.create_or_update_tags(asg_tags)
create_or_update_tags(connection, asg_tags)
# handle loadbalancers separately because None != []
load_balancers = module.params.get('load_balancers') or []
@@ -565,13 +660,13 @@ def create_autoscaling_group(connection, module):
if changed:
try:
as_group.update()
update(as_group)
except BotoServerError as e:
module.fail_json(msg="Failed to update Autoscaling Group: %s" % str(e), exception=traceback.format_exc())
if notification_topic:
try:
as_group.put_notification_configuration(notification_topic, notification_types)
put_notification_configuration(as_group, notification_topic, notification_types)
except BotoServerError as e:
module.fail_json(msg="Failed to update Autoscaling Group notifications: %s" % str(e), exception=traceback.format_exc())
@@ -579,7 +674,7 @@ def create_autoscaling_group(connection, module):
wait_for_new_inst(module, connection, group_name, wait_timeout, desired_capacity, 'viable_instances')
wait_for_elb(connection, module, group_name)
try:
as_group = connection.get_all_groups(names=[group_name])[0]
as_group = get_all_groups(connection, group_name)[0]
asg_properties = get_properties(as_group)
except BotoServerError as e:
module.fail_json(msg="Failed to read existing Autoscaling Groups: %s" % str(e), exception=traceback.format_exc())
@@ -588,35 +683,27 @@ def create_autoscaling_group(connection, module):
def delete_autoscaling_group(connection, module):
group_name = module.params.get('name')
notification_topic = module.params.get('notification_topic')
wait_for_instances = module.params.get('wait_for_instances')
if notification_topic:
ag.delete_notification_configuration(notification_topic)
groups = connection.get_all_groups(names=[group_name])
groups = get_all_groups(connection, group_name)
if groups:
group = groups[0]
if not wait_for_instances:
group.delete(True)
return True
notification_topic = module.params.get('notification_topic')
if notification_topic:
delete_notification_configuration(group, notification_topic)
group.max_size = 0
group.min_size = 0
group.desired_capacity = 0
group.update()
update(group)
instances = True
while instances:
tmp_groups = connection.get_all_groups(names=[group_name])
tmp_groups = get_all_groups(connection, group_name)
if tmp_groups:
tmp_group = tmp_groups[0]
if not tmp_group.instances:
instances = False
time.sleep(10)
group.delete()
while len(connection.get_all_groups(names=[group_name])):
delete(group)
while len(get_all_groups(connection, group_name)):
time.sleep(5)
return True
@@ -633,7 +720,7 @@ def update_size(group, max_size, min_size, dc):
group.max_size = max_size
group.min_size = min_size
group.desired_capacity = dc
group.update()
update(group)
def replace(connection, module):
batch_size = module.params.get('replace_batch_size')
@@ -645,7 +732,7 @@ def replace(connection, module):
lc_check = module.params.get('lc_check')
replace_instances = module.params.get('replace_instances')
as_group = connection.get_all_groups(names=[group_name])[0]
as_group = get_all_groups(connection, group_name)[0]
wait_for_new_inst(module, connection, group_name, wait_timeout, as_group.min_size, 'viable_instances')
props = get_properties(as_group)
instances = props['instances']
@@ -668,7 +755,7 @@ def replace(connection, module):
if num_new_inst_needed == 0 and old_instances:
log.debug("No new instances needed, but old instances are present. Removing old instances")
terminate_batch(connection, module, old_instances, instances, True)
as_group = connection.get_all_groups(names=[group_name])[0]
as_group = get_all_groups(connection, group_name)[0]
props = get_properties(as_group)
changed = True
return(changed, props)
@@ -685,11 +772,11 @@ def replace(connection, module):
# set temporary settings and wait for them to be reached
# This should get overwritten if the number of instances left is less than the batch size.
as_group = connection.get_all_groups(names=[group_name])[0]
as_group = get_all_groups(connection, group_name)[0]
update_size(as_group, max_size + batch_size, min_size + batch_size, desired_capacity + batch_size)
wait_for_new_inst(module, connection, group_name, wait_timeout, as_group.min_size, 'viable_instances')
wait_for_elb(connection, module, group_name)
as_group = connection.get_all_groups(names=[group_name])[0]
as_group = get_all_groups(connection, group_name)[0]
props = get_properties(as_group)
instances = props['instances']
if replace_instances:
@@ -701,12 +788,12 @@ def replace(connection, module):
wait_for_term_inst(connection, module, term_instances)
wait_for_new_inst(module, connection, group_name, wait_timeout, desired_size, 'viable_instances')
wait_for_elb(connection, module, group_name)
as_group = connection.get_all_groups(names=[group_name])[0]
as_group = get_all_groups(connection, group_name)[0]
if break_early:
log.debug("breaking loop")
break
update_size(as_group, max_size, min_size, desired_capacity)
as_group = connection.get_all_groups(names=[group_name])[0]
as_group = get_all_groups(connection, group_name)[0]
asg_properties = get_properties(as_group)
log.debug("Rolling update complete.")
changed=True
@@ -763,7 +850,7 @@ def terminate_batch(connection, module, replace_instances, initial_instances, le
decrement_capacity = False
break_loop = False
as_group = connection.get_all_groups(names=[group_name])[0]
as_group = get_all_groups(connection, group_name)[0]
props = get_properties(as_group)
desired_size = as_group.min_size
@@ -783,7 +870,7 @@ def terminate_batch(connection, module, replace_instances, initial_instances, le
decrement_capacity = True
if as_group.min_size != min_size:
as_group.min_size = min_size
as_group.update()
update(as_group)
log.debug("Updating minimum size back to original of {0}".format(min_size))
# if there are some leftover old instances, but we are already at capacity with new ones
# we don't want to decrement capacity
@@ -805,7 +892,7 @@ def terminate_batch(connection, module, replace_instances, initial_instances, le
for instance_id in instances_to_terminate:
elb_dreg(connection, module, group_name, instance_id)
log.debug("terminating instance: {0}".format(instance_id))
connection.terminate_instance(instance_id, decrement_capacity=decrement_capacity)
terminate_instance(connection, instance_id, decrement_capacity)
# we wait to make sure the machines we marked as Unhealthy are
# no longer in the list
@@ -819,14 +906,14 @@ def wait_for_term_inst(connection, module, term_instances):
wait_timeout = module.params.get('wait_timeout')
group_name = module.params.get('name')
lc_check = module.params.get('lc_check')
as_group = connection.get_all_groups(names=[group_name])[0]
as_group = get_all_groups(connection, group_name)[0]
props = get_properties(as_group)
count = 1
wait_timeout = time.time() + wait_timeout
while wait_timeout > time.time() and count > 0:
log.debug("waiting for instances to terminate")
count = 0
as_group = connection.get_all_groups(names=[group_name])[0]
as_group = get_all_groups(connection, group_name)[0]
props = get_properties(as_group)
instance_facts = props['instance_facts']
instances = ( i for i in instance_facts if i in term_instances)
@@ -846,7 +933,7 @@ def wait_for_term_inst(connection, module, term_instances):
def wait_for_new_inst(module, connection, group_name, wait_timeout, desired_size, prop):
# make sure we have the latest stats after that last loop.
as_group = connection.get_all_groups(names=[group_name])[0]
as_group = get_all_groups(connection, group_name)[0]
props = get_properties(as_group)
log.debug("Waiting for {0} = {1}, currently {2}".format(prop, desired_size, props[prop]))
# now we make sure that we have enough instances in a viable state
@@ -854,7 +941,7 @@ def wait_for_new_inst(module, connection, group_name, wait_timeout, desired_size
while wait_timeout > time.time() and desired_size > props[prop]:
log.debug("Waiting for {0} = {1}, currently {2}".format(prop, desired_size, props[prop]))
time.sleep(10)
as_group = connection.get_all_groups(names=[group_name])[0]
as_group = get_all_groups(connection, group_name)[0]
props = get_properties(as_group)
if wait_timeout <= time.time():
# waiting took too long
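All of the decorated helpers above share the parameters tries=10, delay=10, backoff=1.5. As a rough illustration of what that means for a call that is throttled on every attempt, the following sketch (not part of the commit) computes the sleep schedule, assuming the 2.3-era CloudRetry.backoff() behaviour of sleeping the current delay after each failed attempt, multiplying the delay by the backoff factor, and making one final attempt at the end:

    # Illustrative only: the backoff schedule implied by
    # @ASGRetry.backoff(tries=10, delay=10, backoff=1.5), assuming the
    # 2.3-era CloudRetry semantics (tries - 1 sleeps, then a final attempt).
    tries, delay, backoff = 10, 10, 1.5

    sleeps = [delay * backoff ** attempt for attempt in range(tries - 1)]
    for attempt, pause in enumerate(sleeps, start=1):
        print("attempt %2d failed -> sleeping %6.1f s" % (attempt, pause))
    print("total wait before giving up: %.1f s" % sum(sleeps))
    # total wait before giving up: ~748.9 s (about 12.5 minutes)

So a fully throttled helper keeps retrying for roughly 12.5 minutes before failing, rather than erroring out immediately, which is the trade-off the commit makes to ride out AWS rate limiting.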
