Merge pull request #240 from dw/dmw

FileService optimizations, more compatible temp dir handling
pull/246/head
dw 6 years ago committed by GitHub
commit 8702b7521f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -50,7 +50,7 @@ except ImportError: # Ansible<2.4
import mitogen.core
import mitogen.master
from mitogen.utils import cast
import mitogen.utils
import ansible_mitogen.connection
import ansible_mitogen.planner
@ -292,7 +292,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
return os.path.join(self._connection.homedir, path[2:])
if path.startswith('~'):
# ~root/.ansible -> /root/.ansible
return self.call(os.path.expanduser, path)
return self.call(os.path.expanduser, mitogen.utils.cast(path))
def _execute_module(self, module_name=None, module_args=None, tmp=None,
task_vars=None, persist_files=False,

@ -110,9 +110,9 @@ class Runner(object):
def get_temp_dir(self):
if not self._temp_dir:
self._temp_dir = ansible_mitogen.target.make_temp_directory(
self.remote_tmp,
)
self._temp_dir = tempfile.mkdtemp(prefix='ansible_mitogen_')
# https://github.com/dw/mitogen/issues/239
#ansible_mitogen.target.make_temp_directory(self.remote_tmp)
return self._temp_dir
def setup(self):

@ -367,77 +367,65 @@ class StreamState(object):
class FileService(mitogen.service.Service):
"""
Streaming file server, used to serve both small files like Ansible module
sources, and huge files like ISO images. Paths must be explicitly added to
the service by a trusted context before they will be served to an untrusted
context.
The file service nominally lives on the mitogen.service.Pool() threads
shared with ContextService above, however for simplicity it also maintains
a dedicated thread from where file chunks are scheduled.
The scheduler thread is responsible for dividing transfer requests up among
the physical streams that connect to those contexts, and ensure each stream
never has an excessive amount of data buffered in RAM at any time.
Transfers proceeed one-at-a-time per stream. When multiple contexts exist
reachable over the same stream (e.g. one is the SSH account, another is a
sudo account, and a third is a proxied SSH connection), each request is
satisfied in turn before chunks for subsequent requests start flowing. This
ensures when a connection is contended, that preference is given to
completing individual transfers, rather than potentially aborting many
partially complete transfers, causing all the bandwidth used to be wasted.
Streaming file server, used to serve small files like Ansible modules and
huge files like ISO images. Paths must be registered by a trusted context
before they will be served to a child.
Transfers are divided among the physical streams that connect external
contexts, ensuring each stream never has excessive data buffered in RAM,
while still maintaining enough to fully utilize available bandwidth. This
is achieved by making an initial bandwidth assumption, enqueueing enough
chunks to fill that assumed pipe, then responding to delivery
acknowledgements from the receiver by scheduling new chunks.
Transfers proceed one-at-a-time per stream. When multiple contexts exist on
a stream (e.g. one is the SSH account, another is a sudo account, and a
third is a proxied SSH connection), each request is satisfied in turn
before subsequent requests start flowing. This ensures when a stream is
contended, priority is given to completing individual transfers rather than
potentially aborting many partial transfers, causing the bandwidth to be
wasted.
Theory of operation:
1. Trusted context (i.e. a WorkerProcess) calls register(), making a
1. Trusted context (i.e. WorkerProcess) calls register(), making a
file available to any untrusted context.
2. Untrusted context creates a mitogen.core.Receiver() to receive
file chunks. It then calls fetch(path, recv.to_sender()), which sets
up the transfer. The fetch() method returns the final file size and
notifies the dedicated thread of the transfer request.
3. The dedicated thread wakes from perpetual sleep, looks up the stream
used to communicate with the untrusted context, and begins pumping
128KiB-sized chunks until that stream's output queue reaches a
limit (1MiB).
4. The thread sleeps for 10ms, wakes, and pumps new chunks as necessary
to refill any drained output queue, which are being asynchronously
drained by the Stream implementation running on the Broker thread.
5. Once the last chunk has been pumped for a single transfer,
2. Requestee context creates a mitogen.core.Receiver() to receive
chunks, then calls fetch(path, recv.to_sender()), to set up the
transfer.
3. fetch() replies to the call with the file's metadata, then
schedules an initial burst up to the window size limit (1MiB).
4. Chunks begin to arrive in the requestee, which calls acknowledge()
for each 128KiB received.
5. The acknowledge() call arrives at FileService, which scheduled a new
chunk to refill the drained window back to the size limit.
6. When the last chunk has been pumped for a single transfer,
Sender.close() is called causing the receive loop in
target.py::_get_file() to exit, and allows that code to compare the
transferred size with the total file size indicated by the return
value of the fetch() method.
6. If the sizes mismatch, the caller is informed, which will discard
the result and log an error.
7. Once all chunks have been pumped for all transfers, the dedicated
thread stops waking at 10ms intervals and resumes perpetual sleep.
target.py::_get_file() to exit, allowing that code to compare the
transferred size with the total file size from the metadata.
7. If the sizes mismatch, _get_file()'s caller is informed which will
discard the result and log/raise an error.
Shutdown:
1. process.py calls service.Pool.shutdown(), which arranges for all the
1. process.py calls service.Pool.shutdown(), which arranges for the
service pool threads to exit and be joined, guranteeing no new
requests can arrive, before calling Service.on_shutdown() for each
registered service.
2. FileService.on_shutdown() marks the dedicated thread's queue as
closed, causing the dedicated thread to wake immediately. It will
throw an exception that begins shutdown of the main loop.
3. The main loop calls Sender.close() prematurely for every pending
transfer, causing any Receiver loops in the target contexts to exit
early. The file size check fails, and the partially downloaded file
is discarded, and an error is logged.
4. Control exits the file transfer function in every target, and
graceful target shutdown can proceed normally, without the
associated thread needing to be forcefully killed.
2. FileService.on_shutdown() walks every in-progress transfer and calls
Sender.close(), causing Receiver loops in the requestees to exit
early. The size check fails and any partially downloaded file is
discarded.
3. Control exits _get_file() in every target, and graceful shutdown can
proceed normally, without the associated thread needing to be
forcefully killed.
"""
handle = 501
max_message_size = 1000
unregistered_msg = 'Path is not registered with FileService.'
context_mismatch_msg = 'sender= kwarg context must match requestee context'
#: Maximum size of any stream's output queue before we stop pumping more
#: file chunks. The queue may overspill by up to mitogen.core.CHUNK_SIZE-1
#: bytes (128KiB-1). With max_queue_size=1MiB and a RTT of 10ms, maximum
#: throughput is 112MiB/sec, which is >5x what SSH can handle on my laptop.
max_queue_size = 1048576
#: Burst size. With 1MiB and 10ms RTT max throughput is 100MiB/sec, which
#: is 5x what SSH can handle on a 2011 era 2.4Ghz Core i5.
window_size_bytes = 1048576
def __init__(self, router):
super(FileService, self).__init__(router)
@ -497,22 +485,30 @@ class FileService(mitogen.service.Service):
finally:
state.lock.release()
# The IO loop pumps 128KiB chunks. An ideal message is a multiple of this,
# odd-sized messages waste one tiny write() per message on the trailer.
# Therefore subtract 10 bytes pickle overhead + 24 bytes header.
IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Stream.HEADER_LEN + (
len(mitogen.core.Message.pickled(' ' * mitogen.core.CHUNK_SIZE).data) -
mitogen.core.CHUNK_SIZE
))
def _schedule_pending_unlocked(self, state):
"""
Consider the pending transfers for a stream, pumping new chunks while
the unacknowledged byte count is below :attr:`max_queue_size`. Must be
called with the StreamState lock held.
the unacknowledged byte count is below :attr:`window_size_bytes`. Must
be called with the StreamState lock held.
:param StreamState state:
Stream to schedule chunks for.
"""
while state.jobs and state.unacked < self.max_queue_size:
while state.jobs and state.unacked < self.window_size_bytes:
sender, fp = state.jobs[0]
s = fp.read(mitogen.core.CHUNK_SIZE)
state.unacked += len(s)
sender.send(s)
if not s:
s = fp.read(self.IO_SIZE)
if s:
state.unacked += len(s)
sender.send(s)
else:
# File is done. Cause the target's receive loop to exit by
# closing the sender, close the file, and remove the job entry.
sender.close()
@ -527,7 +523,7 @@ class FileService(mitogen.service.Service):
})
def fetch(self, path, sender, msg):
"""
Fetch a file's data.
Start a transfer for a registered path.
:param str path:
File path.
@ -543,8 +539,7 @@ class FileService(mitogen.service.Service):
* ``mtime``: Floating point modification time.
* ``ctime``: Floating point change time.
:raises Error:
Unregistered path, or attempt to send to context that was not the
requestee context.
Unregistered path, or Sender did not match requestee context.
"""
if path not in self._metadata_by_path:
raise Error(self.unregistered_msg)
@ -552,7 +547,7 @@ class FileService(mitogen.service.Service):
raise Error(self.context_mismatch_msg)
LOG.debug('Serving %r', path)
fp = open(path, 'rb', mitogen.core.CHUNK_SIZE)
fp = open(path, 'rb', self.IO_SIZE)
# Response must arrive first so requestee can begin receive loop,
# otherwise first ack won't arrive until all pending chunks were
# delivered. In that case max BDP would always be 128KiB, aka. max
@ -576,9 +571,9 @@ class FileService(mitogen.service.Service):
@mitogen.service.no_reply()
def acknowledge(self, size, msg):
"""
Acknowledgement bytes received by a transfer target, scheduling new
chunks to keep the window full. This should be called for every chunk
received by the target.
Acknowledge bytes received by a transfer target, scheduling new chunks
to keep the window full. This should be called for every chunk received
by the target.
"""
stream = self.router.stream_by_id(msg.src_id)
state = self._state_by_stream[stream]

