Merge remote-tracking branch 'origin/dmw'

* origin/dmw:
  docs: update Changelog.
  core: serialize calls to _service_stub_main().
  docs: update Changelog; closes #532.
  issue #532: PushFileService race.
  docs: more concise Changelog.
  issue #541: changelog typos.
  ansible: quiesce boto logger; closes #541.
pull/564/head
David Wilson 6 years ago
commit 24dd64a998

@ -54,7 +54,8 @@ class Handler(logging.Handler):
#: may simply be to bury all target logs in DEBUG output, but not by
#: overriding their log level as done here.
NOISY_LOGGERS = frozenset([
'dnf', # issue #272; warns when a package is already installed.
'dnf', # issue #272; warns when a package is already installed.
'boto', # issue #541; normal boto retry logic can cause ERROR logs.
])
def emit(self, record):

@ -136,13 +136,17 @@ Fixes
repair ``delegate_to`` handling broke default ``ansible_python_interpreter``
handling. Test coverage was added.
* `#532 <https://github.com/dw/mitogen/issues/532>`_: fix a race in the service
used to propagate Ansible modules, that could easily manifest when starting
asynchronous tasks in a loop.
* `#536 <https://github.com/dw/mitogen/issues/536>`_: changes in 0.2.4 to
support Python 2.4 interacted poorly with modules that imported
``simplejson`` from a controller that also loaded an incompatible newer
version of ``simplejson``.
* `#538 <https://github.com/dw/mitogen/issues/538>`_: the Mitogen source
distribution includes a requisite ``LICENSE`` file.
* `#538 <https://github.com/dw/mitogen/issues/538>`_: the source distribution
includes a ``LICENSE`` file.
* `#539 <https://github.com/dw/mitogen/issues/539>`_: log output is no longer
duplicated when the Ansible ``log_path`` setting is enabled.
@ -150,19 +154,22 @@ Fixes
* `#540 <https://github.com/dw/mitogen/issues/540>`_: the ``stderr`` stream of
async module invocations was previously discarded.
* `748f5f67 <https://github.com/dw/mitogen/commit/748f5f67>`_: the
``ansible_ssh_host`` variable is respected when ``mitogen_via=`` is active.
* `21ad299d <https://github.com/dw/mitogen/commit/21ad299d>`_: the
precedence of ``ansible_ssh_user`` and ``ansible_user`` variables were
corrected when ``mitogen_via=`` is active.
* `8ae6ca1d <https://github.com/dw/mitogen/commit/8ae6ca1d>`_: the
``ansible_become_method`` variable is respected when ``mitogen_via=`` is
active.
* `7fd0d349 <https://github.com/dw/mitogen/commit/7fd0d349>`_: the
``ansible_ssh_port`` variable is respected when ``mitogen_via=`` is active.
* `#541 <https://github.com/dw/mitogen/issues/541>`_: Python error logs
originating from the ``boto`` package are quiesced, and only appear in
``-vvv`` output. This is since EC2 modules may trigger errors during normal
operation, when retrying transiently failing requests.
* `748f5f67 <https://github.com/dw/mitogen/commit/748f5f67>`_,
`21ad299d <https://github.com/dw/mitogen/commit/21ad299d>`_,
`8ae6ca1d <https://github.com/dw/mitogen/commit/8ae6ca1d>`_,
`7fd0d349 <https://github.com/dw/mitogen/commit/7fd0d349>`_:
the ``ansible_ssh_host``, ``ansible_ssh_user``, ``ansible_user``,
``ansible_become_method``, and ``ansible_ssh_port`` variables more correctly
match typical behaviour when ``mitogen_via=`` is active.
* `2a8567b4 <https://github.com/dw/mitogen/commit/2a8567b4>`_: fix a race
initializing a child's service thread pool on Python 3.4+, due to a change in
locking scheme used by the Python import mechanism.
Thanks!

