diff --git a/ansible_mitogen/logging.py b/ansible_mitogen/logging.py index e2035254..1c439be8 100644 --- a/ansible_mitogen/logging.py +++ b/ansible_mitogen/logging.py @@ -54,7 +54,8 @@ class Handler(logging.Handler): #: may simply be to bury all target logs in DEBUG output, but not by #: overriding their log level as done here. NOISY_LOGGERS = frozenset([ - 'dnf', # issue #272; warns when a package is already installed. + 'dnf', # issue #272; warns when a package is already installed. + 'boto', # issue #541; normal boto retry logic can cause ERROR logs. ]) def emit(self, record): diff --git a/docs/changelog.rst b/docs/changelog.rst index 84162bf8..180fe9e0 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -136,13 +136,17 @@ Fixes repair ``delegate_to`` handling broke default ``ansible_python_interpreter`` handling. Test coverage was added. +* `#532 `_: fix a race in the service + used to propagate Ansible modules, that could easily manifest when starting + asynchronous tasks in a loop. + * `#536 `_: changes in 0.2.4 to support Python 2.4 interacted poorly with modules that imported ``simplejson`` from a controller that also loaded an incompatible newer version of ``simplejson``. -* `#538 `_: the Mitogen source - distribution includes a requisite ``LICENSE`` file. +* `#538 `_: the source distribution + includes a ``LICENSE`` file. * `#539 `_: log output is no longer duplicated when the Ansible ``log_path`` setting is enabled. @@ -150,19 +154,22 @@ Fixes * `#540 `_: the ``stderr`` stream of async module invocations was previously discarded. -* `748f5f67 `_: the - ``ansible_ssh_host`` variable is respected when ``mitogen_via=`` is active. - -* `21ad299d `_: the - precedence of ``ansible_ssh_user`` and ``ansible_user`` variables were - corrected when ``mitogen_via=`` is active. - -* `8ae6ca1d `_: the - ``ansible_become_method`` variable is respected when ``mitogen_via=`` is - active. - -* `7fd0d349 `_: the - ``ansible_ssh_port`` variable is respected when ``mitogen_via=`` is active. +* `#541 `_: Python error logs + originating from the ``boto`` package are quiesced, and only appear in + ``-vvv`` output. This is since EC2 modules may trigger errors during normal + operation, when retrying transiently failing requests. + +* `748f5f67 `_, + `21ad299d `_, + `8ae6ca1d `_, + `7fd0d349 `_: + the ``ansible_ssh_host``, ``ansible_ssh_user``, ``ansible_user``, + ``ansible_become_method``, and ``ansible_ssh_port`` variables more correctly + match typical behaviour when ``mitogen_via=`` is active. + +* `2a8567b4 `_: fix a race + initializing a child's service thread pool on Python 3.4+, due to a change in + locking scheme used by the Python import mechanism. Thanks! diff --git a/mitogen/core.py b/mitogen/core.py index 920a94b6..470b00ca 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -3136,10 +3136,21 @@ class ExternalContext(object): if not self.config['profiling']: os.kill(os.getpid(), signal.SIGTERM) + #: On Python >3.4, the global importer lock has been sharded into a + #: per-module lock, meaning there is no guarantee the import statement in + #: service_stub_main will be truly complete before a second thread + #: attempting the same import will see a partially initialized module. + #: Sigh. Therefore serialize execution of the stub itself. + service_stub_lock = threading.Lock() + def _service_stub_main(self, msg): - import mitogen.service - pool = mitogen.service.get_or_create_pool(router=self.router) - pool._receiver._on_receive(msg) + self.service_stub_lock.acquire() + try: + import mitogen.service + pool = mitogen.service.get_or_create_pool(router=self.router) + pool._receiver._on_receive(msg) + finally: + self.service_stub_lock.release() def _on_call_service_msg(self, msg): """ diff --git a/mitogen/fork.py b/mitogen/fork.py index c78558b8..d6685d70 100644 --- a/mitogen/fork.py +++ b/mitogen/fork.py @@ -98,6 +98,7 @@ def on_fork(): fixup_prngs() mitogen.core.Latch._on_fork() mitogen.core.Side._on_fork() + mitogen.core.ExternalContext.service_stub_lock = threading.Lock() mitogen__service = sys.modules.get('mitogen.service') if mitogen__service: diff --git a/mitogen/service.py b/mitogen/service.py index 0fba59c9..3254e69a 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -603,8 +603,6 @@ class PushFileService(Service): This service will eventually be merged into FileService. """ - invoker_class = SerializedInvoker - def __init__(self, **kwargs): super(PushFileService, self).__init__(**kwargs) self._lock = threading.Lock() @@ -613,13 +611,16 @@ class PushFileService(Service): self._sent_by_stream = {} def get(self, path): + """ + Fetch a file from the cache. + """ assert isinstance(path, mitogen.core.UnicodeType) self._lock.acquire() try: if path in self._cache: return self._cache[path] - waiters = self._waiters.setdefault(path, []) latch = mitogen.core.Latch() + waiters = self._waiters.setdefault(path, []) waiters.append(lambda: latch.put(None)) finally: self._lock.release() @@ -633,14 +634,15 @@ class PushFileService(Service): stream = self.router.stream_by_id(context.context_id) child = mitogen.core.Context(self.router, stream.remote_id) sent = self._sent_by_stream.setdefault(stream, set()) - if path in sent and child.context_id != context.context_id: - child.call_service_async( - service_name=self.name(), - method_name='forward', - path=path, - context=context - ).close() - elif path not in sent: + if path in sent: + if child.context_id != context.context_id: + child.call_service_async( + service_name=self.name(), + method_name='forward', + path=path, + context=context + ).close() + else: child.call_service_async( service_name=self.name(), method_name='store_and_forward', @@ -680,14 +682,6 @@ class PushFileService(Service): fp.close() self._forward(context, path) - def _store(self, path, data): - self._lock.acquire() - try: - self._cache[path] = data - return self._waiters.pop(path, []) - finally: - self._lock.release() - @expose(policy=AllowParents()) @no_reply() @arg_spec({ @@ -696,9 +690,16 @@ class PushFileService(Service): 'context': mitogen.core.Context, }) def store_and_forward(self, path, data, context): - LOG.debug('%r.store_and_forward(%r, %r, %r)', - self, path, data, context) - waiters = self._store(path, data) + LOG.debug('%r.store_and_forward(%r, %r, %r) %r', + self, path, data, context, + threading.currentThread().getName()) + self._lock.acquire() + try: + self._cache[path] = data + waiters = self._waiters.pop(path, []) + finally: + self._lock.release() + if context.context_id != mitogen.context_id: self._forward(context, path) for callback in waiters: @@ -712,10 +713,17 @@ class PushFileService(Service): }) def forward(self, path, context): LOG.debug('%r.forward(%r, %r)', self, path, context) - if path not in self._cache: - LOG.error('%r: %r is not in local cache', self, path) - return - self._forward(context, path) + func = lambda: self._forward(context, path) + + self._lock.acquire() + try: + if path in self._cache: + func() + else: + LOG.debug('%r: %r not cached yet, queueing', self, path) + self._waiters.setdefault(path, []).append(func) + finally: + self._lock.release() class FileService(Service): diff --git a/tests/push_file_service_test.py b/tests/push_file_service_test.py new file mode 100644 index 00000000..1dfff241 --- /dev/null +++ b/tests/push_file_service_test.py @@ -0,0 +1,56 @@ + +import os +import tempfile +import unittest2 + +import mitogen.core +import mitogen.service +import testlib +from mitogen.core import b + + +def prepare(): + # ensure module loading delay is complete before loading PushFileService. + pass + + +@mitogen.core.takes_router +def wait_for_file(path, router): + pool = mitogen.service.get_or_create_pool(router=router) + service = pool.get_service(u'mitogen.service.PushFileService') + return service.get(path) + + +class PropagateToTest(testlib.RouterMixin, testlib.TestCase): + klass = mitogen.service.PushFileService + + def test_two_grandchild_one_intermediary(self): + tf = tempfile.NamedTemporaryFile() + path = mitogen.core.to_text(tf.name) + + try: + tf.write(b('test')) + tf.flush() + + interm = self.router.local(name='interm') + c1 = self.router.local(via=interm, name='c1') + c2 = self.router.local(via=interm) + + c1.call(prepare) + c2.call(prepare) + + service = self.klass(router=self.router) + service.propagate_to(context=c1, path=path) + service.propagate_to(context=c2, path=path) + + s = c1.call(wait_for_file, path=path) + self.assertEquals(b('test'), s) + + s = c2.call(wait_for_file, path=path) + self.assertEquals(b('test'), s) + finally: + tf.close() + + +if __name__ == '__main__': + unittest2.main()