Merge remote-tracking branch 'origin/dmw'

* origin/dmw:
  issue #415: replace default Poller with select.poll()
  issue #415: add IPC latency bench.
  issue #408: fix test fallout.
  issue #408: update Changelog; closes #408.
  issue #408: 2.4 compat: replace iter_read with explicit generator
  issue #408: 2.4 compat: remove ternary if use in master.py.
  issue #408: use compatible method to get thread ID.
  issue #408: fix mitogen.compat.tokenize 2.4 compatibility.
  issue #436: decode debug log lines on Python 3.
  issue #461: Ansible 2.3 placeholder modules for action plug-ins.
  issue #461: Ansible 2.3-compatible _get_candidate_temp_dirs().
  issue #461: Ansible 2.3 did not have _load_name.
issue510
David Wilson 6 years ago
commit f87d55d11c

@ -223,7 +223,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
""" """
LOG.debug('_fixup_perms2(%r, remote_user=%r, execute=%r)', LOG.debug('_fixup_perms2(%r, remote_user=%r, execute=%r)',
remote_paths, remote_user, execute) remote_paths, remote_user, execute)
if execute and self._load_name not in self.FIXUP_PERMS_RED_HERRING: if execute and self._task.action not in self.FIXUP_PERMS_RED_HERRING:
return self._remote_chmod(remote_paths, mode='u+x') return self._remote_chmod(remote_paths, mode='u+x')
return self.COMMAND_RESULT.copy() return self.COMMAND_RESULT.copy()

@ -79,14 +79,17 @@ else:
def _get_candidate_temp_dirs(): def _get_candidate_temp_dirs():
options = ansible.constants.config.get_plugin_options('shell', 'sh') try:
# >=2.5
# Pre 2.5 this came from ansible.constants. options = ansible.constants.config.get_plugin_options('shell', 'sh')
remote_tmp = (options.get('remote_tmp') or remote_tmp = options.get('remote_tmp') or ansible.constants.DEFAULT_REMOTE_TMP
ansible.constants.DEFAULT_REMOTE_TMP) system_tmpdirs = options.get('system_tmpdirs', ('/var/tmp', '/tmp'))
dirs = list(options.get('system_tmpdirs', ('/var/tmp', '/tmp'))) except AttributeError:
dirs.insert(0, remote_tmp) # 2.3
return mitogen.utils.cast(dirs) remote_tmp = ansible.constants.DEFAULT_REMOTE_TMP
system_tmpdirs = ('/var/tmp', '/tmp')
return mitogen.utils.cast([remote_tmp] + list(system_tmpdirs))
def key_from_dict(**kwargs): def key_from_dict(**kwargs):

@ -153,6 +153,11 @@ Enhancements
``mitogen_host_pinned`` strategy wraps the ``host_pinned`` strategy ``mitogen_host_pinned`` strategy wraps the ``host_pinned`` strategy
introduced in Ansible 2.7. introduced in Ansible 2.7.
* `#415 <https://github.com/dw/mitogen/issues/415>`_: the interface employed for
in-process queues was changed from Kqueue/epoll() to poll(), which requires
no setup or teardown, yielding a 30% latency reduction for inter-thread
communication. This may manifest as a runtime improvement in many-host runs.
Fixes Fixes
^^^^^ ^^^^^
@ -248,6 +253,9 @@ Core Library
the point the error occurs that previously would have been buried in debug the point the error occurs that previously would have been buried in debug
log output from an unrelated context. log output from an unrelated context.
* `#408 <https://github.com/dw/mitogen/issues/408>`_: a variety of fixes were
made to restore Python 2.4 compatibility.
* `#399 <https://github.com/dw/mitogen/issues/399>`_, * `#399 <https://github.com/dw/mitogen/issues/399>`_,
`#437 <https://github.com/dw/mitogen/issues/437>`_: ignore a `#437 <https://github.com/dw/mitogen/issues/437>`_: ignore a
:class:`DeprecationWarning` to avoid failure of the ``su`` method on Python :class:`DeprecationWarning` to avoid failure of the ``su`` method on Python
@ -379,6 +387,7 @@ bug reports, testing, features and fixes in this release contributed by
`Michael DeHaan <https://github.com/mpdehaan>`_, `Michael DeHaan <https://github.com/mpdehaan>`_,
`Mohammed Naser <https://github.com/mnaser/>`_, `Mohammed Naser <https://github.com/mnaser/>`_,
`Stéphane <https://github.com/sboisson/>`_, `Stéphane <https://github.com/sboisson/>`_,
`@whky <https://github.com/whky/>`_,
`@syntonym <https://github.com/syntonym/>`_, `@syntonym <https://github.com/syntonym/>`_,
`@yodatak <https://github.com/yodatak/>`_, and `@yodatak <https://github.com/yodatak/>`_, and
`Younès HAFRI <https://github.com/yhafri>`_. `Younès HAFRI <https://github.com/yhafri>`_.

