diff --git a/lib/ansible/module_utils/facts/hardware/linux.py b/lib/ansible/module_utils/facts/hardware/linux.py index a0772eff2dc..293c75a2509 100644 --- a/lib/ansible/module_utils/facts/hardware/linux.py +++ b/lib/ansible/module_utils/facts/hardware/linux.py @@ -21,15 +21,12 @@ import glob import json import os import re -import signal import sys import time -from multiprocessing import cpu_count -from multiprocessing.pool import ThreadPool - -from ansible.module_utils.common.text.converters import to_text +from ansible.module_utils._internal._concurrent import _futures from ansible.module_utils.common.locale import get_best_parsable_locale +from ansible.module_utils.common.text.converters import to_text from ansible.module_utils.common.text.formatters import bytes_to_human from ansible.module_utils.facts.hardware.base import Hardware, HardwareCollector from ansible.module_utils.facts.utils import get_file_content, get_file_lines, get_mount_size @@ -39,10 +36,6 @@ from ansible.module_utils.six import iteritems from ansible.module_utils.facts import timeout -def _timeout_handler(signum, frame): - raise TimeoutError(f"Timeout reached in:{frame}") - - def get_partition_uuid(partname): try: uuids = os.listdir("/dev/disk/by-uuid") @@ -582,12 +575,7 @@ class LinuxHardware(Hardware): # start threads to query each mount results = {} - pool = None - try: - pool = ThreadPool(processes=min(len(mtab_entries), cpu_count())) - except (IOError, OSError) as e: - self.module.warn(f"Cannot use multiprocessing, falling back on serial execution: {e}") - + executor = _futures.DaemonThreadPoolExecutor() maxtime = timeout.GATHER_TIMEOUT or timeout.DEFAULT_GATHER_TIMEOUT for fields in mtab_entries: # Transform octal escape sequences @@ -611,67 +599,46 @@ class LinuxHardware(Hardware): if not self.MTAB_BIND_MOUNT_RE.match(options): mount_info['options'] += ",bind" - results[mount] = {'info': mount_info, 'timelimit': time.time() + maxtime} - if pool is None: - old_handler = signal.signal(signal.SIGALRM, _timeout_handler) - signal.alarm(maxtime) - try: - size, uuid = self.get_mount_info(mount, device, uuids) - except TimeoutError as e: - results[mount]['info']['note'] = 'Could not get extra information due to timeout' - self.module.log(f"Timeout while gathering mount {mount} data: {e}") - self.module.warn(f"Timeout exceeded when getting mount info for {mount}") - finally: - signal.alarm(0) - signal.signal(signal.SIGALRM, old_handler) - - if size: - results[mount]['info'].update(size) - results[mount]['info']['uuid'] = uuid or 'N/A' - else: - # use multiproc pool, handle results below - results[mount]['extra'] = pool.apply_async(self.get_mount_info, (mount, device, uuids)) + results[mount] = {'info': mount_info, 'timelimit': time.monotonic() + maxtime} + results[mount]['extra'] = executor.submit(self.get_mount_info, mount, device, uuids) - if pool is None: - # serial processing, just assing results - mounts.append(results[mount]['info']) - else: - pool.close() # done with spawing new workers, start gc + # done with spawning new workers, start gc + executor.shutdown() - while results: # wait for workers and get results - for mount in list(results): - done = False - res = results[mount]['extra'] - try: - if res.ready(): - done = True - if res.successful(): - mount_size, uuid = res.get() - if mount_size: - results[mount]['info'].update(mount_size) - results[mount]['info']['uuid'] = uuid or 'N/A' - else: - # failed, try to find out why, if 'res.successful' we know there are no exceptions - results[mount]['info']['note'] = 'Could not get extra information: %s.' % (to_text(res.get())) - - elif time.time() > results[mount]['timelimit']: - done = True - self.module.warn("Timeout exceeded when getting mount info for %s" % mount) - results[mount]['info']['note'] = 'Could not get extra information due to timeout' - except Exception as e: - import traceback + while results: # wait for workers and get results + for mount in list(results): + done = False + res = results[mount]['extra'] + try: + if res.done(): done = True - results[mount]['info'] = 'N/A' - self.module.warn("Error prevented getting extra info for mount %s: [%s] %s." % (mount, type(e), to_text(e))) - self.module.debug(traceback.format_exc()) - - if done: - # move results outside and make loop only handle pending - mounts.append(results[mount]['info']) - del results[mount] - - # avoid cpu churn, sleep between retrying for loop with remaining mounts - time.sleep(0.1) + if res.exception() is None: + mount_size, uuid = res.result() + if mount_size: + results[mount]['info'].update(mount_size) + results[mount]['info']['uuid'] = uuid or 'N/A' + else: + # failed, try to find out why, if 'res.successful' we know there are no exceptions + results[mount]['info']['note'] = f'Could not get extra information: {res.exception()}' + + elif time.monotonic() > results[mount]['timelimit']: + done = True + self.module.warn("Timeout exceeded when getting mount info for %s" % mount) + results[mount]['info']['note'] = 'Could not get extra information due to timeout' + except Exception as e: + import traceback + done = True + results[mount]['info'] = 'N/A' + self.module.warn("Error prevented getting extra info for mount %s: [%s] %s." % (mount, type(e), to_text(e))) + self.module.debug(traceback.format_exc()) + + if done: + # move results outside and make loop only handle pending + mounts.append(results[mount]['info']) + del results[mount] + + # avoid cpu churn, sleep between retrying for loop with remaining mounts + time.sleep(0.1) return {'mounts': mounts} diff --git a/test/integration/targets/gathering_facts/lib/multriprocessing/__init__.py b/test/integration/targets/gathering_facts/lib/multriprocessing/__init__.py deleted file mode 100644 index 9d48db4f9f8..00000000000 --- a/test/integration/targets/gathering_facts/lib/multriprocessing/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from __future__ import annotations diff --git a/test/integration/targets/gathering_facts/lib/multriprocessing/pool/__init__.py b/test/integration/targets/gathering_facts/lib/multriprocessing/pool/__init__.py deleted file mode 100644 index 9c5a5d26a66..00000000000 --- a/test/integration/targets/gathering_facts/lib/multriprocessing/pool/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -from __future__ import annotations - - -class ThreadPool: - - def __init__(self, *args, **kwargs): - raise PermissionError("To test single proc ansible") diff --git a/test/integration/targets/gathering_facts/no_threads.yml b/test/integration/targets/gathering_facts/no_threads.yml deleted file mode 100644 index f8e21cd8078..00000000000 --- a/test/integration/targets/gathering_facts/no_threads.yml +++ /dev/null @@ -1,21 +0,0 @@ -- hosts: localhost - tasks: - - block: - - set_fact: - normal_devices: "{{ ansible_facts['devices'].keys() }}" - - - name: facts already gathered normally, but now we do mounts again w/o multithreading - gather_facts: - gather_subset: mounts - register: no_multi - environment: - PYTHONPATH: "${PWD}/lib" - - - set_fact: - single_devices: "{{no_multi['ansible_facts']['ansible_devices'].keys()}}" - - - assert: - that: - - normal_devices == single_devices - when: - - ansible_facts['os_family'] not in ['FreeBSD', 'Darwin'] diff --git a/test/integration/targets/gathering_facts/runme.sh b/test/integration/targets/gathering_facts/runme.sh index 39824a4a525..ace83aa2efa 100755 --- a/test/integration/targets/gathering_facts/runme.sh +++ b/test/integration/targets/gathering_facts/runme.sh @@ -39,7 +39,4 @@ ANSIBLE_FACTS_MODULES='ansible.legacy.slow' ansible -m gather_facts localhost -- # test parallelism ANSIBLE_FACTS_MODULES='dummy1,dummy2,dummy3' ansible -m gather_facts localhost --playbook-dir ./ -a 'gather_timeout=30 parallel=true' "$@" 2>&1 -# test lack of threads -ansible-playbook no_threads.yml "$@" 2>&1 - rm "${OUTPUT_DIR}/canary.txt"