diff --git a/docs/changelog.rst b/docs/changelog.rst index 8cc24424..14f86e77 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -29,6 +29,8 @@ Unreleased * :gh:issue:`905` Initial support for templated ``ansible_ssh_args``, ``ansible_ssh_common_args``, and ``ansible_ssh_extra_args`` variables. NB: play or task scoped variables will probably still fail. +* :gh:issue:`694` CI: Fixed a race condition and some resource leaks causing + some of intermittent failures when running the test suite. v0.3.9 (2024-08-13) diff --git a/mitogen/parent.py b/mitogen/parent.py index 4b96dcf4..2a43cad2 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -2542,7 +2542,7 @@ class Reaper(object): # because it is setuid, so this is best-effort only. LOG.debug('%r: sending %s', self.proc, SIGNAL_BY_NUM[signum]) try: - os.kill(self.proc.pid, signum) + self.proc.send_signal(signum) except OSError: e = sys.exc_info()[1] if e.args[0] != errno.EPERM: @@ -2662,6 +2662,17 @@ class Process(object): """ raise NotImplementedError() + def send_signal(self, sig): + os.kill(self.pid, sig) + + def terminate(self): + "Ask the process to gracefully shutdown." + self.send_signal(signal.SIGTERM) + + def kill(self): + "Ask the operating system to forcefully destroy the process." + self.send_signal(signal.SIGKILL) + class PopenProcess(Process): """ @@ -2678,6 +2689,9 @@ class PopenProcess(Process): def poll(self): return self.proc.poll() + def send_signal(self, sig): + self.proc.send_signal(sig) + class ModuleForwarder(object): """ diff --git a/mitogen/unix.py b/mitogen/unix.py index 1af1c0ec..b241a403 100644 --- a/mitogen/unix.py +++ b/mitogen/unix.py @@ -143,19 +143,23 @@ class Listener(mitogen.core.Protocol): def on_accept_client(self, sock): sock.setblocking(True) try: - pid, = struct.unpack('>L', sock.recv(4)) + data = sock.recv(4) + pid, = struct.unpack('>L', data) except (struct.error, socket.error): - LOG.error('listener: failed to read remote identity: %s', - sys.exc_info()[1]) + LOG.error('listener: failed to read remote identity, got %d bytes: %s', + len(data), sys.exc_info()[1]) + sock.close() return context_id = self._router.id_allocator.allocate() try: + # FIXME #1109 send() returns number of bytes sent, check it sock.send(struct.pack('>LLL', context_id, mitogen.context_id, os.getpid())) except socket.error: LOG.error('listener: failed to assign identity to PID %d: %s', pid, sys.exc_info()[1]) + sock.close() return context = mitogen.parent.Context(self._router, context_id) diff --git a/tests/connection_test.py b/tests/connection_test.py index 5c3e678d..c3146954 100644 --- a/tests/connection_test.py +++ b/tests/connection_test.py @@ -1,3 +1,4 @@ +import logging import os import signal import sys @@ -54,7 +55,9 @@ def do_detach(econtext): class DetachReapTest(testlib.RouterMixin, testlib.TestCase): def test_subprocess_preserved_on_shutdown(self): c1 = self.router.local() + c1_stream = self.router.stream_by_id(c1.context_id) pid = c1.call(os.getpid) + self.assertEqual(pid, c1_stream.conn.proc.pid) l = mitogen.core.Latch() mitogen.core.listen(c1, 'disconnect', l.put) @@ -64,8 +67,8 @@ class DetachReapTest(testlib.RouterMixin, testlib.TestCase): self.broker.shutdown() self.broker.join() - os.kill(pid, 0) # succeeds if process still alive + self.assertIsNone(os.kill(pid, 0)) # succeeds if process still alive # now clean up - os.kill(pid, signal.SIGTERM) - os.waitpid(pid, 0) + c1_stream.conn.proc.terminate() + c1_stream.conn.proc.proc.wait() diff --git a/tests/create_child_test.py b/tests/create_child_test.py index acf3ea66..57b04b3f 100644 --- a/tests/create_child_test.py +++ b/tests/create_child_test.py @@ -76,6 +76,7 @@ def close_proc(proc): proc.stdout.close() if proc.stderr: proc.stderr.close() + proc.proc.wait() def wait_read(fp, n): diff --git a/tests/data/importer/six_brokenpkg/__init__.py b/tests/data/importer/six_brokenpkg/__init__.py index e5944b83..32356972 100644 --- a/tests/data/importer/six_brokenpkg/__init__.py +++ b/tests/data/importer/six_brokenpkg/__init__.py @@ -53,4 +53,4 @@ if _system_six: else: from . import _six as six six_py_file = '{0}.py'.format(os.path.splitext(six.__file__)[0]) -exec(open(six_py_file, 'rb').read()) +with open(six_py_file, 'rb') as f: exec(f.read()) diff --git a/tests/id_allocation_test.py b/tests/id_allocation_test.py index 850a68a5..91ff3d4b 100644 --- a/tests/id_allocation_test.py +++ b/tests/id_allocation_test.py @@ -27,3 +27,6 @@ class SlaveTest(testlib.RouterMixin, testlib.TestCase): # Subsequent master allocation does not collide c2 = self.router.local() self.assertEqual(1002, c2.context_id) + + context.shutdown() + c2.shutdown() diff --git a/tests/reaper_test.py b/tests/reaper_test.py index 8588a1bc..560d48ff 100644 --- a/tests/reaper_test.py +++ b/tests/reaper_test.py @@ -10,8 +10,7 @@ import mitogen.parent class ReaperTest(testlib.TestCase): - @mock.patch('os.kill') - def test_calc_delay(self, kill): + def test_calc_delay(self): broker = mock.Mock() proc = mock.Mock() proc.poll.return_value = None @@ -24,8 +23,7 @@ class ReaperTest(testlib.TestCase): self.assertEqual(752, int(1000 * reaper._calc_delay(5))) self.assertEqual(1294, int(1000 * reaper._calc_delay(6))) - @mock.patch('os.kill') - def test_reap_calls(self, kill): + def test_reap_calls(self): broker = mock.Mock() proc = mock.Mock() proc.poll.return_value = None @@ -33,20 +31,20 @@ class ReaperTest(testlib.TestCase): reaper = mitogen.parent.Reaper(broker, proc, True, True) reaper.reap() - self.assertEqual(0, kill.call_count) + self.assertEqual(0, proc.send_signal.call_count) reaper.reap() - self.assertEqual(1, kill.call_count) + self.assertEqual(1, proc.send_signal.call_count) reaper.reap() reaper.reap() reaper.reap() - self.assertEqual(1, kill.call_count) + self.assertEqual(1, proc.send_signal.call_count) reaper.reap() - self.assertEqual(2, kill.call_count) + self.assertEqual(2, proc.send_signal.call_count) - self.assertEqual(kill.mock_calls, [ - mock.call(proc.pid, signal.SIGTERM), - mock.call(proc.pid, signal.SIGKILL), + self.assertEqual(proc.send_signal.mock_calls, [ + mock.call(signal.SIGTERM), + mock.call(signal.SIGKILL), ]) diff --git a/tests/ssh_test.py b/tests/ssh_test.py index 3149fcbc..ce7dce96 100644 --- a/tests/ssh_test.py +++ b/tests/ssh_test.py @@ -190,6 +190,7 @@ class BannerTest(testlib.DockerMixin, testlib.TestCase): self.dockerized_ssh.port, ) self.assertEqual(name, context.name) + context.shutdown(wait=True) class StubPermissionDeniedTest(StubSshMixin, testlib.TestCase): diff --git a/tests/testlib.py b/tests/testlib.py index 8c40e7ff..a52292ce 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -146,6 +146,17 @@ def data_path(suffix): return path +def retry(fn, on, max_attempts, delay): + for i in range(max_attempts): + try: + return fn() + except on: + if i >= max_attempts - 1: + raise + else: + time.sleep(delay) + + def threading__thread_is_alive(thread): """Return whether the thread is alive (Python version compatibility shim). @@ -562,18 +573,24 @@ class DockerizedSshDaemon(object): wait_for_port(self.get_host(), self.port, pattern='OpenSSH') def check_processes(self): - args = ['docker', 'exec', self.container_name, 'ps', '-o', 'comm='] + # Get Accounting name (ucomm) & command line (args) of each process + # in the container. No truncation (-ww). No column headers (foo=). + ps_output = subprocess.check_output([ + 'docker', 'exec', self.container_name, + 'ps', '-w', '-w', '-o', 'ucomm=', '-o', 'args=', + ]) + ps_lines = ps_output.decode().splitlines() + processes = [tuple(line.split(None, 1)) for line in ps_lines] counts = {} - for comm in subprocess.check_output(args).decode().splitlines(): - comm = comm.strip() - counts[comm] = counts.get(comm, 0) + 1 + for ucomm, _ in processes: + counts[ucomm] = counts.get(ucomm, 0) + 1 if counts != {'ps': 1, 'sshd': 1}: assert 0, ( 'Docker container %r contained extra running processes ' 'after test completed: %r' % ( self.container_name, - counts + processes, ) ) @@ -630,7 +647,12 @@ class DockerMixin(RouterMixin): @classmethod def tearDownClass(cls): - cls.dockerized_ssh.check_processes() + retry( + cls.dockerized_ssh.check_processes, + on=AssertionError, + max_attempts=5, + delay=0.1, + ) cls.dockerized_ssh.close() super(DockerMixin, cls).tearDownClass() diff --git a/tests/unix_test.py b/tests/unix_test.py index 14fc54ae..e251a7ad 100644 --- a/tests/unix_test.py +++ b/tests/unix_test.py @@ -65,17 +65,13 @@ class ListenerTest(testlib.RouterMixin, testlib.TestCase): def test_constructor_basic(self): listener = self.klass.build_stream(router=self.router) - capture = testlib.LogCapturer() - capture.start() - try: - self.assertFalse(mitogen.unix.is_path_dead(listener.protocol.path)) - os.unlink(listener.protocol.path) - # ensure we catch 0 byte read error log message - self.broker.shutdown() - self.broker.join() - self.broker_shutdown = True - finally: - capture.stop() + self.assertFalse(mitogen.unix.is_path_dead(listener.protocol.path)) + os.unlink(listener.protocol.path) + + # ensure we catch 0 byte read error log message + self.broker.shutdown() + self.broker.join() + self.broker_shutdown = True class ClientTest(testlib.TestCase):