@ -3136,10 +3136,21 @@ class ExternalContext(object):
if not self.config['profiling']:
os.kill(os.getpid(), signal.SIGTERM)
#: On Python >3.4, the global importer lock has been sharded into a
#: per-module lock, meaning there is no guarantee the import statement in
#: service_stub_main will be truly complete before a second thread
#: attempting the same import will see a partially initialized module.
#: Sigh. Therefore serialize execution of the stub itself.
service_stub_lock = threading.Lock()
def _service_stub_main(self, msg):
import mitogen.service
pool = mitogen.service.get_or_create_pool(router=self.router)
pool._receiver._on_receive(msg)
self.service_stub_lock.acquire()
try:
import mitogen.service
pool = mitogen.service.get_or_create_pool(router=self.router)
pool._receiver._on_receive(msg)
finally:
self.service_stub_lock.release()
def _on_call_service_msg(self, msg):
"""

@ -98,6 +98,7 @@ def on_fork():
fixup_prngs()
mitogen.core.Latch._on_fork()
mitogen.core.Side._on_fork()
mitogen.core.ExternalContext.service_stub_lock = threading.Lock()
mitogen__service = sys.modules.get('mitogen.service')
if mitogen__service:

@ -603,8 +603,6 @@ class PushFileService(Service):
This service will eventually be merged into FileService.
"""
invoker_class = SerializedInvoker
def __init__(self, **kwargs):
super(PushFileService, self).__init__(**kwargs)
self._lock = threading.Lock()
@ -613,13 +611,16 @@ class PushFileService(Service):
self._sent_by_stream = {}
def get(self, path):
"""
Fetch a file from the cache.
"""
assert isinstance(path, mitogen.core.UnicodeType)
self._lock.acquire()
try:
if path in self._cache:
return self._cache[path]
waiters = self._waiters.setdefault(path, [])
latch = mitogen.core.Latch()
waiters = self._waiters.setdefault(path, [])
waiters.append(lambda: latch.put(None))
finally:
self._lock.release()
@ -633,14 +634,15 @@ class PushFileService(Service):
stream = self.router.stream_by_id(context.context_id)
child = mitogen.core.Context(self.router, stream.remote_id)
sent = self._sent_by_stream.setdefault(stream, set())
if path in sent and child.context_id != context.context_id:
child.call_service_async(
service_name=self.name(),
method_name='forward',
path=path,
context=context
).close()
elif path not in sent:
if path in sent:
if child.context_id != context.context_id:
child.call_service_async(
service_name=self.name(),
method_name='forward',
path=path,
context=context
).close()
else:
child.call_service_async(
service_name=self.name(),
method_name='store_and_forward',
@ -680,14 +682,6 @@ class PushFileService(Service):
fp.close()
self._forward(context, path)
def _store(self, path, data):
self._lock.acquire()
try:
self._cache[path] = data
return self._waiters.pop(path, [])
finally:
self._lock.release()
@expose(policy=AllowParents())
@no_reply()
@arg_spec({
@ -696,9 +690,16 @@ class PushFileService(Service):
'context': mitogen.core.Context,
})
def store_and_forward(self, path, data, context):
LOG.debug('%r.store_and_forward(%r, %r, %r)',
self, path, data, context)
waiters = self._store(path, data)
LOG.debug('%r.store_and_forward(%r, %r, %r) %r',
self, path, data, context,
threading.currentThread().getName())
self._lock.acquire()
try:
self._cache[path] = data
waiters = self._waiters.pop(path, [])
finally:
self._lock.release()
if context.context_id != mitogen.context_id:
self._forward(context, path)
for callback in waiters:
@ -712,10 +713,17 @@ class PushFileService(Service):
})
def forward(self, path, context):
LOG.debug('%r.forward(%r, %r)', self, path, context)
if path not in self._cache:
LOG.error('%r: %r is not in local cache', self, path)
return
self._forward(context, path)
func = lambda: self._forward(context, path)
self._lock.acquire()
try:
if path in self._cache:
func()
else:
LOG.debug('%r: %r not cached yet, queueing', self, path)
self._waiters.setdefault(path, []).append(func)
finally:
self._lock.release()
class FileService(Service):

@ -0,0 +1,56 @@
import os
import tempfile
import unittest2
import mitogen.core
import mitogen.service
import testlib
from mitogen.core import b
def prepare():
# ensure module loading delay is complete before loading PushFileService.
pass
@mitogen.core.takes_router
def wait_for_file(path, router):
pool = mitogen.service.get_or_create_pool(router=router)
service = pool.get_service(u'mitogen.service.PushFileService')
return service.get(path)
class PropagateToTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.service.PushFileService
def test_two_grandchild_one_intermediary(self):
tf = tempfile.NamedTemporaryFile()
path = mitogen.core.to_text(tf.name)
try:
tf.write(b('test'))
tf.flush()
interm = self.router.local(name='interm')
c1 = self.router.local(via=interm, name='c1')
c2 = self.router.local(via=interm)
c1.call(prepare)
c2.call(prepare)
service = self.klass(router=self.router)
service.propagate_to(context=c1, path=path)
service.propagate_to(context=c2, path=path)
s = c1.call(wait_for_file, path=path)
self.assertEquals(b('test'), s)
s = c2.call(wait_for_file, path=path)
self.assertEquals(b('test'), s)
finally:
tf.close()
if __name__ == '__main__':
unittest2.main()
Loading…
Cancel
Save