remove deprecated datetime.datetime methods (#81323)

* Remove datetime.datetime.utcfromtimestamp and datetime.datetime.uctnow
  from controller code since they are deprecated in Python 3.12.

* Update target side code to use new utcfromtimestamp and utcnow utils in ansible.module_utils.compat.datetime that return aware datetime objects on Python 2.7 and 3.

Co-authored-by: Matt Clay <matt@mystile.com>
pull/81351/head
Sloane Hertel 10 months ago committed by GitHub
parent 923ede4f72
commit 4c41562270
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,4 @@
bugfixes:
- On Python 3 use datetime methods ``fromtimestamp`` and ``now`` with UTC timezone instead of ``utcfromtimestamp`` and ``utcnow``, which are deprecated in Python 3.12.
minor_changes:
- Add ``utcfromtimestamp`` and ``utcnow`` to ``ansible.module_utils.compat.datetime`` to return fixed offset datetime objects.

@ -1241,7 +1241,7 @@ def _find_module_utils(module_name, b_module_data, module_path, module_args, tas
else:
coverage = ''
now = datetime.datetime.utcnow()
now = datetime.datetime.now(datetime.timezone.utc)
output.write(to_bytes(ACTIVE_ANSIBALLZ_TEMPLATE % dict(
zipdata=zipdata,
ansible_module=module_name,

@ -359,7 +359,8 @@ class GalaxyAPI:
valid = False
if cache_key in server_cache:
expires = datetime.datetime.strptime(server_cache[cache_key]['expires'], iso_datetime_format)
valid = datetime.datetime.utcnow() < expires
expires = expires.replace(tzinfo=datetime.timezone.utc)
valid = datetime.datetime.now(datetime.timezone.utc) < expires
is_paginated_url = 'page' in query or 'offset' in query
if valid and not is_paginated_url:
@ -384,7 +385,7 @@ class GalaxyAPI:
elif not is_paginated_url:
# The cache entry had expired or does not exist, start a new blank entry to be filled later.
expires = datetime.datetime.utcnow()
expires = datetime.datetime.now(datetime.timezone.utc)
expires += datetime.timedelta(days=1)
server_cache[cache_key] = {
'expires': expires.strftime(iso_datetime_format),

@ -184,7 +184,7 @@ class GalaxyRole(object):
info = dict(
version=self.version,
install_date=datetime.datetime.utcnow().strftime("%c"),
install_date=datetime.datetime.now(datetime.timezone.utc).strftime("%c"),
)
if not os.path.exists(os.path.join(self.path, 'meta')):
os.makedirs(os.path.join(self.path, 'meta'))

@ -0,0 +1,40 @@
# Copyright (c) 2023 Ansible
# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause)
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.module_utils.six import PY3
import datetime
if PY3:
UTC = datetime.timezone.utc
else:
_ZERO = datetime.timedelta(0)
class _UTC(datetime.tzinfo):
__slots__ = ()
def utcoffset(self, dt):
return _ZERO
def dst(self, dt):
return _ZERO
def tzname(self, dt):
return "UTC"
UTC = _UTC()
def utcfromtimestamp(timestamp): # type: (float) -> datetime.datetime
"""Construct an aware UTC datetime from a POSIX timestamp."""
return datetime.datetime.fromtimestamp(timestamp, UTC)
def utcnow(): # type: () -> datetime.datetime
"""Construct an aware UTC datetime from time.time()."""
return datetime.datetime.now(UTC)

@ -22,8 +22,8 @@ import datetime
import time
import ansible.module_utils.compat.typing as t
from ansible.module_utils.facts.collector import BaseFactCollector
from ansible.module_utils.compat.datetime import utcfromtimestamp
class DateTimeFactCollector(BaseFactCollector):
@ -37,7 +37,7 @@ class DateTimeFactCollector(BaseFactCollector):
# Store the timestamp once, then get local and UTC versions from that
epoch_ts = time.time()
now = datetime.datetime.fromtimestamp(epoch_ts)
utcnow = datetime.datetime.utcfromtimestamp(epoch_ts)
utcnow = utcfromtimestamp(epoch_ts).replace(tzinfo=None)
date_time_facts['year'] = now.strftime('%Y')
date_time_facts['month'] = now.strftime('%m')

@ -366,7 +366,6 @@ url:
sample: https://www.ansible.com/
'''
import datetime
import os
import re
import shutil
@ -375,6 +374,7 @@ import traceback
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.six.moves.urllib.parse import urlsplit
from ansible.module_utils.compat.datetime import utcnow, utcfromtimestamp
from ansible.module_utils.common.text.converters import to_native
from ansible.module_utils.urls import fetch_url, url_argument_spec
@ -397,10 +397,10 @@ def url_get(module, url, dest, use_proxy, last_mod_time, force, timeout=10, head
Return (tempfile, info about the request)
"""
start = datetime.datetime.utcnow()
start = utcnow()
rsp, info = fetch_url(module, url, use_proxy=use_proxy, force=force, last_mod_time=last_mod_time, timeout=timeout, headers=headers, method=method,
unredirected_headers=unredirected_headers, decompress=decompress, ciphers=ciphers, use_netrc=use_netrc)
elapsed = (datetime.datetime.utcnow() - start).seconds
elapsed = (utcnow() - start).seconds
if info['status'] == 304:
module.exit_json(url=url, dest=dest, changed=False, msg=info.get('msg', ''), status_code=info['status'], elapsed=elapsed)
@ -600,7 +600,7 @@ def main():
# If the file already exists, prepare the last modified time for the
# request.
mtime = os.path.getmtime(dest)
last_mod_time = datetime.datetime.utcfromtimestamp(mtime)
last_mod_time = utcfromtimestamp(mtime)
# If the checksum does not match we have to force the download
# because last_mod_time may be newer than on remote
@ -608,11 +608,11 @@ def main():
force = True
# download to tmpsrc
start = datetime.datetime.utcnow()
start = utcnow()
method = 'HEAD' if module.check_mode else 'GET'
tmpsrc, info = url_get(module, url, dest, use_proxy, last_mod_time, force, timeout, headers, tmp_dest, method,
unredirected_headers=unredirected_headers, decompress=decompress, ciphers=ciphers, use_netrc=use_netrc)
result['elapsed'] = (datetime.datetime.utcnow() - start).seconds
result['elapsed'] = (utcnow() - start).seconds
result['src'] = tmpsrc
# Now the request has completed, we can finally generate the final

@ -440,7 +440,6 @@ url:
sample: https://www.ansible.com/
'''
import datetime
import json
import os
import re
@ -452,6 +451,7 @@ from ansible.module_utils.basic import AnsibleModule, sanitize_keys
from ansible.module_utils.six import PY2, PY3, binary_type, iteritems, string_types
from ansible.module_utils.six.moves.urllib.parse import urlencode, urlsplit
from ansible.module_utils.common.text.converters import to_native, to_text
from ansible.module_utils.compat.datetime import utcnow, utcfromtimestamp
from ansible.module_utils.six.moves.collections_abc import Mapping, Sequence
from ansible.module_utils.urls import fetch_url, get_response_filename, parse_content_type, prepare_multipart, url_argument_spec
@ -580,7 +580,7 @@ def uri(module, url, dest, body, body_format, method, headers, socket_timeout, c
kwargs = {}
if dest is not None and os.path.isfile(dest):
# if destination file already exist, only download if file newer
kwargs['last_mod_time'] = datetime.datetime.utcfromtimestamp(os.path.getmtime(dest))
kwargs['last_mod_time'] = utcfromtimestamp(os.path.getmtime(dest))
resp, info = fetch_url(module, url, data=data, headers=headers,
method=method, timeout=socket_timeout, unix_socket=module.params['unix_socket'],
@ -686,12 +686,12 @@ def main():
module.exit_json(stdout="skipped, since '%s' does not exist" % removes, changed=False)
# Make the request
start = datetime.datetime.utcnow()
start = utcnow()
r, info = uri(module, url, dest, body, body_format, method,
dict_headers, socket_timeout, ca_path, unredirected_headers,
decompress, ciphers, use_netrc)
elapsed = (datetime.datetime.utcnow() - start).seconds
elapsed = (utcnow() - start).seconds
if r and dest is not None and os.path.isdir(dest):
filename = get_response_filename(r) or 'index.html'

@ -239,6 +239,7 @@ import traceback
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.common.sys_info import get_platform_subclass
from ansible.module_utils.common.text.converters import to_bytes
from ansible.module_utils.compat.datetime import utcnow
HAS_PSUTIL = False
@ -532,7 +533,7 @@ def main():
except Exception:
module.fail_json(msg="unknown active_connection_state (%s) defined" % _connection_state, elapsed=0)
start = datetime.datetime.utcnow()
start = utcnow()
if delay:
time.sleep(delay)
@ -543,7 +544,7 @@ def main():
# first wait for the stop condition
end = start + datetime.timedelta(seconds=timeout)
while datetime.datetime.utcnow() < end:
while utcnow() < end:
if path:
try:
if not os.access(b_path, os.F_OK):
@ -560,7 +561,7 @@ def main():
# Conditions not yet met, wait and try again
time.sleep(module.params['sleep'])
else:
elapsed = datetime.datetime.utcnow() - start
elapsed = utcnow() - start
if port:
module.fail_json(msg=msg or "Timeout when waiting for %s:%s to stop." % (host, port), elapsed=elapsed.seconds)
elif path:
@ -569,14 +570,14 @@ def main():
elif state in ['started', 'present']:
# wait for start condition
end = start + datetime.timedelta(seconds=timeout)
while datetime.datetime.utcnow() < end:
while utcnow() < end:
if path:
try:
os.stat(b_path)
except OSError as e:
# If anything except file not present, throw an error
if e.errno != 2:
elapsed = datetime.datetime.utcnow() - start
elapsed = utcnow() - start
module.fail_json(msg=msg or "Failed to stat %s, %s" % (path, e.strerror), elapsed=elapsed.seconds)
# file doesn't exist yet, so continue
else:
@ -598,7 +599,7 @@ def main():
except IOError:
pass
elif port:
alt_connect_timeout = math.ceil(_timedelta_total_seconds(end - datetime.datetime.utcnow()))
alt_connect_timeout = math.ceil(_timedelta_total_seconds(end - utcnow()))
try:
s = socket.create_connection((host, port), min(connect_timeout, alt_connect_timeout))
except Exception:
@ -609,8 +610,8 @@ def main():
if b_compiled_search_re:
b_data = b''
matched = False
while datetime.datetime.utcnow() < end:
max_timeout = math.ceil(_timedelta_total_seconds(end - datetime.datetime.utcnow()))
while utcnow() < end:
max_timeout = math.ceil(_timedelta_total_seconds(end - utcnow()))
readable = select.select([s], [], [], max_timeout)[0]
if not readable:
# No new data. Probably means our timeout
@ -654,7 +655,7 @@ def main():
else: # while-else
# Timeout expired
elapsed = datetime.datetime.utcnow() - start
elapsed = utcnow() - start
if port:
if search_regex:
module.fail_json(msg=msg or "Timeout when waiting for search string %s in %s:%s" % (search_regex, host, port), elapsed=elapsed.seconds)
@ -670,17 +671,17 @@ def main():
# wait until all active connections are gone
end = start + datetime.timedelta(seconds=timeout)
tcpconns = TCPConnectionInfo(module)
while datetime.datetime.utcnow() < end:
while utcnow() < end:
if tcpconns.get_active_connections_count() == 0:
break
# Conditions not yet met, wait and try again
time.sleep(module.params['sleep'])
else:
elapsed = datetime.datetime.utcnow() - start
elapsed = utcnow() - start
module.fail_json(msg=msg or "Timeout when waiting for %s:%s to drain" % (host, port), elapsed=elapsed.seconds)
elapsed = datetime.datetime.utcnow() - start
elapsed = utcnow() - start
module.exit_json(state=state, port=port, search_regex=search_regex, match_groups=match_groups, match_groupdict=match_groupdict, path=path,
elapsed=elapsed.seconds)

@ -8,7 +8,7 @@ __metaclass__ = type
import random
import time
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from ansible.errors import AnsibleError, AnsibleConnectionFailure
from ansible.module_utils.common.text.converters import to_native, to_text
@ -280,14 +280,14 @@ class ActionModule(ActionBase):
display.vvv("{action}: system successfully rebooted".format(action=self._task.action))
def do_until_success_or_timeout(self, action, reboot_timeout, action_desc, distribution, action_kwargs=None):
max_end_time = datetime.utcnow() + timedelta(seconds=reboot_timeout)
max_end_time = datetime.now(timezone.utc) + timedelta(seconds=reboot_timeout)
if action_kwargs is None:
action_kwargs = {}
fail_count = 0
max_fail_sleep = 12
while datetime.utcnow() < max_end_time:
while datetime.now(timezone.utc) < max_end_time:
try:
action(distribution=distribution, **action_kwargs)
if action_desc:
@ -336,7 +336,7 @@ class ActionModule(ActionBase):
display.debug('{action}: AnsibleConnectionFailure caught and handled: {error}'.format(action=self._task.action, error=to_text(e)))
reboot_result['rc'] = 0
result['start'] = datetime.utcnow()
result['start'] = datetime.now(timezone.utc)
if reboot_result['rc'] != 0:
result['failed'] = True
@ -447,7 +447,7 @@ class ActionModule(ActionBase):
if reboot_result['failed']:
result = reboot_result
elapsed = datetime.utcnow() - reboot_result['start']
elapsed = datetime.now(timezone.utc) - reboot_result['start']
result['elapsed'] = elapsed.seconds
return result
@ -459,7 +459,7 @@ class ActionModule(ActionBase):
# Make sure reboot was successful
result = self.validate_reboot(distribution, original_connection_timeout, action_kwargs={'previous_boot_time': previous_boot_time})
elapsed = datetime.utcnow() - reboot_result['start']
elapsed = datetime.now(timezone.utc) - reboot_result['start']
result['elapsed'] = elapsed.seconds
return result

@ -20,7 +20,7 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import time
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from ansible.module_utils.common.text.converters import to_text
from ansible.plugins.action import ActionBase
@ -43,10 +43,10 @@ class ActionModule(ActionBase):
DEFAULT_TIMEOUT = 600
def do_until_success_or_timeout(self, what, timeout, connect_timeout, what_desc, sleep=1):
max_end_time = datetime.utcnow() + timedelta(seconds=timeout)
max_end_time = datetime.now(timezone.utc) + timedelta(seconds=timeout)
e = None
while datetime.utcnow() < max_end_time:
while datetime.now(timezone.utc) < max_end_time:
try:
what(connect_timeout)
if what_desc:

@ -829,7 +829,7 @@ class Templar:
def _now_datetime(self, utc=False, fmt=None):
'''jinja2 global function to return current datetime, potentially formatted via strftime'''
if utc:
now = datetime.datetime.utcnow()
now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
else:
now = datetime.datetime.now()

@ -4,7 +4,7 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from datetime import datetime
from datetime import datetime, timezone
from ansible.module_utils.common.text.converters import to_native
from ansible.plugins.action import ActionBase
@ -64,7 +64,7 @@ class ActionModule(RebootActionModule, ActionBase):
result = {}
reboot_result = self._low_level_execute_command(reboot_command, sudoable=self.DEFAULT_SUDOABLE)
result['start'] = datetime.utcnow()
result['start'] = datetime.now(timezone.utc)
# Test for "A system shutdown has already been scheduled. (1190)" and handle it gracefully
stdout = reboot_result['stdout']

@ -10,28 +10,27 @@ import datetime
import string
import time
from ansible.module_utils.compat.datetime import UTC
from ansible.module_utils.facts.system import date_time
EPOCH_TS = 1594449296.123456
DT = datetime.datetime(2020, 7, 11, 12, 34, 56, 124356)
DT_UTC = datetime.datetime(2020, 7, 11, 2, 34, 56, 124356)
UTC_DT = datetime.datetime(2020, 7, 11, 2, 34, 56, 124356)
@pytest.fixture
def fake_now(monkeypatch):
"""
Patch `datetime.datetime.fromtimestamp()`, `datetime.datetime.utcfromtimestamp()`,
Patch `datetime.datetime.fromtimestamp()`,
and `time.time()` to return deterministic values.
"""
class FakeNow:
@classmethod
def fromtimestamp(cls, timestamp):
return DT
@classmethod
def utcfromtimestamp(cls, timestamp):
return DT_UTC
def fromtimestamp(cls, timestamp, tz=None):
if tz == UTC:
return UTC_DT.replace(tzinfo=tz)
return DT.replace(tzinfo=tz)
def _time():
return EPOCH_TS

Loading…
Cancel
Save