@ -392,8 +392,11 @@ def generate_tokens(readline):
(initial == '.' and token != '.'): # ordinary number (initial == '.' and token != '.'): # ordinary number
yield (NUMBER, token, spos, epos, line) yield (NUMBER, token, spos, epos, line)
elif initial in '\r\n': elif initial in '\r\n':
yield (NL if parenlev > 0 else NEWLINE, if parenlev > 0:
token, spos, epos, line) n = NL
else:
n = NEWLINE
yield (n, token, spos, epos, line)
elif initial == '#': elif initial == '#':
assert not token.endswith("\n") assert not token.endswith("\n")
yield (COMMENT, token, spos, epos, line) yield (COMMENT, token, spos, epos, line)

@ -57,6 +57,11 @@ import imp
# Absolute imports for <2.5. # Absolute imports for <2.5.
select = __import__('select') select = __import__('select')
try:
import thread
except ImportError:
import threading as thread
try: try:
import cPickle as pickle import cPickle as pickle
except ImportError: except ImportError:
@ -1685,7 +1690,7 @@ class Poller(object):
A poller manages OS file descriptors the user is waiting to become A poller manages OS file descriptors the user is waiting to become
available for IO. The :meth:`poll` method blocks the calling thread available for IO. The :meth:`poll` method blocks the calling thread
until one or more become ready. The default implementation is based on until one or more become ready. The default implementation is based on
:func:`select.select`. :func:`select.poll`.
Each descriptor has an associated `data` element, which is unique for each Each descriptor has an associated `data` element, which is unique for each
readiness type, and defaults to being the same as the file descriptor. The readiness type, and defaults to being the same as the file descriptor. The
@ -1708,6 +1713,15 @@ class Poller(object):
Pollers may only be used by one thread at a time. Pollers may only be used by one thread at a time.
""" """
# This changed from select() to poll() in Mitogen 0.2.4. Since poll() has
# no upper FD limit, it is suitable for use with Latch, which must handle
# FDs larger than select's limit during many-host runs. We want this
# because poll() requires no setup and teardown: just a single system call,
# which is important because Latch.get() creates a Poller on each
# invocation. In a microbenchmark, poll() vs. epoll_ctl() is 30% faster in
# this scenario. If select() must return in future, it is important
# Latch.poller_class is set from parent.py to point to the industrial
# strength poller for the OS, otherwise Latch will fail randomly.
#: Increments on every poll(). Used to version _rfds and _wfds. #: Increments on every poll(). Used to version _rfds and _wfds.
_generation = 1 _generation = 1
@ -1715,6 +1729,7 @@ class Poller(object):
def __init__(self): def __init__(self):
self._rfds = {} self._rfds = {}
self._wfds = {} self._wfds = {}
self._pollobj = select.poll()
def __repr__(self): def __repr__(self):
return '%s(%#x)' % (type(self).__name__, id(self)) return '%s(%#x)' % (type(self).__name__, id(self))
@ -1741,11 +1756,23 @@ class Poller(object):
""" """
pass pass
def _update(self, fd):
mask = (((fd in self._rfds) and select.POLLIN) |
((fd in self._wfds) and select.POLLOUT))
if mask:
self._pollobj.register(fd, mask)
else:
try:
self._pollobj.unregister(fd)
except KeyError:
pass
def start_receive(self, fd, data=None): def start_receive(self, fd, data=None):
""" """
Cause :meth:`poll` to yield `data` when `fd` is readable. Cause :meth:`poll` to yield `data` when `fd` is readable.
""" """
self._rfds[fd] = (data or fd, self._generation) self._rfds[fd] = (data or fd, self._generation)
self._update(fd)
def stop_receive(self, fd): def stop_receive(self, fd):
""" """
@ -1755,12 +1782,14 @@ class Poller(object):
change in future. change in future.
""" """
self._rfds.pop(fd, None) self._rfds.pop(fd, None)
self._update(fd)
def start_transmit(self, fd, data=None): def start_transmit(self, fd, data=None):
""" """
Cause :meth:`poll` to yield `data` when `fd` is writeable. Cause :meth:`poll` to yield `data` when `fd` is writeable.
""" """
self._wfds[fd] = (data or fd, self._generation) self._wfds[fd] = (data or fd, self._generation)
self._update(fd)
def stop_transmit(self, fd): def stop_transmit(self, fd):
""" """
@ -1770,25 +1799,24 @@ class Poller(object):
change in future. change in future.
""" """
self._wfds.pop(fd, None) self._wfds.pop(fd, None)
self._update(fd)
def _poll(self, timeout): def _poll(self, timeout):
(rfds, wfds, _), _ = io_op(select.select, if timeout:
self._rfds, timeout *= 1000
self._wfds,
(), timeout events, _ = io_op(self._pollobj.poll, timeout)
) for fd, event in events:
if event & select.POLLIN:
for fd in rfds: _vv and IOLOG.debug('%r: POLLIN for %r', self, fd)
_vv and IOLOG.debug('%r: POLLIN for %r', self, fd) data, gen = self._rfds.get(fd, (None, None))
data, gen = self._rfds.get(fd, (None, None)) if gen and gen < self._generation:
if gen and gen < self._generation: yield data
yield data if event & select.POLLOUT:
_vv and IOLOG.debug('%r: POLLOUT for %r', self, fd)
for fd in wfds: data, gen = self._wfds.get(fd, (None, None))
_vv and IOLOG.debug('%r: POLLOUT for %r', self, fd) if gen and gen < self._generation:
data, gen = self._wfds.get(fd, (None, None)) yield data
if gen and gen < self._generation:
yield data
def poll(self, timeout=None): def poll(self, timeout=None):
""" """
@ -1917,7 +1945,7 @@ class Latch(object):
This disambiguates legitimate wake-ups, accidental writes to the FD, This disambiguates legitimate wake-ups, accidental writes to the FD,
and buggy internal FD sharing. and buggy internal FD sharing.
""" """
ident = threading.currentThread().ident ident = thread.get_ident()
return b(u'%010d-%016x-%016x' % (os.getpid(), int(id(self)), ident)) return b(u'%010d-%016x-%016x' % (os.getpid(), int(id(self)), ident))
COOKIE_SIZE = len(_make_cookie(None)) COOKIE_SIZE = len(_make_cookie(None))

