|
|
|
@ -368,15 +368,21 @@ def wait_for_status(client, stream_name, status, wait_timeout=300,
|
|
|
|
|
find_success, find_msg, stream = (
|
|
|
|
|
find_stream(client, stream_name, check_mode=check_mode)
|
|
|
|
|
)
|
|
|
|
|
if status != 'DELETING':
|
|
|
|
|
if check_mode:
|
|
|
|
|
status_achieved = True
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
elif status != 'DELETING':
|
|
|
|
|
if find_success and stream:
|
|
|
|
|
if stream.get('StreamStatus') == status:
|
|
|
|
|
status_achieved = True
|
|
|
|
|
break
|
|
|
|
|
elif status == 'DELETING':
|
|
|
|
|
|
|
|
|
|
elif status == 'DELETING' and not check_mode:
|
|
|
|
|
if not find_success:
|
|
|
|
|
status_achieved = True
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
time.sleep(polling_increment_secs)
|
|
|
|
|
except botocore.exceptions.ClientError as e:
|
|
|
|
@ -494,7 +500,9 @@ def update_tags(client, stream_name, tags, check_mode=False):
|
|
|
|
|
"""
|
|
|
|
|
success = False
|
|
|
|
|
err_msg = ''
|
|
|
|
|
tag_success, tag_msg, current_tags = get_tags(client, stream_name)
|
|
|
|
|
tag_success, tag_msg, current_tags = (
|
|
|
|
|
get_tags(client, stream_name, check_mode=check_mode)
|
|
|
|
|
)
|
|
|
|
|
if current_tags:
|
|
|
|
|
tags = make_tags_in_aws_format(tags)
|
|
|
|
|
current_tags_set = (
|
|
|
|
@ -765,7 +773,9 @@ def update(client, current_stream, stream_name, retention_period=None,
|
|
|
|
|
return success, changed, err_msg
|
|
|
|
|
|
|
|
|
|
if tags:
|
|
|
|
|
changed, err_msg = update_tags(client, stream_name, tags, check_mode)
|
|
|
|
|
changed, err_msg = (
|
|
|
|
|
update_tags(client, stream_name, tags, check_mode=check_mode)
|
|
|
|
|
)
|
|
|
|
|
if changed:
|
|
|
|
|
success = True
|
|
|
|
|
if wait:
|
|
|
|
@ -783,7 +793,7 @@ def update(client, current_stream, stream_name, retention_period=None,
|
|
|
|
|
return success, changed, err_msg
|
|
|
|
|
|
|
|
|
|
def create_stream(client, stream_name, number_of_shards=1, retention_period=None,
|
|
|
|
|
tags=None, wait=False, wait_timeout=300, check_mode=None):
|
|
|
|
|
tags=None, wait=False, wait_timeout=300, check_mode=False):
|
|
|
|
|
"""Create an Amazon Kinesis Stream.
|
|
|
|
|
Args:
|
|
|
|
|
client (botocore.client.EC2): Boto3 client.
|
|
|
|
@ -824,13 +834,14 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None
|
|
|
|
|
if stream_found and current_stream['StreamStatus'] == 'DELETING' and wait:
|
|
|
|
|
wait_success, wait_msg, current_stream = (
|
|
|
|
|
wait_for_status(
|
|
|
|
|
client, stream_name, 'ACTIVE', wait_timeout, check_mode
|
|
|
|
|
client, stream_name, 'ACTIVE', wait_timeout,
|
|
|
|
|
check_mode=check_mode
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
if stream_found and current_stream['StreamStatus'] != 'DELETING':
|
|
|
|
|
success, changed, err_msg = update(
|
|
|
|
|
client, current_stream, stream_name, retention_period, tags,
|
|
|
|
|
wait, wait_timeout, check_mode
|
|
|
|
|
wait, wait_timeout, check_mode=check_mode
|
|
|
|
|
)
|
|
|
|
|
else:
|
|
|
|
|
create_success, create_msg = (
|
|
|
|
@ -844,7 +855,8 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None
|
|
|
|
|
if wait:
|
|
|
|
|
wait_success, wait_msg, results = (
|
|
|
|
|
wait_for_status(
|
|
|
|
|
client, stream_name, 'ACTIVE', wait_timeout, check_mode
|
|
|
|
|
client, stream_name, 'ACTIVE', wait_timeout,
|
|
|
|
|
check_mode=check_mode
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
err_msg = (
|
|
|
|
@ -938,7 +950,9 @@ def delete_stream(client, stream_name, wait=False, wait_timeout=300,
|
|
|
|
|
changed = False
|
|
|
|
|
err_msg = ''
|
|
|
|
|
results = dict()
|
|
|
|
|
stream_found, stream_msg, current_stream = find_stream(client, stream_name)
|
|
|
|
|
stream_found, stream_msg, current_stream = (
|
|
|
|
|
find_stream(client, stream_name, check_mode=check_mode)
|
|
|
|
|
)
|
|
|
|
|
if stream_found:
|
|
|
|
|
success, err_msg = (
|
|
|
|
|
stream_action(
|
|
|
|
@ -951,7 +965,7 @@ def delete_stream(client, stream_name, wait=False, wait_timeout=300,
|
|
|
|
|
success, err_msg, results = (
|
|
|
|
|
wait_for_status(
|
|
|
|
|
client, stream_name, 'DELETING', wait_timeout,
|
|
|
|
|
check_mode
|
|
|
|
|
check_mode=check_mode
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
err_msg = 'Stream {0} deleted successfully'.format(stream_name)
|
|
|
|
|