From 395b03a77d79e5edfae2a44f0086ec01a1654d51 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 2 Aug 2019 22:30:58 +0100 Subject: [PATCH 1/6] issue #549: fix setrlimit() crash and hard-wire OS X default OS X advertised unlimited, but really it means kern.maxfilesperproc. --- ansible_mitogen/process.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index b09b54eb..f9c59e50 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -246,11 +246,22 @@ def increase_open_file_limit(): limit is much higher. """ soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) - if soft < hard: - LOG.debug('raising soft open file limit from %d to %d', soft, hard) - resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard)) - else: - LOG.debug('cannot increase open file limit; existing limit is %d', hard) + LOG.debug('inherited open file limits: soft=%d hard=%d', soft, hard) + if soft >= hard: + LOG.debug('max open files already set to hard limit: %d', hard) + return + + # OS X is limited by kern.maxfilesperproc sysctl, rather than the + # advertised unlimited hard RLIMIT_NOFILE. Just hard-wire known defaults + # for that sysctl, to avoid the mess of querying it. + for value in (hard, 10240): + try: + resource.setrlimit(resource.RLIMIT_NOFILE, (value, hard)) + LOG.debug('raised soft open file limit from %d to %d', soft, value) + break + except ValueError as e: + LOG.debug('could not raise soft open file limit from %d to %d: %s', + soft, value, e) def common_setup(enable_affinity=True, _init_logging=True): From dcfd733e6f8d36b0c21c6a59992dacae4b052c1c Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Aug 2019 05:12:33 +0100 Subject: [PATCH 2/6] issue #549: remove Linux-specific assumptions from create_child_test Some stat fields are implementation-specific, little value even testing them on Linux --- tests/create_child_test.py | 61 ++++++++++++++++++++++++++++++-------- 1 file changed, 48 insertions(+), 13 deletions(-) diff --git a/tests/create_child_test.py b/tests/create_child_test.py index 21591fe8..e7357059 100644 --- a/tests/create_child_test.py +++ b/tests/create_child_test.py @@ -15,7 +15,46 @@ from mitogen.core import b import testlib +def _osx_mode(n): + """ + fstat(2) on UNIX sockets on OSX return different mode bits depending on + which side is being inspected, so zero those bits for comparison. + """ + if sys.platform == 'darwin': + n &= ~int('0777', 8) + return n + + def run_fd_check(func, fd, mode, on_start=None): + """ + Run ``tests/data/fd_check.py`` using `func`. The subprocess writes + information about the `fd` it received to a temporary file. + + :param func: + Function like `create_child()` used to start child. + :param fd: + FD child should read/write from, and report information about. + :param mode: + "read" or "write", depending on whether the FD is readable or writeable + from the perspective of the child. If "read", `on_start()` should write + "TEST" to it and the child reads "TEST" from it, otherwise `on_start()` + should read "TEST" from it and the child writes "TEST" to it. + :param on_start: + Function invoked as `on_start(proc)` + :returns: + Tuple of `(proc, info, on_start_result)`, where: + + * `proc`: the :class:`mitogen.parent.Process` returned by `func`. + * `info`: dict containing information returned by the child: + * `buf`: "TEST" that was read in "read" mode + * `flags`: :attr:`fcntl.F_GETFL` flags for `fd` + * `st_mode`: st_mode field from :func:`os.fstat` + * `st_dev`: st_dev field from :func:`os.fstat` + * `st_ino`: st_ino field from :func:`os.fstat` + * `ttyname`: :func:`os.ttyname` invoked on `fd`. + * `controlling_tty`: :func:os.ttyname` invoked on ``/dev/tty`` + from within the child. + """ tf = tempfile.NamedTemporaryFile() args = [ sys.executable, @@ -61,7 +100,7 @@ class StdinSockMixin(object): st = os.fstat(proc.stdin.fileno()) self.assertTrue(stat.S_ISSOCK(st.st_mode)) self.assertEquals(st.st_dev, info['st_dev']) - self.assertEquals(st.st_mode, info['st_mode']) + self.assertEquals(st.st_mode, _osx_mode(info['st_mode'])) flags = fcntl.fcntl(proc.stdin.fileno(), fcntl.F_GETFL) self.assertTrue(flags & os.O_RDWR) self.assertTrue(info['buf'], 'TEST') @@ -75,7 +114,7 @@ class StdoutSockMixin(object): st = os.fstat(proc.stdout.fileno()) self.assertTrue(stat.S_ISSOCK(st.st_mode)) self.assertEquals(st.st_dev, info['st_dev']) - self.assertEquals(st.st_mode, info['st_mode']) + self.assertEquals(st.st_mode, _osx_mode(info['st_mode'])) flags = fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) self.assertTrue(flags & os.O_RDWR) self.assertTrue(buf, 'TEST') @@ -105,8 +144,6 @@ class CreateChildMergedTest(StdinSockMixin, StdoutSockMixin, self.assertEquals(None, proc.stderr) st = os.fstat(proc.stdout.fileno()) self.assertTrue(stat.S_ISSOCK(st.st_mode)) - self.assertEquals(st.st_dev, info['st_dev']) - self.assertEquals(st.st_mode, info['st_mode']) flags = fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) self.assertTrue(flags & os.O_RDWR) self.assertTrue(buf, 'TEST') @@ -145,13 +182,11 @@ class TtyCreateChildTest(testlib.TestCase): self.assertTrue(isinstance(info['ttyname'], mitogen.core.UnicodeType)) - os.ttyname(proc.stdin.fileno()) # crashes if not TTY + self.assertTrue(os.isatty(proc.stdin.fileno())) flags = fcntl.fcntl(proc.stdin.fileno(), fcntl.F_GETFL) self.assertTrue(flags & os.O_RDWR) self.assertTrue(info['flags'] & os.O_RDWR) - - self.assertNotEquals(st.st_dev, info['st_dev']) self.assertTrue(info['buf'], 'TEST') def test_stdout(self): @@ -164,17 +199,18 @@ class TtyCreateChildTest(testlib.TestCase): self.assertTrue(isinstance(info['ttyname'], mitogen.core.UnicodeType)) - os.ttyname(proc.stdout.fileno()) # crashes if wrong + self.assertTrue(os.isatty(proc.stdout.fileno())) flags = fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) self.assertTrue(flags & os.O_RDWR) self.assertTrue(info['flags'] & os.O_RDWR) - self.assertNotEquals(st.st_dev, info['st_dev']) self.assertTrue(flags & os.O_RDWR) self.assertTrue(buf, 'TEST') def test_stderr(self): + # proc.stderr is None in the parent since there is no separate stderr + # stream. In the child, FD 2/stderr is connected to the TTY. proc, info, buf = run_fd_check(self.func, 2, 'write', lambda proc: wait_read(proc.stdout, 4)) @@ -184,13 +220,12 @@ class TtyCreateChildTest(testlib.TestCase): self.assertTrue(isinstance(info['ttyname'], mitogen.core.UnicodeType)) - os.ttyname(proc.stdin.fileno()) # crashes if not TTY + self.assertTrue(os.isatty(proc.stdout.fileno())) flags = fcntl.fcntl(proc.stdout.fileno(), fcntl.F_GETFL) self.assertTrue(flags & os.O_RDWR) self.assertTrue(info['flags'] & os.O_RDWR) - self.assertNotEquals(st.st_dev, info['st_dev']) self.assertTrue(flags & os.O_RDWR) self.assertTrue(buf, 'TEST') @@ -222,6 +257,7 @@ class TtyCreateChildTest(testlib.TestCase): class StderrDiagTtyMixin(object): def test_stderr(self): + # proc.stderr is the PTY master, FD 2 in the child is the PTY slave proc, info, buf = run_fd_check(self.func, 2, 'write', lambda proc: wait_read(proc.stderr, 4)) @@ -231,13 +267,12 @@ class StderrDiagTtyMixin(object): self.assertTrue(isinstance(info['ttyname'], mitogen.core.UnicodeType)) - os.ttyname(proc.stderr.fileno()) # crashes if wrong + self.assertTrue(os.isatty(proc.stderr.fileno())) flags = fcntl.fcntl(proc.stderr.fileno(), fcntl.F_GETFL) self.assertTrue(flags & os.O_RDWR) self.assertTrue(info['flags'] & os.O_RDWR) - self.assertNotEquals(st.st_dev, info['st_dev']) self.assertTrue(flags & os.O_RDWR) self.assertTrue(buf, 'TEST') From 19b259a45f204eb70edfdb2f69d7d43fbeb7596f Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Aug 2019 05:48:47 +0100 Subject: [PATCH 3/6] issue #549: skip Docker tests if Docker is unavailable --- tests/testlib.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/testlib.py b/tests/testlib.py index 673d5ca6..d4387c54 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -427,6 +427,11 @@ class DockerizedSshDaemon(object): raise ValueError('could not find SSH port in: %r' % (s,)) def start_container(self): + try: + subprocess__check_output(['docker']) + except Exception: + raise unittest2.SkipTest('Docker binary is unavailable') + self.container_name = 'mitogen-test-%08x' % (random.getrandbits(64),) args = [ 'docker', From cebccf6f4123e38e6c6aabd4392be10dea02a8e5 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Aug 2019 07:13:12 +0100 Subject: [PATCH 4/6] issue #549 / [stream-refactor]: fix close/poller deregister crash on OSX See source comment. --- mitogen/core.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index be5d7e9c..14fddc0f 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1628,12 +1628,17 @@ class Protocol(object): self.stream.on_disconnect(broker) def on_disconnect(self, broker): + # Normally both sides an FD, so it is important that tranmit_side is + # deregistered from Poller before closing the receive side, as pollers + # like epoll and kqueue unregister all events on FD close, causing + # subsequent attempt to unregister the transmit side to fail. LOG.debug('%r: disconnecting', self) - if self.stream.receive_side: - broker.stop_receive(self.stream) - self.stream.receive_side.close() + broker.stop_receive(self.stream) if self.stream.transmit_side: broker._stop_transmit(self.stream) + + self.stream.receive_side.close() + if self.stream.transmit_side: self.stream.transmit_side.close() From 3ceac2c9eda4e3ea7f430ad692cdb8967e4dd633 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Aug 2019 18:17:24 +0100 Subject: [PATCH 5/6] [linear2] simplify ClassicWorkerModel and fix repeat initialization "self.initialized = False" slipped in a few days ago, on second thoughts that flag is not needed at all, by simply rearranging ClassicWorkerModel to have a regular constructor. This hierarchy is still squishy, it needs more love. Remaining MuxProcess class attributes should eliminated. --- ansible_mitogen/process.py | 114 ++++++++++++++++++------------------- 1 file changed, 54 insertions(+), 60 deletions(-) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index f9c59e50..b8f10ec2 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -399,6 +399,18 @@ class ClassicBinding(Binding): class ClassicWorkerModel(WorkerModel): + #: In the top-level process, this references one end of a socketpair(), + #: whose other end child MuxProcesses block reading from to determine when + #: the master process dies. When the top-level exits abnormally, or + #: normally but where :func:`_on_process_exit` has been called, this socket + #: will be closed, causing all the children to wake. + parent_sock = None + + #: In the mux process, this is the other end of :attr:`cls_parent_sock`. + #: The main thread blocks on a read from it until :attr:`cls_parent_sock` + #: is closed. + child_sock = None + #: mitogen.master.Router for this worker. router = None @@ -414,8 +426,34 @@ class ClassicWorkerModel(WorkerModel): parent = None def __init__(self, _init_logging=True): - self._init_logging = _init_logging - self.initialized = False + """ + Arrange for classic model multiplexers to be started, if they are not + already running. + + The parent process picks a UNIX socket path each child will use prior + to fork, creates a socketpair used essentially as a semaphore, then + blocks waiting for the child to indicate the UNIX socket is ready for + use. + + :param bool _init_logging: + For testing, if :data:`False`, don't initialize logging. + """ + common_setup(_init_logging=_init_logging) + + self.parent_sock, self.child_sock = socket.socketpair() + mitogen.core.set_cloexec(self.parent_sock.fileno()) + mitogen.core.set_cloexec(self.child_sock.fileno()) + + self._muxes = [ + MuxProcess(self, index) + for index in range(get_cpu_count(default=1)) + ] + for mux in self._muxes: + mux.start() + + atexit.register(self._on_process_exit, self.parent_sock) + self.child_sock.close() + self.child_sock = None def _listener_for_name(self, name): """ @@ -449,7 +487,7 @@ class ClassicWorkerModel(WorkerModel): self.listener_path = path - def on_process_exit(self, sock): + def _on_process_exit(self, sock): """ This is an :mod:`atexit` handler installed in the top-level process. @@ -467,7 +505,7 @@ class ClassicWorkerModel(WorkerModel): sock.shutdown(socket.SHUT_WR) except socket.error: # Already closed. This is possible when tests are running. - LOG.debug('on_process_exit: ignoring duplicate call') + LOG.debug('_on_process_exit: ignoring duplicate call') return mitogen.core.io_op(sock.recv, 1) @@ -479,46 +517,15 @@ class ClassicWorkerModel(WorkerModel): LOG.debug('mux %d PID %d %s', mux.index, mux.pid, mitogen.parent.returncode_to_str(status)) - def _initialize(self): - """ - Arrange for classic model multiplexers to be started, if they are not - already running. - - The parent process picks a UNIX socket path each child will use prior - to fork, creates a socketpair used essentially as a semaphore, then - blocks waiting for the child to indicate the UNIX socket is ready for - use. - - :param bool _init_logging: - For testing, if :data:`False`, don't initialize logging. - """ - common_setup(_init_logging=self._init_logging) - - MuxProcess.cls_parent_sock, \ - MuxProcess.cls_child_sock = socket.socketpair() - mitogen.core.set_cloexec(MuxProcess.cls_parent_sock.fileno()) - mitogen.core.set_cloexec(MuxProcess.cls_child_sock.fileno()) - - self._muxes = [ - MuxProcess(index) - for index in range(get_cpu_count(default=1)) - ] - for mux in self._muxes: - mux.start() - - atexit.register(self.on_process_exit, MuxProcess.cls_parent_sock) - MuxProcess.cls_child_sock.close() - MuxProcess.cls_child_sock = None - def _test_reset(self): """ Used to clean up in unit tests. """ # TODO: split this up a bit. global _classic_worker_model - assert MuxProcess.cls_parent_sock is not None - MuxProcess.cls_parent_sock.close() - MuxProcess.cls_parent_sock = None + assert self.parent_sock is not None + self.parent_sock.close() + self.parent_sock = None self.listener_path = None self.router = None self.parent = None @@ -536,9 +543,6 @@ class ClassicWorkerModel(WorkerModel): """ See WorkerModel.on_strategy_start(). """ - if not self.initialized: - self._initialize() - self.initialized = True def on_strategy_complete(self): """ @@ -567,7 +571,6 @@ class ClassicWorkerModel(WorkerModel): self.router = None self.broker = None self.listener_path = None - self.initialized = False # #420: Ansible executes "meta" actions in the top-level process, # meaning "reset_connection" will cause :class:`mitogen.core.Latch` FDs @@ -598,25 +601,16 @@ class MuxProcess(object): See https://bugs.python.org/issue6721 for a thorough description of the class of problems this worker is intended to avoid. """ - #: In the top-level process, this references one end of a socketpair(), - #: whose other end child MuxProcesses block reading from to determine when - #: the master process dies. When the top-level exits abnormally, or - #: normally but where :func:`on_process_exit` has been called, this socket - #: will be closed, causing all the children to wake. - cls_parent_sock = None - - #: In the mux process, this is the other end of :attr:`cls_parent_sock`. - #: The main thread blocks on a read from it until :attr:`cls_parent_sock` - #: is closed. - cls_child_sock = None - #: A copy of :data:`os.environ` at the time the multiplexer process was #: started. It's used by mitogen_local.py to find changes made to the #: top-level environment (e.g. vars plugins -- issue #297) that must be #: applied to locally executed commands and modules. cls_original_env = None - def __init__(self, index): + def __init__(self, model, index): + #: :class:`ClassicWorkerModel` instance we were created by. + self.model = model + #: MuxProcess CPU index. self.index = index #: Individual path of this process. self.path = mitogen.unix.make_socket_path() @@ -625,7 +619,7 @@ class MuxProcess(object): self.pid = os.fork() if self.pid: # Wait for child to boot before continuing. - mitogen.core.io_op(MuxProcess.cls_parent_sock.recv, 1) + mitogen.core.io_op(self.model.parent_sock.recv, 1) return ansible_mitogen.logging.set_process_name('mux:' + str(self.index)) @@ -635,8 +629,8 @@ class MuxProcess(object): os.path.basename(self.path), )) - MuxProcess.cls_parent_sock.close() - MuxProcess.cls_parent_sock = None + self.model.parent_sock.close() + self.model.parent_sock = None try: try: self.worker_main() @@ -660,9 +654,9 @@ class MuxProcess(object): try: # Let the parent know our listening socket is ready. - mitogen.core.io_op(self.cls_child_sock.send, b('1')) + mitogen.core.io_op(self.model.child_sock.send, b('1')) # Block until the socket is closed, which happens on parent exit. - mitogen.core.io_op(self.cls_child_sock.recv, 1) + mitogen.core.io_op(self.model.child_sock.recv, 1) finally: self.broker.shutdown() self.broker.join() From d408caccf504410d3e113863dafce977bb4e8291 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 3 Aug 2019 18:32:12 +0100 Subject: [PATCH 6/6] issue #573: guard against a forked top-level Ansible process See comment. --- ansible_mitogen/process.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index b8f10ec2..99e07aee 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -438,6 +438,12 @@ class ClassicWorkerModel(WorkerModel): :param bool _init_logging: For testing, if :data:`False`, don't initialize logging. """ + # #573: The process ID that installed the :mod:`atexit` handler. If + # some unknown Ansible plug-in forks the Ansible top-level process and + # later performs a graceful Python exit, it may try to wait for child + # PIDs it never owned, causing a crash. We want to avoid that. + self._pid = os.getpid() + common_setup(_init_logging=_init_logging) self.parent_sock, self.child_sock = socket.socketpair() @@ -451,7 +457,7 @@ class ClassicWorkerModel(WorkerModel): for mux in self._muxes: mux.start() - atexit.register(self._on_process_exit, self.parent_sock) + atexit.register(self._on_process_exit) self.child_sock.close() self.child_sock = None @@ -487,7 +493,7 @@ class ClassicWorkerModel(WorkerModel): self.listener_path = path - def _on_process_exit(self, sock): + def _on_process_exit(self): """ This is an :mod:`atexit` handler installed in the top-level process. @@ -501,15 +507,18 @@ class ClassicWorkerModel(WorkerModel): MuxProcess, debug logs may appear on the user's terminal *after* the prompt has been printed. """ + if self._pid != os.getpid(): + return + try: - sock.shutdown(socket.SHUT_WR) + self.parent_sock.shutdown(socket.SHUT_WR) except socket.error: # Already closed. This is possible when tests are running. LOG.debug('_on_process_exit: ignoring duplicate call') return - mitogen.core.io_op(sock.recv, 1) - sock.close() + mitogen.core.io_op(self.parent_sock.recv, 1) + self.parent_sock.close() for mux in self._muxes: _, status = os.waitpid(mux.pid, 0)