@ -19,6 +19,7 @@ description:
- Create or Delete a Kinesis Stream .
- Update the retention period of a Kinesis Stream .
- Update Tags on a Kinesis Stream .
- Enable / disable server side encryption on a Kinesis Stream .
version_added : " 2.2 "
requirements : [ boto3 ]
author : Allen Sanabria ( @linuxdynasty )
@ -63,6 +64,24 @@ options:
required : false
default : null
aliases : [ " resource_tags " ]
encryption_state :
description :
- " Enable or Disable encryption on the Kinesis Stream. "
required : false
choices : [ ' enabled ' , ' disabled ' ]
version_added : " 2.5 "
encryption_type :
description :
- " The type of encryption. "
required : false
default : KMS
version_added : " 2.5 "
key_id :
description :
- " The GUID or alias for the KMS key. "
required : false
default : None
version_added : " 2.5 "
extends_documentation_fragment :
- aws
- ec2
@ -111,6 +130,30 @@ EXAMPLES = '''
wait : yes
wait_timeout : 600
register : test_stream
# Basic enable encryption example:
- name : Encrypt Kinesis Stream test - stream .
kinesis_stream :
name : test - stream
state : present
encryption_state : enabled
encryption_type : KMS
key_id : alias / aws / kinesis
wait : yes
wait_timeout : 600
register : test_stream
# Basic disable encryption example:
- name : Encrypt Kinesis Stream test - stream .
kinesis_stream :
name : test - stream
state : present
encryption_state : disabled
encryption_type : KMS
key_id : alias / aws / kinesis
wait : yes
wait_timeout : 600
register : test_stream
'''
RETURN = '''
@ -341,7 +384,8 @@ def find_stream(client, stream_name, check_mode=False):
' RetentionPeriodHours ' : 24 ,
' StreamName ' : stream_name ,
' StreamARN ' : ' arn:aws:kinesis:east-side:123456789:stream/ {0} ' . format ( stream_name ) ,
' StreamStatus ' : ' ACTIVE '
' StreamStatus ' : ' ACTIVE ' ,
' EncryptionType ' : ' NONE '
}
success = True
except botocore . exceptions . ClientError as e :
@ -632,6 +676,65 @@ def stream_action(client, stream_name, shard_count=1, action='create',
return success , err_msg
def stream_encryption_action ( client , stream_name , action = ' start_encryption ' , encryption_type = ' ' , key_id = ' ' ,
timeout = 300 , check_mode = False ) :
""" Create, Encrypt or Delete an Amazon Kinesis Stream.
Args :
client ( botocore . client . EC2 ) : Boto3 client .
stream_name ( str ) : The name of the kinesis stream .
Kwargs :
shard_count ( int ) : Number of shards this stream will use .
action ( str ) : The action to perform .
valid actions == create and delete
default = create
encryption_type ( str ) : NONE or KMS
key_id ( str ) : The GUID or alias for the KMS key
check_mode ( bool ) : This will pass DryRun as one of the parameters to the aws api .
default = False
Basic Usage :
>> > client = boto3 . client ( ' kinesis ' )
>> > stream_name = ' test-stream '
>> > shard_count = 20
>> > stream_action ( client , stream_name , shard_count , action = ' create ' , encryption_type = ' KMS ' , key_id = ' alias/aws ' )
Returns :
List ( bool , str )
"""
success = False
err_msg = ' '
params = {
' StreamName ' : stream_name
}
try :
if not check_mode :
if action == ' start_encryption ' :
params [ ' EncryptionType ' ] = encryption_type
params [ ' KeyId ' ] = key_id
client . start_stream_encryption ( * * params )
success = True
elif action == ' stop_encryption ' :
params [ ' EncryptionType ' ] = encryption_type
params [ ' KeyId ' ] = key_id
client . stop_stream_encryption ( * * params )
success = True
else :
err_msg = ' Invalid encryption action {0} ' . format ( action )
else :
if action == ' start_encryption ' :
success = True
elif action == ' stop_encryption ' :
success = True
else :
err_msg = ' Invalid encryption action {0} ' . format ( action )
except botocore . exceptions . ClientError as e :
err_msg = to_native ( e )
return success , err_msg
def retention_action ( client , stream_name , retention_period = 24 ,
action = ' increase ' , check_mode = False ) :
""" Increase or Decrease the retention of messages in the Kinesis stream.
@ -669,16 +772,14 @@ def retention_action(client, stream_name, retention_period=24,
client . increase_stream_retention_period ( * * params )
success = True
err_msg = (
' Retention Period increased successfully to {0} '
. format ( retention_period )
' Retention Period increased successfully to {0} ' . format ( retention_period )
)
elif action == ' decrease ' :
params [ ' RetentionPeriodHours ' ] = retention_period
client . decrease_stream_retention_period ( * * params )
success = True
err_msg = (
' Retention Period decreased successfully to {0} '
. format ( retention_period )
' Retention Period decreased successfully to {0} ' . format ( retention_period )
)
else :
err_msg = ' Invalid action {0} ' . format ( action )
@ -1099,6 +1200,144 @@ def delete_stream(client, stream_name, wait=False, wait_timeout=300,
return success , changed , err_msg , results
def start_stream_encryption ( client , stream_name , encryption_type = ' ' , key_id = ' ' ,
wait = False , wait_timeout = 300 , check_mode = False ) :
""" Start encryption on an Amazon Kinesis Stream.
Args :
client ( botocore . client . EC2 ) : Boto3 client .
stream_name ( str ) : The name of the kinesis stream .
Kwargs :
encryption_type ( str ) : KMS or NONE
key_id ( str ) : KMS key GUID or alias
wait ( bool ) : Wait until Stream is ACTIVE .
default = False
wait_timeout ( int ) : How long to wait until this operation is considered failed .
default = 300
check_mode ( bool ) : This will pass DryRun as one of the parameters to the aws api .
default = False
Basic Usage :
>> > client = boto3 . client ( ' kinesis ' )
>> > stream_name = ' test-stream '
>> > key_id = ' alias/aws '
>> > encryption_type = ' KMS '
>> > start_stream_encryption ( client , stream_name , encryption_type , key_id )
Returns :
Tuple ( bool , bool , str , dict )
"""
success = False
changed = False
err_msg = ' '
params = {
' StreamName ' : stream_name
}
results = dict ( )
stream_found , stream_msg , current_stream = (
find_stream ( client , stream_name , check_mode = check_mode )
)
if stream_found :
success , err_msg = (
stream_encryption_action (
client , stream_name , action = ' start_encryption ' , encryption_type = encryption_type , key_id = key_id , check_mode = check_mode
)
)
if success :
changed = True
if wait :
success , err_msg , results = (
wait_for_status (
client , stream_name , ' ACTIVE ' , wait_timeout ,
check_mode = check_mode
)
)
err_msg = ' Kinesis Stream {0} encryption started successfully. ' . format ( stream_name )
if not success :
return success , True , err_msg , results
else :
err_msg = (
' Kinesis Stream {0} is in the process of starting encryption. ' . format ( stream_name )
)
else :
success = True
changed = False
err_msg = ' Kinesis Stream {0} does not exist ' . format ( stream_name )
return success , changed , err_msg , results
def stop_stream_encryption ( client , stream_name , encryption_type = ' ' , key_id = ' ' ,
wait = True , wait_timeout = 300 , check_mode = False ) :
""" Stop encryption on an Amazon Kinesis Stream.
Args :
client ( botocore . client . EC2 ) : Boto3 client .
stream_name ( str ) : The name of the kinesis stream .
Kwargs :
encryption_type ( str ) : KMS or NONE
key_id ( str ) : KMS key GUID or alias
wait ( bool ) : Wait until Stream is ACTIVE .
default = False
wait_timeout ( int ) : How long to wait until this operation is considered failed .
default = 300
check_mode ( bool ) : This will pass DryRun as one of the parameters to the aws api .
default = False
Basic Usage :
>> > client = boto3 . client ( ' kinesis ' )
>> > stream_name = ' test-stream '
>> > start_stream_encryption ( client , stream_name , encryption_type , key_id )
Returns :
Tuple ( bool , bool , str , dict )
"""
success = False
changed = False
err_msg = ' '
params = {
' StreamName ' : stream_name
}
results = dict ( )
stream_found , stream_msg , current_stream = (
find_stream ( client , stream_name , check_mode = check_mode )
)
if stream_found :
if current_stream . get ( ' EncryptionType ' ) == ' KMS ' :
success , err_msg = (
stream_encryption_action (
client , stream_name , action = ' stop_encryption ' , key_id = key_id , encryption_type = encryption_type , check_mode = check_mode
)
)
elif current_stream . get ( ' EncryptionType ' ) == ' NONE ' :
success = True
if success :
changed = True
if wait :
success , err_msg , results = (
wait_for_status (
client , stream_name , ' ACTIVE ' , wait_timeout ,
check_mode = check_mode
)
)
err_msg = ' Kinesis Stream {0} encryption stopped successfully. ' . format ( stream_name )
if not success :
return success , True , err_msg , results
else :
err_msg = (
' Stream {0} is in the process of stopping encryption. ' . format ( stream_name )
)
else :
success = True
changed = False
err_msg = ' Stream {0} does not exist. ' . format ( stream_name )
return success , changed , err_msg , results
def main ( ) :
argument_spec = ec2_argument_spec ( )
argument_spec . update (
@ -1110,6 +1349,9 @@ def main():
wait = dict ( default = True , required = False , type = ' bool ' ) ,
wait_timeout = dict ( default = 300 , required = False , type = ' int ' ) ,
state = dict ( default = ' present ' , choices = [ ' present ' , ' absent ' ] ) ,
encryption_type = dict ( required = False , choices = [ ' NONE ' , ' KMS ' ] ) ,
key_id = dict ( required = False , type = ' str ' ) ,
encryption_state = dict ( required = False , choices = [ ' enabled ' , ' disabled ' ] ) ,
)
)
module = AnsibleModule (
@ -1124,6 +1366,9 @@ def main():
tags = module . params . get ( ' tags ' )
wait = module . params . get ( ' wait ' )
wait_timeout = module . params . get ( ' wait_timeout ' )
encryption_type = module . params . get ( ' encryption_type ' )
key_id = module . params . get ( ' key_id ' )
encryption_state = module . params . get ( ' encryption_state ' )
if state == ' present ' and not shards :
module . fail_json ( msg = ' Shards is required when state == present. ' )
@ -1159,6 +1404,18 @@ def main():
wait , wait_timeout , check_mode
)
)
if encryption_state == ' enabled ' :
success , changed , err_msg , results = (
start_stream_encryption (
client , stream_name , encryption_type , key_id , wait , wait_timeout , check_mode
)
)
elif encryption_state == ' disabled ' :
success , changed , err_msg , results = (
stop_stream_encryption (
client , stream_name , encryption_type , key_id , wait , wait_timeout , check_mode
)
)
elif state == ' absent ' :
success , changed , err_msg , results = (
delete_stream ( client , stream_name , wait , wait_timeout , check_mode )