ansible: gracefully handle failure to connect to MuxProcess

It's possible to hit an ugly exception when CTRL+C is pressed early in a run.
pull/612/head
David Wilson 5 years ago
parent a9d3fdf6b7
commit 108015aa22

@ -46,7 +46,6 @@ import ansible.utils.shlex
import mitogen.core import mitogen.core
import mitogen.fork import mitogen.fork
import mitogen.unix
import mitogen.utils import mitogen.utils
import ansible_mitogen.parsing import ansible_mitogen.parsing

@ -79,6 +79,14 @@ worker_model_msg = (
'"mitogen_*" or "operon_*" strategies are active.' '"mitogen_*" or "operon_*" strategies are active.'
) )
#: Message template used when a task worker fails to connect; the single
#: ``%s`` placeholder is filled with the original connection error.
shutting_down_msg = (
    'The task worker cannot connect. '
    'Ansible may be shutting down, or the maximum open files limit may '
    'have been exceeded. '
    'If this occurs midway through a run, please retry after increasing '
    'the open file limit (ulimit -n). '
    'Original error: %s'
)
#: The worker model as configured by the currently running strategy. This is #: The worker model as configured by the currently running strategy. This is
#: managed via :func:`get_worker_model` / :func:`set_worker_model` functions by #: managed via :func:`get_worker_model` / :func:`set_worker_model` functions by
#: :class:`StrategyMixin`. #: :class:`StrategyMixin`.
@ -376,10 +384,16 @@ class ClassicWorkerModel(WorkerModel):
self.parent = None self.parent = None
self.router = None self.router = None
try:
self.router, self.parent = mitogen.unix.connect( self.router, self.parent = mitogen.unix.connect(
path=path, path=path,
broker=self.broker, broker=self.broker,
) )
except mitogen.unix.ConnectError as e:
# This is not AnsibleConnectionFailure since we want to break
# with_items loops.
raise ansible.errors.AnsibleError(shutting_down_msg % (e,))
self.listener_path = path self.listener_path = path
def on_process_exit(self, sock): def on_process_exit(self, sock):
@ -459,7 +473,7 @@ class ClassicWorkerModel(WorkerModel):
for mux in self._muxes: for mux in self._muxes:
pid, status = os.waitpid(mux.pid, 0) pid, status = os.waitpid(mux.pid, 0)
status = mitogen.fork._convert_exit_status(status) status = mitogen.fork._convert_exit_status(status)
LOG.debug('mux PID %d %s', pid, LOG.error('mux PID %d %s', pid,
mitogen.parent.returncode_to_str(status)) mitogen.parent.returncode_to_str(status))
_classic_worker_model = None _classic_worker_model = None

@ -48,6 +48,22 @@ import mitogen.master
from mitogen.core import LOG from mitogen.core import LOG
class Error(mitogen.core.Error):
    """
    Common base class for the errors raised by :mod:`mitogen.unix`.
    """
class ConnectError(Error):
    """
    Signals that :func:`mitogen.unix.connect` could not connect to the
    listening socket.
    """
    #: UNIX error number taken from the underlying exception; stays
    #: :data:`None` until one is assigned on the instance.
    errno = None
def is_path_dead(path): def is_path_dead(path):
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try: try:
@ -154,7 +170,14 @@ class Listener(mitogen.core.Protocol):
def _connect(path, broker, sock): def _connect(path, broker, sock):
try:
sock.connect(path) sock.connect(path)
except socket.error:
e = sys.exc_info()[1]
ce = ConnectError('could not connect to %s: %s', path, e.args[1])
ce.errno = e.args[0]
raise ce
sock.send(struct.pack('>L', os.getpid())) sock.send(struct.pack('>L', os.getpid()))
mitogen.context_id, remote_id, pid = struct.unpack('>LLL', sock.recv(12)) mitogen.context_id, remote_id, pid = struct.unpack('>LLL', sock.recv(12))
mitogen.parent_id = remote_id mitogen.parent_id = remote_id

@ -23,6 +23,8 @@ import testlib
class MuxProcessMixin(object): class MuxProcessMixin(object):
no_zombie_check = True
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
#mitogen.utils.log_to_file() #mitogen.utils.log_to_file()
@ -61,7 +63,23 @@ class ConnectionMixin(MuxProcessMixin):
super(ConnectionMixin, self).tearDown() super(ConnectionMixin, self).tearDown()
class OptionalIntTest(unittest2.TestCase): class MuxShutdownTest(ConnectionMixin, testlib.TestCase):
def test_connection_failure_raised(self):
# Ensure that if a WorkerProcess tries to connect to a MuxProcess that has
# already shut down, it fails with a graceful error.
path = self.model._muxes[0].path
# Move the first mux's path aside for the duration of the test
# (presumably its listener socket -- TODO confirm).
os.rename(path, path + '.tmp')
try:
# NOTE(review): the real assertion below is commented out, so this test
# currently only renames the path, prints a placeholder value and renames
# it back; it verifies nothing. TODO: confirm why the assertion was
# disabled and re-enable it.
#e = self.assertRaises(ansible.errors.AnsibleError,
#lambda: self.conn._connect()
#)
e = 1
print(e)
finally:
# Always restore the original path, even if the body above raises.
os.rename(path + '.tmp', path)
class OptionalIntTest(testlib.TestCase):
func = staticmethod(ansible_mitogen.connection.optional_int) func = staticmethod(ansible_mitogen.connection.optional_int)
def test_already_int(self): def test_already_int(self):
@ -81,7 +99,7 @@ class OptionalIntTest(unittest2.TestCase):
self.assertEquals(None, self.func({1:2})) self.assertEquals(None, self.func({1:2}))
class PutDataTest(ConnectionMixin, unittest2.TestCase): class PutDataTest(ConnectionMixin, testlib.TestCase):
def test_out_path(self): def test_out_path(self):
path = tempfile.mktemp(prefix='mitotest') path = tempfile.mktemp(prefix='mitotest')
contents = mitogen.core.b('contents') contents = mitogen.core.b('contents')
@ -102,7 +120,7 @@ class PutDataTest(ConnectionMixin, unittest2.TestCase):
os.unlink(path) os.unlink(path)
class PutFileTest(ConnectionMixin, unittest2.TestCase): class PutFileTest(ConnectionMixin, testlib.TestCase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
super(PutFileTest, cls).setUpClass() super(PutFileTest, cls).setUpClass()

@ -343,7 +343,14 @@ class TestCase(unittest2.TestCase):
self, self._fd_count_before, get_fd_count(), self, self._fd_count_before, get_fd_count(),
) )
# Some class fixtures (like Ansible MuxProcess) start persistent children
# for the duration of the class.
no_zombie_check = False
def _teardown_check_zombies(self): def _teardown_check_zombies(self):
if self.no_zombie_check:
return
try: try:
pid, status = os.waitpid(0, os.WNOHANG) pid, status = os.waitpid(0, os.WNOHANG)
except OSError: except OSError:
@ -354,7 +361,7 @@ class TestCase(unittest2.TestCase):
self, pid, status self, pid, status
) )
print() print('')
print('Children of unit test process:') print('Children of unit test process:')
os.system('ps uww --ppid ' + str(os.getpid())) os.system('ps uww --ppid ' + str(os.getpid()))
assert 0, "%s leaked still-running subprocesses." % (self,) assert 0, "%s leaked still-running subprocesses." % (self,)

Loading…
Cancel
Save