T woerner max concurrent (#60702)

* play, block, task: New attribute forks

This makes it possible to limit the number of concurrent task runs. forks
can now be set on plays, blocks and tasks. If forks is set at different
levels of the chain, the smallest value is used for the task.

The attribute has been added to the Base class as a list so that all the
values set at the different levels of the chain are easily available.

A warning has been added for the conflict with run_once; forks is ignored
in that case.

The forks limitation in StrategyBase._queue_task is not used for the free
strategy.
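The "smallest value in the chain wins" rule described above can be sketched in isolation. This is a minimal illustration only: `effective_limit` and its arguments are invented names, not the actual Ansible internals.

```python
# Minimal sketch of the "smallest value in the chain wins" rule described
# above. All names here are hypothetical, not the real Ansible internals.

def effective_limit(chain_values, default_forks):
    """Resolve the concurrency limit for a task.

    chain_values -- limits set at play, block and task level, with 0 (or
                    None) meaning "not set at this level"
    default_forks -- the global forks setting
    """
    set_values = [v for v in chain_values if v]
    if not set_values:
        return default_forks
    # the smallest value set anywhere in the chain wins, and it can
    # never exceed the global forks limit
    return min(min(set_values), default_forks)

print(effective_limit([0, 6, 3], default_forks=10))  # -> 3
print(effective_limit([15], default_forks=10))       # -> 10 (capped by forks)
print(effective_limit([], default_forks=10))         # -> 10 (nothing set)
```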

Signed-off-by: Thomas Woerner <twoerner@redhat.com>

* Handle forks in free strategy

The forks attribute for the free strategy is handled in run in the free
StrategyModule. This is done by counting the running tasks whose uuid
matches that of the task that would be queued next. If this count is
greater than or equal to the forks attribute from the chain (task, block,
play), the strategy skips to the next host, as is also done for
blocked_hosts.
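The counting described above can be sketched with simplified stand-ins for Ansible's worker and task objects. `FakeTask`, `FakeWorker` and `should_skip_host` are invented for illustration; only the attribute names (`is_alive()`, `_task._uuid`) mirror the real code.

```python
# Simplified stand-ins for Ansible's worker processes and task objects.
class FakeTask:
    def __init__(self, uuid):
        self._uuid = uuid

class FakeWorker:
    def __init__(self, task, alive=True):
        self._task = task
        self._alive = alive

    def is_alive(self):
        return self._alive

def should_skip_host(workers, task, throttle):
    """Return True if the host should be skipped because `throttle`
    instances of this task are already running."""
    if throttle <= 0:
        return False
    # count running workers executing the very same task (same uuid)
    same_tasks = sum(
        1 for w in workers
        if w and w.is_alive() and w._task._uuid == task._uuid
    )
    return same_tasks >= throttle

task = FakeTask("abc")
workers = [FakeWorker(task), FakeWorker(task), FakeWorker(FakeTask("xyz"))]
print(should_skip_host(workers, task, throttle=2))  # -> True (2 already running)
print(should_skip_host(workers, task, throttle=3))  # -> False (only 2 of 3)
```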

Signed-off-by: Thomas Woerner <twoerner@redhat.com>

* Test cases for forks with linear and free strategy

With ansible_python_interpreter defined in the inventory file using
ansible_playbook_python.

Signed-off-by: Thomas Woerner <twoerner@redhat.com>

* Changing forks keyword to throttle and adding some more docs
James Cammarata 5 years ago committed by GitHub
parent 7d1a981b61
commit 51b33b79c0

@@ -0,0 +1,2 @@
+minor_changes:
+  - Added new `throttle` keyword, which can be used at the task, block, or play level to limit the number of workers (up to the specified forks or serial setting) allowed.

@@ -68,6 +68,7 @@ serial: |
 strategy: Allows you to choose the strategy plugin to use for the play.
 tags: Tags applied to the task or included tasks, this allows selecting subsets of tasks from the command line.
 tasks: Main list of tasks to execute in the play, they run after :term:`roles` and before :term:`post_tasks`.
+throttle: Limit number of concurrent task runs on task, block and playbook level. This is independent of the forks and serial settings, but cannot be set higher than those limits. For example, if forks is set to 10 and the throttle is set to 15, at most 10 hosts will be operated on in parallel.
 until: "This keyword implies a ':term:`retries` loop' that will go on until the condition supplied here is met or we hit the :term:`retries` limit."
 vars: Dictionary/map of variables
 vars_files: List of files that contain vars to include in the play.

@@ -38,6 +38,12 @@ Using keywords to control execution
 -----------------------------------
 
 Several play-level :ref:`keywords<playbook_keywords>` also affect play execution. The most common one is ``serial``, which sets a number, a percentage, or a list of numbers of hosts you want to manage at a time. Setting ``serial`` with any strategy directs Ansible to 'batch' the hosts, completing the play on the specified number or percentage of hosts before starting the next 'batch'. This is especially useful for :ref:`rolling updates<rolling_update_batch_size>`.
 
+The second keyword to affect execution is ``throttle``, which can also be used at the block and task level. This keyword limits the number of workers up to the maximum set via the forks setting or ``serial``. This can be useful in restricting tasks that may be CPU-intensive or interact with a rate-limiting API::
+
+    tasks:
+    - command: /path/to/cpu_intensive_command
+      throttle: 1
+
 Other keywords that affect play execution include ``ignore_errors``, ``ignore_unreachable``, and ``any_errors_fatal``. Please note that these keywords are not strategies. They are play-level directives or options.
 
 .. seealso::
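Conceptually, ``throttle: 1`` behaves like a semaphore of size one around the task: however many forks are available, only one host runs the task at a time. A standalone Python sketch of that idea (illustrative only, not Ansible code):

```python
import threading

# `throttle: 1` acts like a semaphore of size one around the task body:
# forks may allow 10 workers, but only one enters the task at a time.
throttle = threading.Semaphore(1)
lock = threading.Lock()
current = 0
max_seen = 0

def run_task():
    global current, max_seen
    with throttle:                      # at most one host inside at once
        with lock:
            current += 1
            max_seen = max(max_seen, current)
        # the cpu_intensive_command from the docs example would run here
        with lock:
            current -= 1

threads = [threading.Thread(target=run_task) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("max concurrent:", max_seen)  # -> max concurrent: 1
```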

@@ -613,6 +613,7 @@ class Base(FieldAttributeBase):
     _check_mode = FieldAttribute(isa='bool', default=context.cliargs_deferred_get('check'))
     _diff = FieldAttribute(isa='bool', default=context.cliargs_deferred_get('diff'))
     _any_errors_fatal = FieldAttribute(isa='bool', default=C.ANY_ERRORS_FATAL)
+    _throttle = FieldAttribute(isa='int', default=0)
 
     # explicitly invoke a debugger on tasks
     _debugger = FieldAttribute(isa='string')

@@ -155,6 +155,11 @@ class StrategyBase:
     code useful to all strategies like running handlers, cleanup actions, etc.
     '''
 
+    # by default, strategies should support throttling but we allow individual
+    # strategies to disable this and either forego supporting it or manage
+    # the throttling internally (as `free` does)
+    ALLOW_BASE_THROTTLING = True
+
     def __init__(self, tqm):
         self._tqm = tqm
         self._inventory = tqm.get_inventory()
@@ -310,6 +315,14 @@
             display.debug('Creating lock for %s' % task.action)
             action_write_locks.action_write_locks[task.action] = Lock()
 
+        # create a templar and template things we need later for the queuing process
+        templar = Templar(loader=self._loader, variables=task_vars)
+        try:
+            throttle = int(templar.template(task.throttle))
+        except Exception as e:
+            raise AnsibleError("Failed to convert the throttle value to an integer.", obj=task._ds, orig_exc=e)
+
         # and then queue the new task
         try:
             queued = False
@@ -330,9 +343,25 @@
                 worker_prc.start()
                 display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers)))
                 queued = True
 
             self._cur_worker += 1
-            if self._cur_worker >= len(self._workers):
+
+            # Determine the "rewind point" of the worker list. This means we start
+            # iterating over the list of workers until the end of the list is found.
+            # Normally, that is simply the length of the workers list (as determined
+            # by the forks or serial setting), however a task/block/play may "throttle"
+            # that limit down.
+            rewind_point = len(self._workers)
+            if throttle > 0 and self.ALLOW_BASE_THROTTLING:
+                if task.run_once:
+                    display.debug("Ignoring 'throttle' as 'run_once' is also set for '%s'" % task.get_name())
+                else:
+                    if throttle <= rewind_point:
+                        display.debug("task: %s, throttle: %d" % (task.get_name(), throttle))
+                        rewind_point = throttle
+
+            if self._cur_worker >= rewind_point:
                 self._cur_worker = 0
+
             if queued:
                 break
             elif self._cur_worker == starting_worker:
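The rewind-point behaviour can be exercised in isolation. This is a simplified sketch: `next_worker` is an invented helper that mirrors the increment-and-rewind step, not part of the strategy code.

```python
def next_worker(cur_worker, num_workers, throttle, run_once=False):
    """Advance the worker index, rewinding at the throttle limit.

    The rewind point is normally the number of workers (forks/serial),
    but a positive throttle (ignored when run_once is set) lowers it.
    """
    rewind_point = num_workers
    if throttle > 0 and not run_once:
        if throttle <= rewind_point:
            rewind_point = throttle
    cur_worker += 1
    if cur_worker >= rewind_point:
        cur_worker = 0
    return cur_worker

# with 10 workers but throttle=3, only worker slots 0-2 are ever used
seen = set()
w = 0
for _ in range(20):
    seen.add(w)
    w = next_worker(w, num_workers=10, throttle=3)
print(sorted(seen))  # -> [0, 1, 2]
```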

@@ -47,6 +47,9 @@ display = Display()
 
 class StrategyModule(StrategyBase):
 
+    # This strategy manages throttling on its own, so we don't want it done in queue_task
+    ALLOW_BASE_THROTTLING = False
+
     def _filter_notified_hosts(self, notified_hosts):
         '''
         Filter notified hosts accordingly to strategy
@@ -118,7 +121,31 @@
                 display.debug("this host has work to do", host=host_name)
 
                 # check to see if this host is blocked (still executing a previous task)
-                if host_name not in self._blocked_hosts or not self._blocked_hosts[host_name]:
+                if (host_name not in self._blocked_hosts or not self._blocked_hosts[host_name]):
+                    display.debug("getting variables", host=host_name)
+                    task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=task,
+                                                                _hosts=self._hosts_cache,
+                                                                _hosts_all=self._hosts_cache_all)
+                    self.add_tqm_variables(task_vars, play=iterator._play)
+                    templar = Templar(loader=self._loader, variables=task_vars)
+                    display.debug("done getting variables", host=host_name)
+
+                    try:
+                        throttle = int(templar.template(task.throttle))
+                    except Exception as e:
+                        raise AnsibleError("Failed to convert the throttle value to an integer.", obj=task._ds, orig_exc=e)
+
+                    if throttle > 0:
+                        same_tasks = 0
+                        for worker in self._workers:
+                            if worker and worker.is_alive() and worker._task._uuid == task._uuid:
+                                same_tasks += 1
+
+                        display.debug("task: %s, same_tasks: %d" % (task.get_name(), same_tasks))
+                        if same_tasks >= throttle:
+                            break
+
                     # pop the task, mark the host blocked, and queue it
                     self._blocked_hosts[host_name] = True
                     (state, task) = iterator.get_next_task_for_host(host)
@@ -130,14 +157,6 @@
                     # corresponding action plugin
                     action = None
 
-                    display.debug("getting variables", host=host_name)
-                    task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=task,
-                                                                _hosts=self._hosts_cache,
-                                                                _hosts_all=self._hosts_cache_all)
-                    self.add_tqm_variables(task_vars, play=iterator._play)
-                    templar = Templar(loader=self._loader, variables=task_vars)
-                    display.debug("done getting variables", host=host_name)
-
                     try:
                         task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty')
                         display.debug("done templating", host=host_name)

@@ -0,0 +1,6 @@
[localhosts]
testhost[00:11]

[localhosts:vars]
ansible_connection=local
ansible_python_interpreter="{{ ansible_playbook_python }}"

@@ -0,0 +1,7 @@
#!/usr/bin/env bash

set -eux

# https://github.com/ansible/ansible/pull/42528
ANSIBLE_STRATEGY='linear' ansible-playbook test_throttle.yml -vv -i inventory --forks 12 "$@"
ANSIBLE_STRATEGY='free' ansible-playbook test_throttle.yml -vv -i inventory --forks 12 "$@"

@@ -0,0 +1,33 @@
#!/usr/bin/env python
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import os
import sys
import time

# read the args from sys.argv
throttledir, inventory_hostname, max_throttle = sys.argv[1:]

# format/create additional vars
max_throttle = int(max_throttle)
throttledir = os.path.expanduser(throttledir)
throttlefile = os.path.join(throttledir, inventory_hostname)

try:
    # create the file
    with open(throttlefile, 'a'):
        os.utime(throttlefile, None)

    # count the number of files in the dir
    throttlelist = os.listdir(throttledir)
    print("tasks: %d/%d" % (len(throttlelist), max_throttle))

    # if we have too many files, fail
    if len(throttlelist) > max_throttle:
        print(throttlelist)
        raise ValueError("Too many concurrent tasks: %d/%d" % (len(throttlelist), max_throttle))
finally:
    # remove the file, then wait to make sure it's gone
    os.unlink(throttlefile)
    while True:
        if not os.path.exists(throttlefile):
            break
        time.sleep(0.1)

@@ -0,0 +1,59 @@
---
- hosts: localhosts
  gather_facts: false
  vars:
    throttledir: ~/ansible_testing/throttle.dir/
  tasks:
    - name: Clean throttledir '{{ throttledir }}'
      file:
        state: absent
        path: '{{ throttledir }}'
      ignore_errors: yes
      run_once: yes

    - name: Create throttledir '{{ throttledir }}'
      file:
        state: directory
        path: '{{ throttledir }}'
      run_once: yes

    - block:
        - name: "Test 1 (max throttle: 3)"
          script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 3"
          throttle: 3
        - block:
            - name: "Test 2 (max throttle: 5)"
              script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 5"
              throttle: 5
        - block:
            - name: "Test 3 (max throttle: 8)"
              script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 8"
              throttle: 8
          throttle: 6
        - block:
            - block:
                - name: "Test 4 (max throttle: 8)"
                  script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 8"
                  throttle: 8
              throttle: 6
          throttle: 12
      throttle: 15

    - block:
        - name: "Test 1 (max throttle: 3)"
          script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 3"
          throttle: 3
        - block:
            - name: "Test 2 (max throttle: 5)"
              script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 5"
              throttle: 5
        - block:
            - name: "Test 3 (max throttle: 6)"
              script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 6"
              throttle: 6
          throttle: 3
        - block:
            - block:
                - name: "Test 4 (max throttle: 8)"
                  script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 8"
                  throttle: 8
              throttle: 6
          throttle: 4
      throttle: 2

@@ -194,7 +194,7 @@ class TestStrategyBase(unittest.TestCase):
                 variable_manager=mock_var_manager,
                 loader=fake_loader,
                 passwords=None,
-                forks=5,
+                forks=3,
             )
             tqm._initialize_processes(3)
             tqm.hostvars = dict()
