@@ -28,14 +28,13 @@
 from __future__ import absolute_import
 import atexit
-import errno
 import logging
 import multiprocessing
 import os
+import resource
 import signal
 import socket
 import sys
-import time

 try:
     import faulthandler
@ -79,6 +78,14 @@ worker_model_msg = (
' " mitogen_* " or " operon_* " strategies are active. '
' " mitogen_* " or " operon_* " strategies are active. '
)
)
shutting_down_msg = (
' The task worker cannot connect. Ansible may be shutting down, or '
' the maximum open files limit may have been exceeded. If this occurs '
' midway through a run, please retry after increasing the open file '
' limit (ulimit -n). Original error: %s '
)
#: The worker model as configured by the currently running strategy. This is
#: The worker model as configured by the currently running strategy. This is
#: managed via :func:`get_worker_model` / :func:`set_worker_model` functions by
#: managed via :func:`get_worker_model` / :func:`set_worker_model` functions by
#: :class:`StrategyMixin`.
#: :class:`StrategyMixin`.
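The new message is a %-style template whose single placeholder receives the original exception. A minimal sketch of how it renders; the EMFILE error below is illustrative, not something raised by the patch:

    import errno

    shutting_down_msg = (
        'The task worker cannot connect. Ansible may be shutting down, or '
        'the maximum open files limit may have been exceeded. If this occurs '
        'midway through a run, please retry after increasing the open file '
        'limit (ulimit -n). Original error: %s'
    )

    try:
        raise OSError(errno.EMFILE, 'Too many open files')
    except OSError as e:
        # str(e) == '[Errno 24] Too many open files' on Linux
        print(shutting_down_msg % (e,))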
@@ -229,9 +236,27 @@ def _setup_responder(responder):
     )


+def increase_open_file_limit():
+    """
+    #549: in order to reduce the possibility of hitting an open files limit,
+    increase :data:`resource.RLIMIT_NOFILE` from its soft limit to its hard
+    limit, if they differ.
+
+    It is common that a low soft limit is configured by default, where the
+    hard limit is much higher.
+    """
+    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
+    if soft < hard:
+        LOG.debug('raising soft open file limit from %d to %d', soft, hard)
+        resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
+    else:
+        LOG.debug('cannot increase open file limit; existing limit is %d',
+                  hard)
+
+
 def common_setup(enable_affinity=True, _init_logging=True):
     save_pid('controller')
     ansible_mitogen.logging.set_process_name('top')

     if enable_affinity:
         ansible_mitogen.affinity.policy.assign_controller()
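For context, a standalone sketch of the soft/hard limit dance the new helper performs. An unprivileged process may raise its soft RLIMIT_NOFILE up to the hard limit, but raising the hard limit itself requires privilege, which is why the helper stops at `(hard, hard)`:

    import resource

    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    print('soft=%r hard=%r' % (soft, hard))   # e.g. soft=1024 hard=1048576
    if soft < hard:
        # Raise the soft limit to the existing ceiling; no privilege needed.
        resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
        print('raised to %r' % (resource.getrlimit(resource.RLIMIT_NOFILE)[0],))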
@@ -247,6 +272,7 @@ def common_setup(enable_affinity=True, _init_logging=True):
         mitogen.core.enable_profiling()

     MuxProcess.cls_original_env = dict(os.environ)
+    increase_open_file_limit()


 def get_cpu_count(default=None):
@@ -269,10 +295,25 @@ def get_cpu_count(default=None):


 class Binding(object):
+    """
+    Represent a bound connection for a particular inventory hostname. When
+    operating in sharded mode, the actual MuxProcess implementing a connection
+    varies according to the target machine. Depending on the particular
+    implementation, this class represents a binding to the correct MuxProcess.
+    """
     def get_child_service_context(self):
         """
         Return the :class:`mitogen.core.Context` to which children should
-        direct ContextService requests, or :data:`None` for the local process.
+        direct requests for services such as FileService, or :data:`None` for
+        the local process.
+
+        This can be different from :meth:`get_service_context` where MuxProcess
+        and WorkerProcess are combined, and it is discovered a task is
+        delegated after being assigned to its initial worker for the original
+        un-delegated hostname. In that case, connection management and
+        expensive services like file transfer must be implemented by the
+        MuxProcess connected to the target, rather than routed to the
+        MuxProcess responsible for executing the task.
         """
         raise NotImplementedError()
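To make the contract concrete, a hypothetical subclass (LocalBinding does not appear in this patch): implementations return a `mitogen.core.Context` for service traffic, or `None` to mean "service it in the local process":

    class LocalBinding(Binding):
        """
        Hypothetical binding for a worker that shares a process with its
        multiplexer: service requests are handled locally, so no remote
        context is returned.
        """
        def get_child_service_context(self):
            # None => handle FileService-style requests in-process.
            return None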
@@ -358,8 +399,8 @@ class ClassicWorkerModel(WorkerModel):
     def _listener_for_name(self, name):
         """
-        Given a connection stack, return the UNIX listener that should be used
-        to communicate with it. This is a simple hash of the inventory name.
+        Given an inventory hostname, return the UNIX listener that should
+        communicate with it. This is a simple hash of the inventory name.
         """
         if len(self._muxes) == 1:
             return self._muxes[0].path
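The hunk elides the multi-mux branch, but the docstring's "simple hash of the inventory name" amounts to deterministically mapping a hostname onto one of N listener paths, so every task for the same host reaches the same MuxProcess. A sketch of the idea (not the exact function body; the socket paths are made up):

    import zlib

    def pick_listener(name, paths):
        # crc32 is stable across interpreter runs, unlike hash() under
        # PYTHONHASHSEED randomization, so the mapping is reproducible.
        return paths[zlib.crc32(name.encode()) % len(paths)]

    paths = ['/tmp/mux0.sock', '/tmp/mux1.sock']
    assert pick_listener('web1', paths) == pick_listener('web1', paths)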
@@ -376,10 +417,16 @@ class ClassicWorkerModel(WorkerModel):
         self.parent = None
         self.router = None

-        self.router, self.parent = mitogen.unix.connect(
-            path=path,
-            broker=self.broker,
-        )
+        try:
+            self.router, self.parent = mitogen.unix.connect(
+                path=path,
+                broker=self.broker,
+            )
+        except mitogen.unix.ConnectError as e:
+            # This is not AnsibleConnectionFailure since we want to break
+            # with_items loops.
+            raise ansible.errors.AnsibleError(shutting_down_msg % (e,))
         self.listener_path = path

     def on_process_exit(self, sock):
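The comment is about exception choice: `AnsibleConnectionFailure` subclasses `AnsibleError`, and Ansible's task executor converts it into a per-item "unreachable" result, so a with_items loop would march on item by item against a multiplexer that is already gone; the plain base class propagates and aborts the task at the first item. A hedged sketch of the same pattern, where `connect` stands in for `mitogen.unix.connect` and OSError for its ConnectError:

    import ansible.errors

    def connect_or_abort(connect, path, broker, shutting_down_msg):
        try:
            return connect(path=path, broker=broker)
        except OSError as e:
            # Base AnsibleError, not AnsibleConnectionFailure: fail the
            # task outright rather than once per remaining loop item.
            raise ansible.errors.AnsibleError(shutting_down_msg % (e,))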
@@ -387,10 +434,9 @@ class ClassicWorkerModel(WorkerModel):
         This is an :mod:`atexit` handler installed in the top-level process.

         Shut the write end of `sock`, causing the receive side of the socket in
-        every worker process to wake up with a 0-byte read, and causing their
-        main threads to wake up and initiate shutdown. After shutting the
-        socket down, wait for a 0-byte read from the read end, which will occur
-        after the last child closes the descriptor on exit.
+        every worker process to return 0-byte reads, and causing their main
+        threads to wake and initiate shutdown. After shutting the socket down,
+        wait on each child to finish exiting.

         This is done using :mod:`atexit` since Ansible lacks any better hook to
         run code during exit, and unless some synchronization exists with
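The mechanism relies on socket EOF semantics: shutting down the writable side makes every blocked reader return a zero-byte read immediately, with no payload ever sent. A self-contained demo:

    import socket

    a, b = socket.socketpair()
    a.shutdown(socket.SHUT_WR)   # no bytes sent; just close the write side
    assert b.recv(1) == b''      # reader wakes with a 0-byte read (EOF)
    a.close()
    b.close()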
@@ -407,14 +453,21 @@ class ClassicWorkerModel(WorkerModel):
         mitogen.core.io_op(sock.recv, 1)
         sock.close()

+        for mux in self._muxes:
+            _, status = os.waitpid(mux.pid, 0)
+            status = mitogen.fork._convert_exit_status(status)
+            LOG.debug('mux %d PID %d %s', mux.index, mux.pid,
+                      mitogen.parent.returncode_to_str(status))
+
     def _initialize(self):
         """
-        Arrange for classic process model connection multiplexer child
-        processes to be started, if they are not already running.
+        Arrange for classic model multiplexers to be started, if they are not
+        already running.

-        The parent process picks a UNIX socket path the child will use prior to
-        fork, creates a socketpair used essentially as a semaphore, then blocks
-        waiting for the child to indicate the UNIX socket is ready for use.
+        The parent process picks a UNIX socket path each child will use prior
+        to fork, creates a socketpair used essentially as a semaphore, then
+        blocks waiting for the child to indicate the UNIX socket is ready for
+        use.

         :param bool _init_logging:
             For testing, if :data:`False`, don't initialize logging.
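A minimal sketch of the "socketpair as semaphore" handshake the docstring describes, paired with the reaping that `on_process_exit` now performs: the parent forks, blocks on its end of a socketpair until the child signals that its listener is ready, and later collects the exit status with waitpid(). Names here are illustrative:

    import os
    import socket

    parent_sock, child_sock = socket.socketpair()
    pid = os.fork()
    if pid == 0:                      # child: the would-be MuxProcess
        parent_sock.close()
        # ... create the UNIX listener here ...
        child_sock.send(b'1')         # semaphore: listener is ready
        os._exit(0)
    child_sock.close()
    parent_sock.recv(1)               # parent blocks until child is ready
    _, status = os.waitpid(pid, 0)    # reap, as the atexit handler now does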
@@ -513,8 +566,8 @@ class MuxProcess(object):
     Implement a subprocess forked from the Ansible top-level, as a safe place
     to contain the Mitogen IO multiplexer thread, keeping its use of the
     logging package (and the logging package's heavy use of locks) far away
-    from the clutches of os.fork(), which is used continuously by the
-    multiprocessing package in the top-level process.
+    from os.fork(), which is used continuously by the multiprocessing package
+    in the top-level process.

     The problem with running the multiplexer in that process is that should the
     multiplexer thread be in the process of emitting a log entry (and holding
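The hazard being avoided: fork() copies only the calling thread, so a lock held by any other thread at fork time (such as logging's handler lock mid-emit) stays locked forever in the child. CPython's own mitigation for the logging package has the same shape as this sketch, which assumes Python 3.7+ for os.register_at_fork:

    import os
    import threading

    lock = threading.Lock()
    os.register_at_fork(
        before=lock.acquire,          # ensure no thread holds it mid-fork
        after_in_parent=lock.release,
        after_in_child=lock.release,  # child starts with a usable lock
    )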
@@ -555,7 +608,6 @@ class MuxProcess(object):
             mitogen.core.io_op(MuxProcess.cls_parent_sock.recv, 1)
             return

-        save_pid('mux')
         ansible_mitogen.logging.set_process_name('mux:' + str(self.index))
         if setproctitle:
             setproctitle.setproctitle('mitogen mux:%s (%s)' % (
@@ -581,7 +633,7 @@ class MuxProcess(object):
         """
         save_pid('mux')
         ansible_mitogen.logging.set_process_name('mux')
-        ansible_mitogen.affinity.policy.assign_muxprocess()
+        ansible_mitogen.affinity.policy.assign_muxprocess(self.index)

         self._setup_master()
         self._setup_services()
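`assign_muxprocess` now receives the mux index, which suggests per-process pinning rather than one shared mask. A hypothetical, Linux-only sketch of index-based pinning; the real policy in ansible_mitogen.affinity also reserves cores for the controller and workers, and this helper is not its actual implementation:

    import os

    def assign_muxprocess(index, reserved=1):
        # Skip `reserved` cores kept for the top-level process, then give
        # each multiplexer its own core, wrapping on small machines.
        cpu = (reserved + index) % os.cpu_count()
        os.sched_setaffinity(0, {cpu})   # 0 == the calling process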