@ -162,6 +162,7 @@ import re
import datetime
import time
from functools import reduce
from ansible . module_utils . _text import to_native
def convert_to_lower ( data ) :
@ -294,7 +295,7 @@ def get_tags(client, stream_name, check_mode=False):
]
success = True
except botocore . exceptions . ClientError as e :
err_msg = str ( e )
err_msg = to_native ( e )
return success , err_msg , results
@ -344,7 +345,7 @@ def find_stream(client, stream_name, check_mode=False):
}
success = True
except botocore . exceptions . ClientError as e :
err_msg = str ( e )
err_msg = to_native ( e )
return success , err_msg , results
@ -400,7 +401,7 @@ def wait_for_status(client, stream_name, status, wait_timeout=300,
else :
time . sleep ( polling_increment_secs )
except botocore . exceptions . ClientError as e :
err_msg = str ( e )
err_msg = to_native ( e )
if not status_achieved :
err_msg = " Wait time out reached, while waiting for results "
@ -459,7 +460,7 @@ def tags_action(client, stream_name, tags, action='create', check_mode=False):
err_msg = ' Invalid action {0} ' . format ( action )
except botocore . exceptions . ClientError as e :
err_msg = str ( e )
err_msg = to_native ( e )
return success , err_msg
@ -626,7 +627,7 @@ def stream_action(client, stream_name, shard_count=1, action='create',
err_msg = ' Invalid action {0} ' . format ( action )
except botocore . exceptions . ClientError as e :
err_msg = str ( e )
err_msg = to_native ( e )
return success , err_msg
@ -690,7 +691,7 @@ def retention_action(client, stream_name, retention_period=24,
err_msg = ' Invalid action {0} ' . format ( action )
except botocore . exceptions . ClientError as e :
err_msg = str ( e )
err_msg = to_native ( e )
return success , err_msg
@ -744,7 +745,7 @@ def update(client, current_stream, stream_name, retention_period=None,
if not wait_success :
return wait_success , False , wait_msg
if current_stream [ ' StreamStatus ' ] == ' ACTIVE ' :
if current_stream . get ( ' StreamStatus ' ) == ' ACTIVE ' :
retention_changed = False
if retention_period > current_stream [ ' RetentionPeriodHours ' ] :
retention_changed , retention_msg = (
@ -800,7 +801,7 @@ def update(client, current_stream, stream_name, retention_period=None,
else :
err_msg = (
' StreamStatus has to be ACTIVE in order to modify the retention period. Current status is {0} '
. format ( current_stream [ ' StreamStatus ' ] )
. format ( current_stream . get ( ' StreamStatus ' , ' UNKNOWN ' ) )
)
return success , changed , err_msg
@ -818,7 +819,7 @@ def update(client, current_stream, stream_name, retention_period=None,
if success and changed :
err_msg = ' Kinesis Stream {0} updated successfully. ' . format ( stream_name )
elif success and not changed :
err_msg = ' Kinesis Stream {0} did not change d .' . format ( stream_name )
err_msg = ' Kinesis Stream {0} did not change .' . format ( stream_name )
return success , changed , err_msg
@ -862,19 +863,20 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None
stream_found , stream_msg , current_stream = (
find_stream ( client , stream_name , check_mode = check_mode )
)
if stream_found and not check_mode :
if current_stream [ ' ShardsCount ' ] != number_of_shards :
err_msg = ' Can not change the number of shards in a Kinesis Stream '
return success , changed , err_msg , results
if stream_found and current_stream [ ' StreamStatus ' ] == ' DELETING ' and wait :
if stream_found and current_stream . get ( ' StreamStatus ' ) == ' DELETING ' and wait :
wait_success , wait_msg , current_stream = (
wait_for_status (
client , stream_name , ' ACTIVE ' , wait_timeout ,
check_mode = check_mode
)
)
if stream_found and current_stream [ ' StreamStatus ' ] != ' DELETING ' :
if stream_found and not check_mode :
if current_stream [ ' ShardsCount ' ] != number_of_shards :
err_msg = ' Can not change the number of shards in a Kinesis Stream '
return success , changed , err_msg , results
if stream_found and current_stream . get ( ' StreamStatus ' ) != ' DELETING ' :
success , changed , err_msg = update (
client , current_stream , stream_name , retention_period , tags ,
wait , wait_timeout , check_mode = check_mode
@ -886,7 +888,11 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None
check_mode = check_mode
)
)
if create_success :
if not create_success :
changed = True
err_msg = ' Failed to create Kinesis stream: {0} ' . format ( create_msg )
return False , True , err_msg , { }
else :
changed = True
if wait :
wait_success , wait_msg , results = (
@ -922,7 +928,7 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None
stream_found , stream_msg , current_stream = (
find_stream ( client , stream_name , check_mode = check_mode )
)
if retention_period and current_stream [ ' StreamStatus ' ] == ' ACTIVE ' :
if retention_period and current_stream . get ( ' StreamStatus ' ) == ' ACTIVE ' :
changed , err_msg = (
retention_action (
client , stream_name , retention_period , action = ' increase ' ,
@ -936,7 +942,7 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None
else :
err_msg = (
' StreamStatus has to be ACTIVE in order to modify the retention period. Current status is {0} '
. format ( current_stream [ ' StreamStatus ' ] )
. format ( current_stream . get ( ' StreamStatus ' , ' UNKNOWN ' ) )
)
success = create_success
changed = True
@ -1069,7 +1075,7 @@ def main():
)
)
except botocore . exceptions . ClientError as e :
err_msg = ' Boto3 Client Error - {0} ' . format ( str ( e . msg ) )
err_msg = ' Boto3 Client Error - {0} ' . format ( to_native ( e . msg ) )
module . fail_json (
success = False , changed = False , result = { } , msg = err_msg
)