Revert "Revert "Add the fork manager code (related to previous patch).""

This reverts commit c53538dc77.
pull/5911/head
Michael DeHaan 11 years ago
parent 6685b4989e
commit 60d3611b70

@ -0,0 +1,134 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
import multiprocessing.pool as mpool
import multiprocessing
import sys
import os
POOL = None
OLD_SIZE = 0
class MyPool(mpool.Pool):
'''
What is Foon?
A Foon is another name for a Spork, which is a Fork plus a Spoon
this class is a wrapper around multiprocessing in Python
which deals with parallelism via Forks.
there is no Spoon.
Two things we do differently over stock multiprocessing:
* intercept exceptions
* duplicate stdin per host to enable the process to ask questions about host key checking
That's it. This class is specific to Ansible's runner forking and is not meant to be generic.
'''
def __init__(self, *args, **kwargs):
super(MyPool, self).__init__(*args, **kwargs)
# overriding map_async to catch exceptions and be extensible
def map_async(self, func, iterable, chunksize=None, callback=None):
'''
Asynchronous equivalent of `map()` builtin
'''
mapstar = mpool.mapstar
if not hasattr(iterable, '__len__'):
iterable = list(iterable)
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
chunksize += 1
task_batches = MyPool._get_tasks(func, iterable, chunksize)
new_batches = []
stdins = []
for x in task_batches:
# make sure each batch has a different stdin
new_stdin = os.fdopen(os.dup(sys.stdin.fileno()))
(function, data_list) = x
new_data_list = []
for host_name in data_list:
new_data_list.append((host_name, new_stdin))
stdins.append(new_stdin)
new_batches.append((function, new_data_list))
result = mpool.MapResult(self._cache, chunksize, len(iterable), callback)
#for i, x in enumerate(task_batches):
# print "%s => %s" % (i,x)
self._taskqueue.put(
(
(
(result._job, i, mapstar, (x,), {}) for i, x in enumerate(new_batches) # task_batches)
), None)
)
for x in stdins:
x.close()
return result
class Foon(object):
def __init__(self):
self.set_size(0)
def make_pool(self, processes=None, initializer=None, initargs=()):
'''
Returns a process pool object
'''
return MyPool(processes, initializer, initargs)
def set_size(self, size):
global OLD_SIZE
global POOL
if size > OLD_SIZE or POOL is None:
OLD_SIZE = size
POOL = self.make_pool()
def map(self, function, data_list):
global POOL
try:
return POOL.map(function, data_list)
except KeyboardInterrupt:
print "KEYBOARD INTERRUPT!"
sys.exit(1)
Loading…
Cancel
Save