@ -90,7 +90,7 @@ Installation
------------
1. Thoroughly review the documented behavioural differences.
2. Verify Ansible 2.4/2.5 and Python 2.7 are listed in ``ansible --version``
2. Verify Ansible 2.3/2.4/2.5 and Python 2.7 are listed in ``ansible --version``
output.
3. Download and extract https://github.com/dw/mitogen/archive/master.zip
4. Modify ``ansible.cfg``:
@ -110,8 +110,8 @@ Installation
Noteworthy Differences
----------------------
* Ansible 2.4 and 2.5 are supported. File bugs to register interest in older
releases.
* Ansible 2.3, 2.4 and 2.5 are supported. File bugs to register interest in
older releases.
* The ``sudo`` become method is available and ``su`` is planned. File bugs to
register interest in additional methods.

@ -9,4 +9,5 @@
- import_playbook: connection_loader/all.yml
- import_playbook: context_service/all.yml
- import_playbook: playbook_semantics/all.yml
- import_playbook: remote_tmp/all.yml
- import_playbook: runner/all.yml

@ -0,0 +1,2 @@
- import_playbook: readonly_homedir.yml

@ -0,0 +1,21 @@
# https://github.com/dw/mitogen/issues/239
# While remote_tmp is used in the context of the SSH user by action code
# running on the controller, Ansiballz ignores it and uses the system default
# instead.
- name: integration/remote_tmp/readonly_homedir.yml
hosts: test-targets
any_errors_fatal: true
tasks:
- custom_python_detect_environment:
become: true
become_user: mitogen__readonly_homedir
register: out
vars:
ansible_become_pass: readonly_homedir_password
- debug: msg={{out}}
- name: Verify system temp directory was used.
assert:
that:
- out.argv[0].startswith("/tmp/ansible_mitogen_")

