[cloud] Make ec2_asg more resilient using AWSRetry around boto3 calls (#27598)

* Add AWSRetry to ec2_asg

* Paginate describing ASGs and launch configurations

pass connection to delete_asg

Fix a couple little bugs

* Use boto3's pagination build_full_result()
pull/28072/head
Sloane Hertel 7 years ago committed by Ryan Brown
parent e4cd899363
commit 40eb349ac6

@ -394,6 +394,106 @@ ASG_ATTRIBUTES = ('AvailabilityZones', 'DefaultCooldown', 'DesiredCapacity',
INSTANCE_ATTRIBUTES = ('instance_id', 'health_status', 'lifecycle_state', 'launch_config_name')
backoff_params = dict(tries=10, delay=3, backoff=1.5)
@AWSRetry.backoff(**backoff_params)
def describe_autoscaling_groups(connection, group_name):
pg = connection.get_paginator('describe_auto_scaling_groups')
return pg.paginate(AutoScalingGroupNames=[group_name]).build_full_result().get('AutoScalingGroups', [])
@AWSRetry.backoff(**backoff_params)
def deregister_lb_instances(connection, lb_name, instance_id):
connection.deregister_instances_from_load_balancer(LoadBalancerName=lb_name, Instances=[dict(InstanceId=instance_id)])
@AWSRetry.backoff(**backoff_params)
def describe_instance_health(connection, lb_name, instances):
params = dict(LoadBalancerName=lb_name)
if instances:
params.update(Instances=instances)
return connection.describe_instance_health(**params)
@AWSRetry.backoff(**backoff_params)
def describe_target_health(connection, target_group_arn, instances):
return connection.describe_target_health(TargetGroupArn=target_group_arn, Targets=instances)
@AWSRetry.backoff(**backoff_params)
def suspend_asg_processes(connection, asg_name, processes):
connection.suspend_processes(AutoScalingGroupName=asg_name, ScalingProcesses=processes)
@AWSRetry.backoff(**backoff_params)
def resume_asg_processes(connection, asg_name, processes):
connection.resume_processes(AutoScalingGroupName=asg_name, ScalingProcesses=processes)
@AWSRetry.backoff(**backoff_params)
def describe_launch_configurations(connection, launch_config_name):
pg = connection.get_paginator('describe_launch_configurations')
return pg.paginate(LaunchConfigurationNames=[launch_config_name]).build_full_result()
@AWSRetry.backoff(**backoff_params)
def create_asg(connection, **params):
connection.create_auto_scaling_group(**params)
@AWSRetry.backoff(**backoff_params)
def put_notification_config(connection, asg_name, topic_arn, notification_types):
connection.put_notification_configuration(
AutoScalingGroupName=asg_name,
TopicARN=topic_arn,
NotificationTypes=notification_types
)
@AWSRetry.backoff(**backoff_params)
def del_notification_config(connection, asg_name, topic_arn):
connection.delete_notification_configuration(
AutoScalingGroupName=asg_name,
TopicARN=topic_arn
)
@AWSRetry.backoff(**backoff_params)
def attach_load_balancers(connection, asg_name, load_balancers):
connection.attach_load_balancers(AutoScalingGroupName=asg_name, LoadBalancerNames=load_balancers)
@AWSRetry.backoff(**backoff_params)
def detach_load_balancers(connection, asg_name, load_balancers):
connection.detach_load_balancers(AutoScalingGroupName=asg_name, LoadBalancerNames=load_balancers)
@AWSRetry.backoff(**backoff_params)
def attach_lb_target_groups(connection, asg_name, target_group_arns):
connection.attach_load_balancer_target_groups(AutoScalingGroupName=asg_name, TargetGroupARNs=target_group_arns)
@AWSRetry.backoff(**backoff_params)
def detach_lb_target_groups(connection, asg_name, target_group_arns):
connection.detach_load_balancer_target_groups(AutoScalingGroupName=asg_name, TargetGroupARNs=target_group_arns)
@AWSRetry.backoff(**backoff_params)
def update_asg(connection, **params):
connection.update_auto_scaling_group(**params)
@AWSRetry.backoff(**backoff_params)
def delete_asg(connection, asg_name, force_delete):
connection.delete_auto_scaling_group(AutoScalingGroupName=asg_name, ForceDelete=force_delete)
@AWSRetry.backoff(**backoff_params)
def terminate_asg_instance(connection, instance_id, decrement_capacity):
connection.terminate_instance_in_auto_scaling_group(InstanceId=instance_id,
ShouldDecrementDesiredCapacity=decrement_capacity)
def enforce_required_arguments(module):
''' As many arguments are not required for autoscale group deletion
@ -471,7 +571,7 @@ def get_properties(autoscaling_group, module):
def elb_dreg(asg_connection, module, group_name, instance_id):
region, ec2_url, aws_connect_params = get_aws_connection_info(module, boto3=True)
as_group = asg_connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(asg_connection, group_name)[0]
wait_timeout = module.params.get('wait_timeout')
count = 1
if as_group['LoadBalancerNames'] and as_group['HealthCheckType'] == 'ELB':
@ -485,15 +585,14 @@ def elb_dreg(asg_connection, module, group_name, instance_id):
return
for lb in as_group['LoadBalancerNames']:
elb_connection.deregister_instances_from_load_balancer(LoadBalancerName=lb,
Instances=[dict(InstanceId=instance_id)])
deregister_lb_instances(elb_connection, lb, instance_id)
log.debug("De-registering {0} from ELB {1}".format(instance_id, lb))
wait_timeout = time.time() + wait_timeout
while wait_timeout > time.time() and count > 0:
count = 0
for lb in as_group['LoadBalancerNames']:
lb_instances = elb_connection.describe_instance_health(LoadBalancerName=lb)
lb_instances = describe_instance_health(elb_connection, lb, [])
for i in lb_instances['InstanceStates']:
if i['InstanceId'] == instance_id and i['State'] == "InService":
count += 1
@ -507,7 +606,7 @@ def elb_dreg(asg_connection, module, group_name, instance_id):
def elb_healthy(asg_connection, elb_connection, module, group_name):
healthy_instances = set()
as_group = asg_connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(asg_connection, group_name)[0]
props = get_properties(as_group, module)
# get healthy, inservice instances from ASG
instances = []
@ -521,7 +620,7 @@ def elb_healthy(asg_connection, elb_connection, module, group_name):
# we catch a race condition that sometimes happens if the instance exists in the ASG
# but has not yet show up in the ELB
try:
lb_instances = elb_connection.describe_instance_health(LoadBalancerName=lb, Instances=instances)
lb_instances = describe_instance_health(elb_connection, lb, instances)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'InvalidInstance':
return None
@ -541,7 +640,7 @@ def elb_healthy(asg_connection, elb_connection, module, group_name):
def tg_healthy(asg_connection, elbv2_connection, module, group_name):
healthy_instances = set()
as_group = asg_connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(asg_connection, group_name)[0]
props = get_properties(as_group, module)
# get healthy, inservice instances from ASG
instances = []
@ -555,7 +654,7 @@ def tg_healthy(asg_connection, elbv2_connection, module, group_name):
# we catch a race condition that sometimes happens if the instance exists in the ASG
# but has not yet show up in the ELB
try:
tg_instances = elbv2_connection.describe_target_health(TargetGroupArn=tg, Targets=instances)
tg_instances = describe_target_health(elbv2_connection, tg, instances)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'InvalidInstance':
return None
@ -579,7 +678,7 @@ def wait_for_elb(asg_connection, module, group_name):
# if the health_check_type is ELB, we want to query the ELBs directly for instance
# status as to avoid health_check_grace period that is awarded to ASG instances
as_group = asg_connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(asg_connection, group_name)[0]
if as_group.get('LoadBalancerNames') and as_group.get('HealthCheckType') == 'ELB':
log.debug("Waiting for ELB to consider instances healthy.")
@ -609,7 +708,7 @@ def wait_for_target_group(asg_connection, module, group_name):
# if the health_check_type is ELB, we want to query the ELBs directly for instance
# status as to avoid health_check_grace period that is awarded to ASG instances
as_group = asg_connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(asg_connection, group_name)[0]
if as_group.get('TargetGroupARNs') and as_group.get('HealthCheckType') == 'ELB':
log.debug("Waiting for Target Group to consider instances healthy.")
@ -647,10 +746,10 @@ def suspend_processes(ec2_connection, as_group, module):
resume_processes = list(suspended_processes - suspend_processes)
if resume_processes:
ec2_connection.resume_processes(AutoScalingGroupName=module.params.get('name'), ScalingProcesses=resume_processes)
resume_asg_processes(ec2_connection, module.params.get('name'), resume_processes)
if suspend_processes:
ec2_connection.suspend_processes(AutoScalingGroupName=module.params.get('name'), ScalingProcesses=list(suspend_processes))
suspend_asg_processes(ec2_connection, module.params.get('name'), list(suspend_processes))
return True
@ -703,7 +802,7 @@ def create_autoscaling_group(connection, module):
availability_zones = module.params['availability_zones'] = [zone['ZoneName'] for
zone in ec2_connection.describe_availability_zones()['AvailabilityZones']]
enforce_required_arguments(module)
launch_configs = connection.describe_launch_configurations(LaunchConfigurationNames=[launch_config_name])
launch_configs = describe_launch_configurations(connection, launch_config_name)
if len(launch_configs['LaunchConfigurations']) == 0:
module.fail_json(msg="No launch config found with name %s" % launch_config_name)
ag = dict(
@ -729,9 +828,9 @@ def create_autoscaling_group(connection, module):
ag['TargetGroupARNs'] = target_group_arns
try:
connection.create_auto_scaling_group(**ag)
create_asg(connection, **ag)
all_ag = connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups']
all_ag = describe_autoscaling_groups(connection, group_name)
if len(all_ag) == 0:
module.fail_json(msg="No auto scaling group found with the name %s" % group_name)
as_group = all_ag[0]
@ -744,12 +843,8 @@ def create_autoscaling_group(connection, module):
if target_group_arns:
wait_for_target_group(connection, module, group_name)
if notification_topic:
connection.put_notification_configuration(
AutoScalingGroupName=group_name,
TopicARN=notification_topic,
NotificationTypes=notification_types
)
as_group = connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
put_notification_config(connection, group_name, notification_topic, notification_types)
as_group = describe_autoscaling_groups(connection, group_name)[0]
asg_properties = get_properties(as_group, module)
changed = True
return changed, asg_properties
@ -790,10 +885,7 @@ def create_autoscaling_group(connection, module):
if load_balancers and not as_group['LoadBalancerNames']:
changed = True
try:
connection.attach_load_balancers(
AutoScalingGroupName=group_name,
LoadBalancerNames=load_balancers
)
attach_load_balancers(connection, group_name, load_balancers)
except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e:
module.fail_json(msg="Failed to update Autoscaling Group.",
exception=traceback.format_exc(), **camel_dict_to_snake_dict(e.response))
@ -813,29 +905,20 @@ def create_autoscaling_group(connection, module):
elbs_to_detach = has_elbs.difference(wanted_elbs)
if elbs_to_detach:
changed = True
connection.detach_load_balancers(
AutoScalingGroupName=group_name,
LoadBalancerNames=list(elbs_to_detach)
)
detach_load_balancers(connection, group_name, list(elbs_to_detach))
if wanted_elbs - has_elbs:
# if has contains less than wanted, then we need to add some
elbs_to_attach = wanted_elbs.difference(has_elbs)
if elbs_to_attach:
changed = True
connection.attach_load_balancers(
AutoScalingGroupName=group_name,
LoadBalancerNames=list(elbs_to_attach)
)
attach_load_balancers(connection, group_name, list(elbs_to_attach))
# Handle target group attachments/detachments
# Attach target groups if they are specified but none currently exist
if target_group_arns and not as_group['TargetGroupARNs']:
changed = True
try:
connection.attach_load_balancer_target_groups(
AutoScalingGroupName=group_name,
TargetGroupARNs=target_group_arns
)
attach_lb_target_groups(connection, group_name, target_group_arns)
except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e:
module.fail_json(msg="Failed to update Autoscaling Group.",
exception=traceback.format_exc(), **camel_dict_to_snake_dict(e.response))
@ -850,19 +933,13 @@ def create_autoscaling_group(connection, module):
tgs_to_detach = has_tgs.difference(wanted_tgs)
if tgs_to_detach:
changed = True
connection.detach_load_balancer_target_groups(
AutoScalingGroupName=group_name,
TargetGroupARNs=list(tgs_to_detach)
)
detach_lb_target_groups(connection, group_name, tgs_to_detach)
if wanted_tgs.issuperset(has_tgs):
# if has contains less than wanted, then we need to add some
tgs_to_attach = wanted_tgs.difference(has_tgs)
if tgs_to_attach:
changed = True
connection.attach_load_balancer_target_groups(
AutoScalingGroupName=group_name,
TargetGroupARNs=list(tgs_to_attach)
)
attach_lb_target_groups(connection, group_name, tgs_to_attach)
# check for attributes that aren't required for updating an existing ASG
desired_capacity = desired_capacity or as_group['DesiredCapacity']
@ -870,7 +947,7 @@ def create_autoscaling_group(connection, module):
max_size = max_size or as_group['MaxSize']
launch_config_name = launch_config_name or as_group['LaunchConfigurationName']
launch_configs = connection.describe_launch_configurations(LaunchConfigurationNames=[launch_config_name])
launch_configs = describe_launch_configurations(connection, launch_config_name)
if len(launch_configs['LaunchConfigurations']) == 0:
module.fail_json(msg="No launch config found with name %s" % launch_config_name)
ag = dict(
@ -887,15 +964,11 @@ def create_autoscaling_group(connection, module):
ag['AvailabilityZones'] = availability_zones
if vpc_zone_identifier:
ag['VPCZoneIdentifier'] = vpc_zone_identifier
connection.update_auto_scaling_group(**ag)
update_asg(connection, **ag)
if notification_topic:
try:
connection.put_notification_configuration(
AutoScalingGroupName=group_name,
TopicARN=notification_topic,
NotificationTypes=notification_types
)
put_notification_config(connection, group_name, notification_topic, notification_types)
except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e:
module.fail_json(msg="Failed to update Autoscaling Group notifications.",
exception=traceback.format_exc(), **camel_dict_to_snake_dict(e.response))
@ -912,8 +985,7 @@ def create_autoscaling_group(connection, module):
wait_for_target_group(connection, module, group_name)
try:
as_group = connection.describe_auto_scaling_groups(
AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(connection, group_name)[0]
asg_properties = get_properties(as_group, module)
if asg_properties != initial_asg_properties:
changed = True
@ -930,26 +1002,19 @@ def delete_autoscaling_group(connection, module):
wait_timeout = module.params.get('wait_timeout')
if notification_topic:
connection.delete_notification_configuration(
AutoScalingGroupName=group_name,
TopicARN=notification_topic
)
describe_response = connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])
groups = describe_response.get('AutoScalingGroups')
del_notification_config(connection, group_name, notification_topic)
groups = describe_autoscaling_groups(connection, group_name)
if groups:
if not wait_for_instances:
connection.delete_auto_scaling_group(AutoScalingGroupName=group_name, ForceDelete=True)
delete_asg(connection, group_name, force_delete=True)
return True
wait_timeout = time.time() + wait_timeout
connection.update_auto_scaling_group(
AutoScalingGroupName=group_name,
MinSize=0, MaxSize=0,
DesiredCapacity=0)
updated_params = dict(AutoScalingGroupName=group_name, MinSize=0, MaxSize=0, DesiredCapacity=0)
update_asg(connection, **updated_params)
instances = True
while instances and wait_for_instances and wait_timeout >= time.time():
tmp_groups = connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name]).get(
'AutoScalingGroups')
tmp_groups = describe_autoscaling_groups(connection, group_name)
if tmp_groups:
tmp_group = tmp_groups[0]
if not tmp_group.get('Instances'):
@ -960,8 +1025,8 @@ def delete_autoscaling_group(connection, module):
# waiting took too long
module.fail_json(msg="Waited too long for old instances to terminate. %s" % time.asctime())
connection.delete_auto_scaling_group(AutoScalingGroupName=group_name)
while len(connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name]).get('AutoScalingGroups')):
delete_asg(connection, group_name, force_delete=False)
while describe_autoscaling_groups(connection, group_name):
time.sleep(5)
return True
@ -982,7 +1047,7 @@ def update_size(connection, group, max_size, min_size, dc):
updated_group['MinSize'] = min_size
updated_group['MaxSize'] = max_size
updated_group['DesiredCapacity'] = dc
connection.update_auto_scaling_group(**updated_group)
update_asg(connection, **updated_group)
def replace(connection, module):
@ -995,7 +1060,7 @@ def replace(connection, module):
lc_check = module.params.get('lc_check')
replace_instances = module.params.get('replace_instances')
as_group = connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(connection, group_name)[0]
wait_for_new_inst(module, connection, group_name, wait_timeout, as_group['MinSize'], 'viable_instances')
props = get_properties(as_group, module)
instances = props['instances']
@ -1010,7 +1075,7 @@ def replace(connection, module):
if num_new_inst_needed == 0 and old_instances:
log.debug("No new instances needed, but old instances are present. Removing old instances")
terminate_batch(connection, module, old_instances, instances, True)
as_group = connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(connection, group_name)[0]
props = get_properties(as_group, module)
changed = True
return(changed, props)
@ -1034,12 +1099,12 @@ def replace(connection, module):
# set temporary settings and wait for them to be reached
# This should get overwritten if the number of instances left is less than the batch size.
as_group = connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(connection, group_name)[0]
update_size(connection, as_group, max_size + batch_size, min_size + batch_size, desired_capacity + batch_size)
wait_for_new_inst(module, connection, group_name, wait_timeout, as_group['MinSize'], 'viable_instances')
wait_for_elb(connection, module, group_name)
wait_for_target_group(connection, module, group_name)
as_group = connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(connection, group_name)[0]
props = get_properties(as_group, module)
instances = props['instances']
if replace_instances:
@ -1052,12 +1117,12 @@ def replace(connection, module):
wait_for_new_inst(module, connection, group_name, wait_timeout, desired_size, 'viable_instances')
wait_for_elb(connection, module, group_name)
wait_for_target_group(connection, module, group_name)
as_group = connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(connection, group_name)[0]
if break_early:
log.debug("breaking loop")
break
update_size(connection, as_group, max_size, min_size, desired_capacity)
as_group = connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(connection, group_name)[0]
asg_properties = get_properties(as_group, module)
log.debug("Rolling update complete.")
changed = True
@ -1115,7 +1180,7 @@ def terminate_batch(connection, module, replace_instances, initial_instances, le
decrement_capacity = False
break_loop = False
as_group = connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(connection, group_name)[0]
props = get_properties(as_group, module)
desired_size = as_group['MinSize']
@ -1134,8 +1199,8 @@ def terminate_batch(connection, module, replace_instances, initial_instances, le
if num_new_inst_needed == 0:
decrement_capacity = True
if as_group['MinSize'] != min_size:
connection.update_auto_scaling_group(AutoScalingGroupName=as_group['AutoScalingGroupName'],
MinSize=min_size)
updated_params = dict(AutoScalingGroupName=as_group['AutoScalingGroupName'], MinSize=min_size)
update_asg(connection, **updated_params)
log.debug("Updating minimum size back to original of {0}".format(min_size))
# if are some leftover old instances, but we are already at capacity with new ones
# we don't want to decrement capacity
@ -1157,8 +1222,7 @@ def terminate_batch(connection, module, replace_instances, initial_instances, le
for instance_id in instances_to_terminate:
elb_dreg(connection, module, group_name, instance_id)
log.debug("terminating instance: {0}".format(instance_id))
connection.terminate_instance_in_auto_scaling_group(InstanceId=instance_id,
ShouldDecrementDesiredCapacity=decrement_capacity)
terminate_asg_instance(connection, instance_id, decrement_capacity)
# we wait to make sure the machines we marked as Unhealthy are
# no longer in the list
@ -1169,14 +1233,14 @@ def terminate_batch(connection, module, replace_instances, initial_instances, le
def wait_for_term_inst(connection, module, term_instances):
wait_timeout = module.params.get('wait_timeout')
group_name = module.params.get('name')
as_group = connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(connection, group_name)[0]
props = get_properties(as_group, module)
count = 1
wait_timeout = time.time() + wait_timeout
while wait_timeout > time.time() and count > 0:
log.debug("waiting for instances to terminate")
count = 0
as_group = connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(connection, group_name)[0]
props = get_properties(as_group, module)
instance_facts = props['instance_facts']
instances = (i for i in instance_facts if i in term_instances)
@ -1196,7 +1260,7 @@ def wait_for_term_inst(connection, module, term_instances):
def wait_for_new_inst(module, connection, group_name, wait_timeout, desired_size, prop):
# make sure we have the latest stats after that last loop.
as_group = connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(connection, group_name)[0]
props = get_properties(as_group, module)
log.debug("Waiting for {0} = {1}, currently {2}".format(prop, desired_size, props[prop]))
# now we make sure that we have enough instances in a viable state
@ -1204,7 +1268,7 @@ def wait_for_new_inst(module, connection, group_name, wait_timeout, desired_size
while wait_timeout > time.time() and desired_size > props[prop]:
log.debug("Waiting for {0} = {1}, currently {2}".format(prop, desired_size, props[prop]))
time.sleep(10)
as_group = connection.describe_auto_scaling_groups(AutoScalingGroupNames=[group_name])['AutoScalingGroups'][0]
as_group = describe_autoscaling_groups(connection, group_name)[0]
props = get_properties(as_group, module)
if wait_timeout <= time.time():
# waiting took too long

Loading…
Cancel
Save