@ -266,75 +266,67 @@ def _get_async_dir():
)
def _write_job_status ( job_id , dct ) :
"""
Update an async job status file .
"""
LOG . info ( ' _write_job_status( %r , %r ) ' , job_id , dct )
dct . setdefault ( ' ansible_job_id ' , job_id )
dct . setdefault ( ' data ' , ' ' )
async_dir = _get_async_dir ( )
if not os . path . exists ( async_dir ) :
os . makedirs ( async_dir )
path = os . path . join ( async_dir , job_id )
with open ( path + ' .tmp ' , ' w ' ) as fp :
fp . write ( json . dumps ( dct ) )
os . rename ( path + ' .tmp ' , path )
def _sigalrm ( broker , timeout_secs , job_id ) :
"""
Respond to SIGALRM ( job timeout ) by updating the job file and killing the
process .
"""
msg = " Job reached maximum time limit of %d seconds. " % ( timeout_secs , )
_write_job_status ( job_id , {
" failed " : 1 ,
" finished " : 1 ,
" msg " : msg ,
} )
broker . shutdown ( )
def _install_alarm ( broker , timeout_secs , job_id ) :
handler = lambda * _ : _sigalrm ( broker , timeout_secs , job_id )
signal . signal ( signal . SIGALRM , handler )
signal . alarm ( timeout_secs )
def _run_module_async ( kwargs , job_id , timeout_secs , econtext ) :
"""
1. Immediately updates the status file to mark the job as started .
2. Installs a timer / signal handler to implement the time limit .
3. Runs as with run_module ( ) , writing the result to the status file .
: param dict kwargs :
Runner keyword arguments .
: param str job_id :
String job ID .
: param int timeout_secs :
If > 0 , limit the task ' s maximum run time.
"""
_write_job_status ( job_id , {
' started ' : 1 ,
' finished ' : 0 ,
' pid ' : os . getpid ( )
} )
if timeout_secs > 0 :
_install_alarm ( econtext . broker , timeout_secs , job_id )
class AsyncRunner ( object ) :
def __init__ ( self , job_id , timeout_secs , econtext , kwargs ) :
self . job_id = job_id
self . timeout_secs = timeout_secs
self . econtext = econtext
self . kwargs = kwargs
self . _timed_out = False
self . _init_path ( )
def _init_path ( self ) :
async_dir = _get_async_dir ( )
if not os . path . exists ( async_dir ) :
os . makedirs ( async_dir )
self . path = os . path . join ( async_dir , self . job_id )
def _update ( self , dct ) :
"""
Update an async job status file .
"""
LOG . info ( ' %r ._update( %r , %r ) ' , self , self . job_id , dct )
dct . setdefault ( ' ansible_job_id ' , self . job_id )
dct . setdefault ( ' data ' , ' ' )
with open ( self . path + ' .tmp ' , ' w ' ) as fp :
fp . write ( json . dumps ( dct ) )
os . rename ( self . path + ' .tmp ' , self . path )
def _on_sigalrm ( self , signum , frame ) :
"""
Respond to SIGALRM ( job timeout ) by updating the job file and killing
the process .
"""
msg = " Job reached maximum time limit of %d seconds. " % (
self . timeout_secs ,
)
self . _update ( {
" failed " : 1 ,
" finished " : 1 ,
" msg " : msg ,
} )
self . _timed_out = True
self . econtext . broker . shutdown ( )
def _install_alarm ( self ) :
signal . signal ( signal . SIGALRM , self . _on_sigalrm )
signal . alarm ( self . timeout_secs )
def _run_module ( self ) :
kwargs = dict ( self . kwargs , * * {
' detach ' : True ,
' econtext ' : self . econtext ,
' emulate_tty ' : False ,
} )
kwargs [ ' detach ' ] = True
kwargs [ ' econtext ' ] = econtext
kwargs [ ' emulate_tty ' ] = False
dct = run_module ( kwargs )
if mitogen . core . PY3 :
for key in ' stdout ' , ' stderr ' :
dct [ key ] = dct [ key ] . decode ( ' utf-8 ' , ' surrogateescape ' )
dct = run_module ( kwargs )
if mitogen . core . PY3 :
for key in ' stdout ' , ' stderr ' :
dct [ key ] = dct [ key ] . decode ( ' utf-8 ' , ' surrogateescape ' )
return dct
try :
def _parse_result ( self , dct ) :
filtered , warnings = (
ansible . module_utils . json_utils .
_filter_non_json_lines ( dct [ ' stdout ' ] )
@ -342,34 +334,69 @@ def _run_module_async(kwargs, job_id, timeout_secs, econtext):
result = json . loads ( filtered )
result . setdefault ( ' warnings ' , [ ] ) . extend ( warnings )
result [ ' stderr ' ] = dct [ ' stderr ' ]
_write_job_status ( job_id , result )
except Exception :
_write_job_status ( job_id , {
" failed " : 1 ,
" msg " : traceback . format_exc ( ) ,
" data " : dct [ ' stdout ' ] , # temporary notice only
" stderr " : dct [ ' stderr ' ]
self . _update ( result )
def _run ( self ) :
"""
1. Immediately updates the status file to mark the job as started .
2. Installs a timer / signal handler to implement the time limit .
3. Runs as with run_module ( ) , writing the result to the status file .
: param dict kwargs :
Runner keyword arguments .
: param str job_id :
String job ID .
: param int timeout_secs :
If > 0 , limit the task ' s maximum run time.
"""
self . _update ( {
' started ' : 1 ,
' finished ' : 0 ,
' pid ' : os . getpid ( )
} )
if self . timeout_secs > 0 :
self . _install_alarm ( )
dct = self . _run_module ( )
if not self . _timed_out :
# After SIGALRM fires, there is a window between broker responding
# to shutdown() by killing the process, and work continuing on the
# main thread. If main thread was asleep in at least
# basic.py/select.select(), an EINTR will be raised. We want to
# discard that exception.
try :
self . _parse_result ( dct )
except Exception :
self . _update ( {
" failed " : 1 ,
" msg " : traceback . format_exc ( ) ,
" data " : dct [ ' stdout ' ] , # temporary notice only
" stderr " : dct [ ' stderr ' ]
} )
def run ( self ) :
try :
try :
self . _run ( )
except Exception :
self . _update ( {
" failed " : 1 ,
" msg " : traceback . format_exc ( ) ,
} )
finally :
self . econtext . broker . shutdown ( )
@mitogen.core.takes_econtext
def run_module_async ( kwargs , job_id , timeout_secs , econtext ) :
"""
Arrange for a module to be executed with its run status and result
serialized to a disk file . This function expects to run in a child forked
using : func : ` create_fork_child ` .
Execute a module with its run status and result written to a file ,
terminating on the process on completion . This function must run in a child
forked using : func : ` create_fork_child ` .
"""
try :
try :
_run_module_async ( kwargs , job_id , timeout_secs , econtext )
except Exception :
# Catch any (ansible_mitogen) bugs and write them to the job file.
_write_job_status ( job_id , {
" failed " : 1 ,
" msg " : traceback . format_exc ( ) ,
} )
finally :
econtext . broker . shutdown ( )
arunner = AsyncRunner ( job_id , timeout_secs , econtext , kwargs )
arunner . run ( )
def make_temp_directory ( base_dir ) :