@ -145,24 +145,23 @@ LOAD_CONST = dis.opname.index('LOAD_CONST')
IMPORT_NAME = dis.opname.index('IMPORT_NAME') IMPORT_NAME = dis.opname.index('IMPORT_NAME')
def _getarg(nextb, c):
if c > dis.HAVE_ARGUMENT:
return nextb() | (nextb() << 8)
if sys.version_info < (3, 0): if sys.version_info < (3, 0):
def iter_opcodes(co): def iter_opcodes(co):
# Yield `(op, oparg)` tuples from the code object `co`. # Yield `(op, oparg)` tuples from the code object `co`.
ordit = imap(ord, co.co_code) ordit = imap(ord, co.co_code)
nextb = ordit.next nextb = ordit.next
return ((c, (None return ((c, _getarg(nextb, c)) for c in ordit)
if c < dis.HAVE_ARGUMENT else
(nextb() | (nextb() << 8))))
for c in ordit)
elif sys.version_info < (3, 6): elif sys.version_info < (3, 6):
def iter_opcodes(co): def iter_opcodes(co):
# Yield `(op, oparg)` tuples from the code object `co`. # Yield `(op, oparg)` tuples from the code object `co`.
ordit = iter(co.co_code) ordit = iter(co.co_code)
nextb = ordit.__next__ nextb = ordit.__next__
return ((c, (None return ((c, _getarg(nextb, c)) for c in ordit)
if c < dis.HAVE_ARGUMENT else
(nextb() | (nextb() << 8))))
for c in ordit)
else: else:
def iter_opcodes(co): def iter_opcodes(co):
# Yield `(op, oparg)` tuples from the code object `co`. # Yield `(op, oparg)` tuples from the code object `co`.

