Gather mount facts, fallback for when multiproc is not feasable (#83750)

* fallback to 'single threaded gathering' for when multiproc fails

Co-authored-by: Sviatoslav Sydorenko (Святослав Сидоренко) <wk.cvs.github@sydorenko.org.ua>
pull/83825/head
Brian Coca 3 months ago committed by GitHub
parent 520fa688ba
commit 90de03be50
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,2 @@
bugfixes:
- setup module (fact gathering), added fallbcak code path to handle mount fact gathering in linux when threading is not available

@ -21,6 +21,7 @@ import glob
import json
import os
import re
import signal
import sys
import time
@ -38,6 +39,10 @@ from ansible.module_utils.six import iteritems
from ansible.module_utils.facts import timeout
def _timeout_handler(signum, frame):
raise TimeoutError(f"Timeout reached in:{frame}")
def get_partition_uuid(partname):
try:
uuids = os.listdir("/dev/disk/by-uuid")
@ -577,7 +582,12 @@ class LinuxHardware(Hardware):
# start threads to query each mount
results = {}
pool = ThreadPool(processes=min(len(mtab_entries), cpu_count()))
pool = None
try:
pool = ThreadPool(processes=min(len(mtab_entries), cpu_count()))
except (IOError, OSError) as e:
self.module.warn(f"Cannot use multiprocessing, falling back on serial execution: {e}")
maxtime = timeout.GATHER_TIMEOUT or timeout.DEFAULT_GATHER_TIMEOUT
for fields in mtab_entries:
# Transform octal escape sequences
@ -601,47 +611,67 @@ class LinuxHardware(Hardware):
if not self.MTAB_BIND_MOUNT_RE.match(options):
mount_info['options'] += ",bind"
results[mount] = {'info': mount_info,
'extra': pool.apply_async(self.get_mount_info, (mount, device, uuids)),
'timelimit': time.time() + maxtime}
results[mount] = {'info': mount_info, 'timelimit': time.time() + maxtime}
if pool is None:
old_handler = signal.signal(signal.SIGALRM, _timeout_handler)
signal.alarm(maxtime)
try:
size, uuid = self.get_mount_info(mount, device, uuids)
except TimeoutError as e:
results[mount]['info']['note'] = 'Could not get extra information due to timeout'
self.module.log(f"Timeout while gathering mount {mount} data: {e}")
self.module.warn(f"Timeout exceeded when getting mount info for {mount}")
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
if size:
results[mount]['info'].update(size)
results[mount]['info']['uuid'] = uuid or 'N/A'
else:
# use multiproc pool, handle results below
results[mount]['extra'] = pool.apply_async(self.get_mount_info, (mount, device, uuids))
pool.close() # done with new workers, start gc
if pool is None:
# serial processing, just assing results
mounts.append(results[mount]['info'])
else:
pool.close() # done with spawing new workers, start gc
# wait for workers and get results
while results:
for mount in list(results):
done = False
res = results[mount]['extra']
try:
if res.ready():
done = True
if res.successful():
mount_size, uuid = res.get()
if mount_size:
results[mount]['info'].update(mount_size)
results[mount]['info']['uuid'] = uuid or 'N/A'
else:
# failed, try to find out why, if 'res.successful' we know there are no exceptions
results[mount]['info']['note'] = 'Could not get extra information: %s.' % (to_text(res.get()))
elif time.time() > results[mount]['timelimit']:
while results: # wait for workers and get results
for mount in list(results):
done = False
res = results[mount]['extra']
try:
if res.ready():
done = True
if res.successful():
mount_size, uuid = res.get()
if mount_size:
results[mount]['info'].update(mount_size)
results[mount]['info']['uuid'] = uuid or 'N/A'
else:
# failed, try to find out why, if 'res.successful' we know there are no exceptions
results[mount]['info']['note'] = 'Could not get extra information: %s.' % (to_text(res.get()))
elif time.time() > results[mount]['timelimit']:
done = True
self.module.warn("Timeout exceeded when getting mount info for %s" % mount)
results[mount]['info']['note'] = 'Could not get extra information due to timeout'
except Exception as e:
import traceback
done = True
self.module.warn("Timeout exceeded when getting mount info for %s" % mount)
results[mount]['info']['note'] = 'Could not get extra information due to timeout'
except Exception as e:
import traceback
done = True
results[mount]['info'] = 'N/A'
self.module.warn("Error prevented getting extra info for mount %s: [%s] %s." % (mount, type(e), to_text(e)))
self.module.debug(traceback.format_exc())
if done:
# move results outside and make loop only handle pending
mounts.append(results[mount]['info'])
del results[mount]
# avoid cpu churn, sleep between retrying for loop with remaining mounts
time.sleep(0.1)
results[mount]['info'] = 'N/A'
self.module.warn("Error prevented getting extra info for mount %s: [%s] %s." % (mount, type(e), to_text(e)))
self.module.debug(traceback.format_exc())
if done:
# move results outside and make loop only handle pending
mounts.append(results[mount]['info'])
del results[mount]
# avoid cpu churn, sleep between retrying for loop with remaining mounts
time.sleep(0.1)
return {'mounts': mounts}

@ -0,0 +1,7 @@
from __future__ import annotations
class ThreadPool:
def __init__(self, *args, **kwargs):
raise PermissionError("To test single proc ansible")

@ -0,0 +1,21 @@
- hosts: localhost
tasks:
- block:
- set_fact:
normal_devices: "{{ ansible_facts['devices'].keys() }}"
- name: facts already gathered normally, but now we do mounts again w/o multithreading
gather_facts:
gather_subset: mounts
register: no_multi
environment:
PYTHONPATH: "${PWD}/lib"
- set_fact:
single_devices: "{{no_multi['ansible_facts']['ansible_devices'].keys()}}"
- assert:
that:
- normal_devices == single_devices
when:
- ansible_facts['os_family'] not in ['FreeBSD', 'Darwin']

@ -38,4 +38,8 @@ ANSIBLE_FACTS_MODULES='ansible.legacy.slow' ansible -m gather_facts localhost --
# test parallelism
ANSIBLE_FACTS_MODULES='dummy1,dummy2,dummy3' ansible -m gather_facts localhost --playbook-dir ./ -a 'gather_timeout=30 parallel=true' "$@" 2>&1
# test lack of threads
ansible-playbook no_threads.yml "$@" 2>&1
rm "${OUTPUT_DIR}/canary.txt"

Loading…
Cancel
Save