@ -51,6 +51,7 @@
- require_tty
- pw_required
- require_tty_pw_required
- readonly_homedir
when: ansible_system == 'Darwin'
- name: Create Mitogen test users
@ -84,6 +85,9 @@
with_sequence: start=1 end=21
when: ansible_distribution == 'MacOSX'
- name: Readonly homedir for one account
shell: "chown -R root: ~mitogen__readonly_homedir"
- name: Require a TTY for two accounts
lineinfile:
path: /etc/sudoers
@ -101,12 +105,13 @@
- mitogen__pw_required
- mitogen__require_tty_pw_required
- name: Allow passwordless for one account
- name: Allow passwordless for two accounts
lineinfile:
path: /etc/sudoers
line: "{{lookup('pipe', 'whoami')}} ALL = ({{item}}) NOPASSWD:ALL"
with_items:
- mitogen__require_tty
- mitogen__readonly_homedir
- name: Allow passwordless for many accounts
lineinfile:

@ -46,6 +46,8 @@ RUN \
useradd -s /bin/bash -m mitogen__pw_required && \
useradd -s /bin/bash -m mitogen__require_tty && \
useradd -s /bin/bash -m mitogen__require_tty_pw_required && \
useradd -s /bin/bash -m mitogen__readonly_homedir && \
chown -R root: ~mitogen__readonly_homedir && \
{ for i in `seq 1 21`; do useradd -s /bin/bash -m mitogen__user$i; done; } && \
( echo 'root:rootpassword' | chpasswd; ) && \
( echo 'mitogen__has_sudo:has_sudo_password' | chpasswd; ) && \
@ -55,6 +57,7 @@ RUN \
( echo 'mitogen__pw_required:pw_required_password' | chpasswd; ) && \
( echo 'mitogen__require_tty:require_tty_password' | chpasswd; ) && \
( echo 'mitogen__require_tty_pw_required:require_tty_pw_required_password' | chpasswd; ) && \
( echo 'mitogen__readonly_homedir:readonly_homedir_password' | chpasswd; ) && \
mkdir ~mitogen__has_sudo_pubkey/.ssh && \
{ echo '#!/bin/bash\nexec strace -ff -o /tmp/pywrap$$.trace python2.7 "$@"' > /usr/local/bin/pywrap; chmod +x /usr/local/bin/pywrap; }

Loading…
Cancel
Save