@@ -23,13 +23,13 @@ author:
  - Alan Loi (@loia)
  - Fernando Jose Pando (@nand0p)
  - Nadir Lloret (@nadirollo)
  - Dennis Podkovyrin (@sbj-ss)
requirements:
  - "boto >= 2.33.0"
  - boto3
options:
  state:
    description:
      - Create or delete the queue.
    required: false
    choices: ['present', 'absent']
    default: 'present'
    type: str
@@ -38,9 +38,19 @@ options:
      - Name of the queue.
    required: true
    type: str
  default_visibility_timeout:
  queue_type:
    description:
      - Standard or FIFO queue.
      - I(queue_type) can only be set at queue creation and will otherwise be
        ignored.
    choices: ['standard', 'fifo']
    default: 'standard'
    version_added: "2.10"
    type: str
  visibility_timeout:
    description:
      - The default visibility timeout in seconds.
    aliases: [default_visibility_timeout]
    type: int
  message_retention_period:
    description:
@@ -50,13 +60,15 @@ options:
    description:
      - The maximum message size in bytes.
    type: int
  delivery_delay:
  delay_seconds:
    description:
      - The delivery delay in seconds.
    aliases: [delivery_delay]
    type: int
  receive_message_wait_time:
  receive_message_wait_time_seconds:
    description:
      - The receive message wait time in seconds.
    aliases: [receive_message_wait_time]
    type: int
  policy:
    description:
@@ -68,22 +80,65 @@ options:
      - JSON dict with the redrive_policy (see example).
    version_added: "2.2"
    type: dict
  kms_master_key_id:
    description:
      - The ID of an AWS-managed customer master key (CMK) for Amazon SQS or a custom CMK.
    version_added: "2.10"
    type: str
  kms_data_key_reuse_period_seconds:
    description:
      - The length of time, in seconds, for which Amazon SQS can reuse a data key to encrypt or decrypt messages before calling AWS KMS again.
    aliases: [kms_data_key_reuse_period]
    version_added: "2.10"
    type: int
  content_based_deduplication:
    type: bool
    description: Enables content-based deduplication. Used for FIFOs only.
    version_added: "2.10"
    default: false
  tags:
    description:
      - Tag dict to apply to the queue (requires botocore 1.5.40 or above).
      - To remove all tags set I(tags={}) and I(purge_tags=true).
    version_added: "2.10"
    type: dict
  purge_tags:
    description:
      - Remove tags not listed in I(tags).
    type: bool
    default: false
    version_added: "2.10"
extends_documentation_fragment:
  - aws
  - ec2
"""
RETURN = '''
default_visibility_timeout:
content_based_deduplication:
    description: Enables content-based deduplication. Used for FIFOs only.
    type: bool
    returned: always
    sample: True
visibility_timeout:
    description: The default visibility timeout in seconds.
    type: int
    returned: always
    sample: 30
delivery_delay:
delay_seconds:
    description: The delivery delay in seconds.
    type: int
    returned: always
    sample: 0
kms_master_key_id:
    description: The ID of an AWS-managed customer master key (CMK) for Amazon SQS or a custom CMK.
    type: str
    returned: always
    sample: alias/MyAlias
kms_data_key_reuse_period_seconds:
    description: The length of time, in seconds, for which Amazon SQS can reuse a data key to encrypt or decrypt messages before calling AWS KMS again.
    type: int
    returned: always
    sample: 300
maximum_message_size:
    description: The maximum message size in bytes.
    type: int
@@ -102,9 +157,14 @@ name:
queue_arn:
    description: The queue's Amazon resource name (ARN).
    type: str
    returned: on successful creation or update of the queue
    returned: on success
    sample: 'arn:aws:sqs:us-east-1:199999999999:queuename-987d2de0'
receive_message_wait_time:
queue_url:
    description: URL to access the queue
    type: str
    returned: on success
    sample: 'https://queue.amazonaws.com/123456789012/MyQueue'
receive_message_wait_time_seconds:
    description: The receive message wait time in seconds.
    type: int
    returned: always
@@ -114,6 +174,11 @@ region:
    type: str
    returned: always
    sample: 'us-east-1'
tags:
    description: List of queue tags
    type: dict
    returned: always
    sample: '{"Env": "prod"}'
'''
EXAMPLES = '''
@@ -131,6 +196,33 @@ EXAMPLES = '''
      maxReceiveCount: 5
      deadLetterTargetArn: arn:aws:sqs:eu-west-1:123456789012:my-dead-queue

# Drop redrive policy
- sqs_queue:
    name: my-queue
    region: ap-southeast-2
    redrive_policy: {}

# Create FIFO queue
- sqs_queue:
    name: fifo-queue
    region: ap-southeast-2
    queue_type: fifo
    content_based_deduplication: yes

# Tag queue
- sqs_queue:
    name: fifo-queue
    region: ap-southeast-2
    tags:
      example: SomeValue
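
# Remove all tags (tags: {} together with purge_tags: true, as described in the
# option documentation above)
- sqs_queue:
    name: fifo-queue
    region: ap-southeast-2
    tags: {}
    purge_tags: true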

# Configure Encryption, automatically uses a new data key every hour
- sqs_queue:
    name: fifo-queue
    region: ap-southeast-2
    kms_master_key_id: alias/MyQueueKey
    kms_data_key_reuse_period_seconds: 3600
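
# Tune delivery and retention settings (illustrative values) using the renamed
# options and aliases documented above
- sqs_queue:
    name: my-queue
    region: ap-southeast-2
    visibility_timeout: 120
    message_retention_period: 86400
    maximum_message_size: 1024
    delay_seconds: 30
    receive_message_wait_time_seconds: 20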

# Delete SQS queue
- sqs_queue:
    name: my-queue
@@ -139,178 +231,250 @@ EXAMPLES = '''
'''
import json
import traceback

from ansible.module_utils.aws.core import AnsibleAWSModule
from ansible.module_utils.ec2 import AWSRetry, camel_dict_to_snake_dict, compare_aws_tags, snake_dict_to_camel_dict, compare_policies

try:
    import boto.sqs
    from boto.exception import BotoServerError, NoAuthHandlerFound
    HAS_BOTO = True
    from botocore.exceptions import BotoCoreError, ClientError, ParamValidationError
except ImportError:
    HAS_BOTO = False
    pass  # handled by AnsibleAWSModule

from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.ec2 import AnsibleAWSError, connect_to_aws, ec2_argument_spec, get_aws_connection_info


def get_queue_name(module, is_fifo=False):
    name = module.params.get('name')
    if not is_fifo or name.endswith('.fifo'):
        return name
    return name + '.fifo'
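
# e.g. get_queue_name(module, is_fifo=True) turns 'my-queue' into 'my-queue.fifo',
# the '.fifo' suffix AWS requires for FIFO queue names.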


def create_or_update_sqs_queue(connection, module):
    queue_name = module.params.get('name')

    queue_attributes = dict(
        default_visibility_timeout=module.params.get('default_visibility_timeout'),
        message_retention_period=module.params.get('message_retention_period'),
        maximum_message_size=module.params.get('maximum_message_size'),
        delivery_delay=module.params.get('delivery_delay'),
        receive_message_wait_time=module.params.get('receive_message_wait_time'),
        policy=module.params.get('policy'),
        redrive_policy=module.params.get('redrive_policy')
    )


# NonExistentQueue is explicitly expected when a queue doesn't exist
@AWSRetry.jittered_backoff()
def get_queue_url(client, name):
    try:
        return client.get_queue_url(QueueName=name)['QueueUrl']
    except ClientError as e:
        if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
            return None
        raise


def describe_queue(client, queue_url):
    """
    Describe a queue in snake format
    """
    attributes = client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['All'], aws_retry=True)['Attributes']
    description = dict(attributes)
    description.pop('Policy', None)
    description.pop('RedrivePolicy', None)
    description = camel_dict_to_snake_dict(description)
    description['policy'] = attributes.get('Policy', None)
    description['redrive_policy'] = attributes.get('RedrivePolicy', None)

    # Boto3 returns everything as a string, convert them back to integers/dicts if
    # that's what we expected.
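    # e.g. the returned 'VisibilityTimeout': '30' becomes visibility_timeout: 30,
    # and the JSON string under 'RedrivePolicy' becomes a redrive_policy dict.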
    for key, value in description.items():
        if value is None:
            continue

        if key in ['policy', 'redrive_policy']:
            policy = json.loads(value)
            description[key] = policy
            continue

        if key == 'content_based_deduplication':
            try:
                description[key] = bool(value)
            except (TypeError, ValueError):
                pass

        try:
            if value == str(int(value)):
                description[key] = int(value)
        except (TypeError, ValueError):
            pass

    return description


def create_or_update_sqs_queue(client, module):
    is_fifo = (module.params.get('queue_type') == 'fifo')
    queue_name = get_queue_name(module, is_fifo)
    result = dict(
        region=module.params.get('region'),
        name=queue_name,
        region=module.params.get('region'),
        changed=False,
    )
    result.update(queue_attributes)

    try:
        queue = connection.get_queue(queue_name)
        if queue:
            # Update existing
            result['changed'] = update_sqs_queue(queue, check_mode=module.check_mode, **queue_attributes)
        else:
            # Create new
            if not module.check_mode:
                queue = connection.create_queue(queue_name)
                update_sqs_queue(queue, **queue_attributes)

    queue_url = get_queue_url(client, queue_name)
    result['queue_url'] = queue_url

    if not queue_url:
        create_attributes = {'FifoQueue': 'true'} if is_fifo else {}
        result['changed'] = True
        if module.check_mode:
            return result
        queue_url = client.create_queue(QueueName=queue_name, Attributes=create_attributes, aws_retry=True)['QueueUrl']

        if not module.check_mode:
            result['queue_arn'] = queue.get_attributes('QueueArn')['QueueArn']
            result['default_visibility_timeout'] = queue.get_attributes('VisibilityTimeout')['VisibilityTimeout']
            result['message_retention_period'] = queue.get_attributes('MessageRetentionPeriod')['MessageRetentionPeriod']
            result['maximum_message_size'] = queue.get_attributes('MaximumMessageSize')['MaximumMessageSize']
            result['delivery_delay'] = queue.get_attributes('DelaySeconds')['DelaySeconds']
            result['receive_message_wait_time'] = queue.get_attributes('ReceiveMessageWaitTimeSeconds')['ReceiveMessageWaitTimeSeconds']
    except BotoServerError:
        result['msg'] = 'Failed to create/update sqs queue due to error: ' + traceback.format_exc()
        module.fail_json(**result)
    else:
        module.exit_json(**result)

    changed, arn = update_sqs_queue(module, client, queue_url)
    result['changed'] |= changed
    result['queue_arn'] = arn

    changed, tags = update_tags(client, queue_url, module)
    result['changed'] |= changed
    result['tags'] = tags


def update_sqs_queue(queue,
                     check_mode=False,
                     default_visibility_timeout=None,
                     message_retention_period=None,
                     maximum_message_size=None,
                     delivery_delay=None,
                     receive_message_wait_time=None,
                     policy=None,
                     redrive_policy=None):
    changed = False

    result.update(describe_queue(client, queue_url))

    changed = set_queue_attribute(queue, 'VisibilityTimeout', default_visibility_timeout,
                                  check_mode=check_mode) or changed
    changed = set_queue_attribute(queue, 'MessageRetentionPeriod', message_retention_period,
                                  check_mode=check_mode) or changed
    changed = set_queue_attribute(queue, 'MaximumMessageSize', maximum_message_size,
                                  check_mode=check_mode) or changed
    changed = set_queue_attribute(queue, 'DelaySeconds', delivery_delay,
                                  check_mode=check_mode) or changed
    changed = set_queue_attribute(queue, 'ReceiveMessageWaitTimeSeconds', receive_message_wait_time,
                                  check_mode=check_mode) or changed
    changed = set_queue_attribute(queue, 'Policy', policy,
                                  check_mode=check_mode) or changed
    changed = set_queue_attribute(queue, 'RedrivePolicy', redrive_policy,
                                  check_mode=check_mode) or changed
    return changed


def set_queue_attribute(queue, attribute, value, check_mode=False):
    if not value and value != 0:
        return False

    COMPATABILITY_KEYS = dict(
        delay_seconds='delivery_delay',
        receive_message_wait_time_seconds='receive_message_wait_time',
        visibility_timeout='default_visibility_timeout',
        kms_data_key_reuse_period_seconds='kms_data_key_reuse_period',
    )
    for key in list(result.keys()):
        try:
            existing_value = queue.get_attributes(attributes=attribute)[attribute]
        except Exception:
            existing_value = ''
        # The return values changed between boto and boto3, add the old keys too
        # for backwards compatibility
        return_name = COMPATABILITY_KEYS.get(key)
        if return_name:
            result[return_name] = result.get(key)

    return result

    # convert dict attributes to JSON strings (sort keys for comparing)


def update_sqs_queue(module, client, queue_url):
    check_mode = module.check_mode
    changed = False

    existing_attributes = client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['All'], aws_retry=True)['Attributes']
    new_attributes = snake_dict_to_camel_dict(module.params, capitalize_first=True)
    attributes_to_set = dict()

    # Boto3 SQS deals with policies as strings, we want to deal with them as
    # dicts
    if module.params.get('policy') is not None:
        policy = module.params.get('policy')
        current_value = existing_attributes.get('Policy', '{}')
        current_policy = json.loads(current_value)
        if compare_policies(current_policy, policy):
            attributes_to_set['Policy'] = json.dumps(policy)
            changed = True

    if module.params.get('redrive_policy') is not None:
        policy = module.params.get('redrive_policy')
        current_value = existing_attributes.get('RedrivePolicy', '{}')
        current_policy = json.loads(current_value)
        if compare_policies(current_policy, policy):
            attributes_to_set['RedrivePolicy'] = json.dumps(policy)
            changed = True
    for attribute, value in existing_attributes.items():
        # We handle these as a special case because they're IAM policies
        if attribute in ['Policy', 'RedrivePolicy']:
        value = json.dumps(value, sort_keys=True)
        if existing_value:
            existing_value = json.dumps(json.loads(existing_value), sort_keys=True)
            continue

        if attribute not in new_attributes.keys():
            continue

        if new_attributes.get(attribute) is None:
            continue

        new_value = new_attributes[attribute]
    if str(value) != existing_value:
        if not check_mode:
            queue.set_attribute(attribute, value)
        return True
        if isinstance(new_value, bool):
            new_value = str(new_value).lower()
            value = str(value).lower()
    return False

        if new_value == value:
            continue

        # Boto3 expects strings
        attributes_to_set[attribute] = str(new_value)
        changed = True


def delete_sqs_queue(connection, module):
    queue_name = module.params.get('name')

    if changed and not check_mode:
        client.set_queue_attributes(QueueUrl=queue_url, Attributes=attributes_to_set, aws_retry=True)

    return changed, existing_attributes.get('queue_arn'),


def delete_sqs_queue(client, module):
    is_fifo = (module.params.get('queue_type') == 'fifo')
    queue_name = get_queue_name(module, is_fifo)
    result = dict(
        region=module.params.get('region'),
        name=queue_name,
        region=module.params.get('region'),
        changed=False
    )

    try:
        queue = connection.get_queue(queue_name)
        if queue:
    queue_url = get_queue_url(client, queue_name)
    if not queue_url:
        return result

    result['changed'] = bool(queue_url)
    if not module.check_mode:
            connection.delete_queue(queue)
            result['changed'] = True
        AWSRetry.jittered_backoff()(client.delete_queue)(QueueUrl=queue_url)
        else:
            result['changed'] = False

    return result
    except BotoServerError:
        result['msg'] = 'Failed to delete sqs queue due to error: ' + traceback.format_exc()
        module.fail_json(**result)
    else:
        module.exit_json(**result)


def update_tags(client, queue_url, module):
    new_tags = module.params.get('tags')
    purge_tags = module.params.get('purge_tags')
    if new_tags is None:
        return False, {}


def main():
    argument_spec = ec2_argument_spec()
    argument_spec.update(dict(
        state=dict(default='present', choices=['present', 'absent']),
        name=dict(required=True, type='str'),
        default_visibility_timeout=dict(type='int'),
        message_retention_period=dict(type='int'),
        maximum_message_size=dict(type='int'),
        delivery_delay=dict(type='int'),
        receive_message_wait_time=dict(type='int'),
        policy=dict(type='dict', required=False),
        redrive_policy=dict(type='dict', required=False),
    ))

    try:
        existing_tags = client.list_queue_tags(QueueUrl=queue_url, aws_retry=True)['Tags']
    except (ClientError, KeyError) as e:
        existing_tags = {}

    tags_to_add, tags_to_remove = compare_aws_tags(existing_tags, new_tags, purge_tags=purge_tags)
    module = AnsibleModule(
        argument_spec=argument_spec,
        supports_check_mode=True)

    if not module.check_mode:
        if tags_to_remove:
            client.untag_queue(QueueUrl=queue_url, TagKeys=tags_to_remove, aws_retry=True)
        if tags_to_add:
            client.tag_queue(QueueUrl=queue_url, Tags=tags_to_add)
        existing_tags = client.list_queue_tags(QueueUrl=queue_url, aws_retry=True).get('Tags', {})
    else:
        existing_tags = new_tags

    if not HAS_BOTO:
        module.fail_json(msg='boto required for this module')

    changed = bool(tags_to_remove) or bool(tags_to_add)
    return changed, existing_tags

    region, ec2_url, aws_connect_params = get_aws_connection_info(module)
    if not region:
        module.fail_json(msg='region must be specified')

    try:
        connection = connect_to_aws(boto.sqs, region, **aws_connect_params)


def main():

    except (NoAuthHandlerFound, AnsibleAWSError) as e:
        module.fail_json(msg=str(e))

    argument_spec = dict(
        state=dict(type='str', default='present', choices=['present', 'absent']),
        name=dict(type='str', required=True),
        queue_type=dict(type='str', default='standard', choices=['standard', 'fifo']),
        delay_seconds=dict(type='int', aliases=['delivery_delay']),
        maximum_message_size=dict(type='int'),
        message_retention_period=dict(type='int'),
        policy=dict(type='dict'),
        receive_message_wait_time_seconds=dict(type='int', aliases=['receive_message_wait_time']),
        redrive_policy=dict(type='dict'),
        visibility_timeout=dict(type='int', aliases=['default_visibility_timeout']),
        kms_master_key_id=dict(type='str'),
        kms_data_key_reuse_period_seconds=dict(type='int', aliases=['kms_data_key_reuse_period']),
        content_based_deduplication=dict(type='bool'),
        tags=dict(type='dict'),
        purge_tags=dict(type='bool', default=False),
    )

    module = AnsibleAWSModule(argument_spec=argument_spec, supports_check_mode=True)

    state = module.params.get('state')
    retry_decorator = AWSRetry.jittered_backoff(catch_extra_error_codes=['AWS.SimpleQueueService.NonExistentQueue'])
    try:
        client = module.client('sqs', retry_decorator=retry_decorator)
        if state == 'present':
            create_or_update_sqs_queue(connection, module)
            result = create_or_update_sqs_queue(client, module)
        elif state == 'absent':
            delete_sqs_queue(connection, module)
            result = delete_sqs_queue(client, module)
    except (BotoCoreError, ClientError, ParamValidationError) as e:
        module.fail_json_aws(e, msg='Failed to control sqs queue')
    else:
        module.exit_json(**result)


if __name__ == '__main__':