Merge pull request #203 from dw/dmw

Disconnection fixes
dw committed via GitHub
commit d58ac0280b

@ -167,6 +167,9 @@ class Planner(object):
kwargs.setdefault('wrap_async', invocation.wrap_async)
return kwargs
def __repr__(self):
return '%s()' % (type(self).__name__,)
class BinaryPlanner(Planner):
"""

@ -199,6 +199,29 @@ class ContextService(mitogen.service.Service):
self._shutdown(context)
self._lru_by_via = {}
def _on_stream_disconnect(self, stream):
"""
Respond to Stream disconnection by deleting any record of contexts
reached via that stream. This method runs in the Broker thread and must
not block.
"""
# TODO: there is a race between creation of a context and disconnection
# of its related stream. An error reply should be sent to any message
# in _waiters_by_key below.
self._lock.acquire()
try:
for context, key in list(self._key_by_context.items()):
if context.context_id in stream.routes:
LOG.info('Dropping %r due to disconnect of %r',
context, stream)
self._response_by_key.pop(key, None)
self._waiters_by_key.pop(key, None)
self._refs_by_context.pop(context, None)
self._lru_by_via.pop(context, None)
finally:
self._lock.release()
def _connect(self, key, method_name, **kwargs):
"""
Actual connect implementation. Arranges for the Mitogen connection to
@ -240,14 +263,24 @@ class ContextService(mitogen.service.Service):
if kwargs.get('via'):
self._update_lru(context, method_name=method_name, **kwargs)
else:
# For directly connected contexts, listen to the associated
# Stream's disconnect event and use it to invalidate dependent
# Contexts.
stream = self.router.stream_by_id(context.context_id)
mitogen.core.listen(stream, 'disconnect',
lambda: self._on_stream_disconnect(stream))
home_dir = context.call(os.path.expanduser, '~')
# We don't need to wait for the result of this. Ideally we'd check its
# return value somewhere, but logs will catch a failure anyway.
context.call_async(ansible_mitogen.target.start_fork_parent)
if os.environ.get('MITOGEN_DUMP_THREAD_STACKS'):
from mitogen import debug
context.call(debug.dump_to_logger)
self._key_by_context[context] = key
self._refs_by_context[context] = 0
return {

@ -26,6 +26,7 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import errno
import fcntl
import getpass
import inspect
@ -593,7 +594,15 @@ class Stream(mitogen.core.Stream):
# on_disconnect() call.
return
try:
pid, status = os.waitpid(self.pid, os.WNOHANG)
except OSError:
e = sys.exc_info()[1]
if e.args[0] == errno.ECHILD:
LOG.warn('%r: waitpid(%r) produced ECHILD', self, self.pid)
return
raise
if pid:
LOG.debug('%r: child process exit status was %d', self, status)
else:
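
Note: the hunk above makes child reaping tolerant of ECHILD, which waitpid() raises when the child has already been collected (for example by an earlier reap). A standalone sketch of the same pattern, with a hypothetical helper name used purely for illustration:

import errno
import os

def try_reap(pid):
    # Non-blocking reap of `pid`; returns the exit status, or None if the
    # child is still running or was already reaped elsewhere (ECHILD).
    try:
        reaped_pid, status = os.waitpid(pid, os.WNOHANG)
    except OSError as e:
        if e.errno == errno.ECHILD:
            return None  # nothing left to reap
        raise
    if reaped_pid == 0:
        return None  # child has not exited yet
    return status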
@ -699,7 +708,11 @@ class Stream(mitogen.core.Stream):
LOG.debug('%r.connect(): child process stdin/stdout=%r',
self, self.receive_side.fd)
try:
self._connect_bootstrap(extra_fd)
except Exception:
self._reap_child()
raise
def _ec0_received(self):
LOG.debug('%r._ec0_received()', self)
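
Together with the waitpid change above, the intent is that a failed or timed-out bootstrap never leaks the spawned interpreter: the exception handler reaps the child before re-raising. A rough sketch of that contract using hypothetical helper callables (not the actual Stream API):

def connect_or_clean_up(spawn, bootstrap, reap):
    # Spawn a child and run its bootstrap; on any failure (including a
    # connect timeout) reap the child before re-raising, so callers never
    # observe a leaked process.
    pid = spawn()
    try:
        bootstrap(pid)
    except Exception:
        reap(pid)  # mirrors self._reap_child() in the hunk above
        raise
    return pid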

@ -1 +1,2 @@
- import_playbook: lru_one_target.yml
- import_playbook: reconnection.yml

@ -0,0 +1,30 @@
# Test ContextService ability to handle disconnections, including handling
# cleanup of dependent (via=) contexts.
- name: integration/context_service/reconnection.yml
hosts: all
any_errors_fatal: true
tasks:
- become: true
custom_python_detect_environment:
register: old_become_env
- become: true
# This must be >1 for vanilla Ansible.
shell: |
bash -c "( sleep 3; pkill -f sshd:; ) & disown"
- connection: local
shell: sleep 3
- wait_for_connection:
- become: true
custom_python_detect_environment:
register: new_become_env
# Verify the PIDs really changed (i.e. disconnection happened)
- assert:
that:
- old_become_env.pid != new_become_env.pid

@ -35,6 +35,7 @@ RUN yum clean all && \
DOCKERFILE = r"""
COPY data/001-mitogen.sudo /etc/sudoers.d/001-mitogen
RUN \
chsh -s /bin/bash && \
mkdir -p /var/run/sshd && \
echo i-am-mitogen-test-docker-image > /etc/sentinel && \
groupadd mitogen__sudo_nopw && \

@ -0,0 +1,3 @@
#!/bin/bash
# I am a Python interpreter that sits idle until the connection times out.
exec -a mitogen-tests-python-never-responds.sh sleep 86400

@ -1,3 +1,4 @@
import errno
import os
import subprocess
import tempfile
@ -9,6 +10,26 @@ import testlib
import mitogen.parent
class ReapChildTest(testlib.RouterMixin, testlib.TestCase):
def test_connect_timeout(self):
# Ensure the child process is reaped if the connection times out.
stream = mitogen.parent.Stream(
router=self.router,
remote_id=1234,
old_router=self.router,
max_message_size=self.router.max_message_size,
python_path=testlib.data_path('python_never_responds.sh'),
connect_timeout=0.5,
)
self.assertRaises(mitogen.core.TimeoutError,
lambda: stream.connect()
)
e = self.assertRaises(OSError,
lambda: os.kill(stream.pid, 0)
)
self.assertEquals(e.args[0], errno.ESRCH)
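
os.kill(pid, 0) delivers no signal; it only checks that the process exists, so ESRCH here shows the timed-out child really was reaped. If the reap ever happened asynchronously, a small polling helper along these lines (hypothetical, not part of this change) could keep the assertion robust:

import errno
import os
import time

def wait_for_pid_to_exit(pid, timeout=5.0, interval=0.05):
    # Poll with signal 0 until `pid` no longer exists or `timeout` expires.
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            os.kill(pid, 0)
        except OSError as e:
            if e.errno == errno.ESRCH:
                return True  # process is gone
            raise
        time.sleep(interval)
    return False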
class StreamErrorTest(testlib.RouterMixin, testlib.TestCase):
def test_direct_eof(self):
e = self.assertRaises(mitogen.core.StreamError,