@ -505,6 +505,49 @@ def write_all(fd, s, deadline=None):
poller.close() poller.close()
class IteratingRead(object):
def __init__(self, fds, deadline=None):
self.deadline = deadline
self.timeout = None
self.poller = PREFERRED_POLLER()
for fd in fds:
self.poller.start_receive(fd)
self.bits = []
self.timeout = None
def close(self):
self.poller.close()
def __iter__(self):
return self
def next(self):
while self.poller.readers:
if self.deadline is not None:
self.timeout = max(0, self.deadline - time.time())
if self.timeout == 0:
break
for fd in self.poller.poll(self.timeout):
s, disconnected = mitogen.core.io_op(os.read, fd, 4096)
if disconnected or not s:
IOLOG.debug('iter_read(%r) -> disconnected', fd)
self.poller.stop_receive(fd)
else:
IOLOG.debug('iter_read(%r) -> %r', fd, s)
self.bits.append(s)
return s
if not self.poller.readers:
raise EofError(u'EOF on stream; last 300 bytes received: %r' %
(b('').join(self.bits)[-300:].decode('latin1'),))
raise mitogen.core.TimeoutError('read timed out')
__next__ = next
def iter_read(fds, deadline=None): def iter_read(fds, deadline=None):
"""Return a generator that arranges for up to 4096-byte chunks to be read """Return a generator that arranges for up to 4096-byte chunks to be read
at a time from the file descriptor `fd` until the generator is destroyed. at a time from the file descriptor `fd` until the generator is destroyed.
@ -522,36 +565,7 @@ def iter_read(fds, deadline=None):
:raises mitogen.core.StreamError: :raises mitogen.core.StreamError:
Attempt to read past end of file. Attempt to read past end of file.
""" """
poller = PREFERRED_POLLER() return IteratingRead(fds=fds, deadline=deadline)
for fd in fds:
poller.start_receive(fd)
bits = []
timeout = None
try:
while poller.readers:
if deadline is not None:
timeout = max(0, deadline - time.time())
if timeout == 0:
break
for fd in poller.poll(timeout):
s, disconnected = mitogen.core.io_op(os.read, fd, 4096)
if disconnected or not s:
IOLOG.debug('iter_read(%r) -> disconnected', fd)
poller.stop_receive(fd)
else:
IOLOG.debug('iter_read(%r) -> %r', fd, s)
bits.append(s)
yield s
finally:
poller.close()
if not poller.readers:
raise EofError(u'EOF on stream; last 300 bytes received: %r' %
(b('').join(bits)[-300:].decode('latin1'),))
raise mitogen.core.TimeoutError('read timed out')
def discard_until(fd, s, deadline): def discard_until(fd, s, deadline):
@ -928,11 +942,6 @@ PREFERRED_POLLER = POLLER_BY_SYSNAME.get(
mitogen.core.Poller, mitogen.core.Poller,
) )
# For apps that start threads dynamically, it's possible Latch will also get
# very high-numbered wait fds when there are many connections, and so select()
# becomes useless there too. So swap in our favourite poller.
mitogen.core.Latch.poller_class = PREFERRED_POLLER
class DiagLogStream(mitogen.core.BasicStream): class DiagLogStream(mitogen.core.BasicStream):
""" """

@ -100,7 +100,8 @@ def filter_debug(stream, it):
if b('\n') not in buf: if b('\n') not in buf:
break break
line, _, buf = buf.partition(b('\n')) line, _, buf = buf.partition(b('\n'))
LOG.debug('%r: %s', stream, line.rstrip()) LOG.debug('%r: %s', stream,
mitogen.core.to_text(line.rstrip()))
state = 'start_of_line' state = 'start_of_line'
elif state == 'in_plain': elif state == 'in_plain':
line, nl, buf = buf.partition(b('\n')) line, nl, buf = buf.partition(b('\n'))

@ -0,0 +1,2 @@
# This is a placeholder so Ansible 2.3 can detect the corresponding action
# plug-in.

@ -0,0 +1,2 @@
# This is a placeholder so Ansible 2.3 can detect the corresponding action
# plug-in.

@ -0,0 +1,2 @@
# This is a placeholder so Ansible 2.3 can detect the corresponding action
# plug-in.

@ -0,0 +1,2 @@
# This is a placeholder so Ansible 2.3 can detect the corresponding action
# plug-in.

@ -0,0 +1,2 @@
# This is a placeholder so Ansible 2.3 can detect the corresponding action
# plug-in.

@ -0,0 +1,2 @@
# This is a placeholder so Ansible 2.3 can detect the corresponding action
# plug-in.

@ -0,0 +1,38 @@
"""
Measure latency of IPC between two local threads.
"""
import threading
import time
import mitogen
import ansible_mitogen.process
ansible_mitogen.process.setup_gil()
X = 20000
def flip_flop(ready, inp, out):
ready.put(None)
for x in xrange(X):
inp.get()
out.put(None)
ready = mitogen.core.Latch()
l1 = mitogen.core.Latch()
l2 = mitogen.core.Latch()
t1 = threading.Thread(target=flip_flop, args=(ready, l1, l2))
t2 = threading.Thread(target=flip_flop, args=(ready, l2, l1))
t1.start()
t2.start()
ready.get()
ready.get()
t0 = time.time()
l1.put(None)
t1.join()
t2.join()
print('++', int(1e6 * ((time.time() - t0) / (1.0+X))), 'usec')

@ -44,6 +44,21 @@ class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
class SshTest(testlib.DockerMixin, testlib.TestCase): class SshTest(testlib.DockerMixin, testlib.TestCase):
stream_class = mitogen.ssh.Stream stream_class = mitogen.ssh.Stream
def test_debug_decoding(self):
# ensure filter_debug_logs() decodes the logged string.
capture = testlib.LogCapturer()
capture.start()
try:
context = self.docker_ssh(
username='mitogen__has_sudo',
password='has_sudo_password',
ssh_debug_level=3,
)
finally:
s = capture.stop()
self.assertTrue("'): debug1: Reading configuration data" in s)
def test_stream_name(self): def test_stream_name(self):
context = self.docker_ssh( context = self.docker_ssh(
username='mitogen__has_sudo', username='mitogen__has_sudo',

Loading…
Cancel
Save