Merge pull request #211 from dw/dmw

Docstring fixes, Ansible 2.5.1 fix & CI
Author: dw (committed via GitHub)
commit 3978e4e165

@@ -12,11 +12,11 @@ python:
env:
- MODE=mitogen MITOGEN_TEST_DISTRO=debian
- MODE=mitogen MITOGEN_TEST_DISTRO=centos
- MODE=debops_common
- MODE=debops_common ANSIBLE_VERSION=2.4.3.0
- MODE=debops_common ANSIBLE_VERSION=2.5.1
- MODE=ansible ANSIBLE_VERSION=2.4.3.0 MITOGEN_TEST_DISTRO=debian
- MODE=ansible ANSIBLE_VERSION=2.4.3.0 MITOGEN_TEST_DISTRO=centos
- MODE=ansible ANSIBLE_VERSION=2.5.0 MITOGEN_TEST_DISTRO=centos
- MODE=ansible ANSIBLE_VERSION=2.5.0 MITOGEN_TEST_DISTRO=debian
- MODE=ansible ANSIBLE_VERSION=2.5.1 MITOGEN_TEST_DISTRO=centos
- MODE=ansible ANSIBLE_VERSION=2.5.1 MITOGEN_TEST_DISTRO=debian
install:
- pip install -r dev_requirements.txt

@@ -4,6 +4,7 @@
TMPDIR="/tmp/debops-$$"
TRAVIS_BUILD_DIR="${TRAVIS_BUILD_DIR:-`pwd`}"
TARGET_COUNT="${TARGET_COUNT:-2}"
ANSIBLE_VERSION="${ANSIBLE_VERSION:-2.4.3.0}"
MITOGEN_TEST_DISTRO=debian # Naturally DebOps only supports Debian.
export PYTHONPATH="${PYTHONPATH}:${TRAVIS_BUILD_DIR}"
@@ -26,7 +27,7 @@ mkdir "$TMPDIR"
echo travis_fold:start:job_setup
pip install -qqqU debops==0.7.2 ansible==2.4.3.0
pip install -qqqU debops==0.7.2 ansible==${ANSIBLE_VERSION}
debops-init "$TMPDIR/project"
cd "$TMPDIR/project"

@@ -386,6 +386,8 @@ class NewStyleRunner(ScriptRunner):
def _run(self):
code = self._get_code()
mod = types.ModuleType('__main__')
mod.__file__ = self.program_fp.name
mod.__package__ = None
d = vars(mod)
e = None

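The hunk above executes module source inside a synthetic __main__ module, setting __file__ and __package__ so module code that inspects them (see issue #210 below) keeps working. A minimal sketch of the idea, with run_module_source() as a hypothetical stand-in for NewStyleRunner._run():

import types

def run_module_source(source, filename):
    # Compile and execute module source inside a synthetic __main__
    # module, mirroring what the hunk above does.
    code = compile(source, filename, 'exec')
    mod = types.ModuleType('__main__')
    mod.__file__ = filename   # e.g. apt.py on Ansible 2.5.1 reads this
    mod.__package__ = None    # what Python sets when running a script
    exec(code, vars(mod))
    return mod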
@@ -347,27 +347,81 @@ class ContextService(mitogen.service.Service):
class FileService(mitogen.service.Service):
"""
Primitive latency-inducing file server for old-style incantations of the
module runner. This is to be replaced later with a scheme that forwards
files known to be missing without the target having to ask for them,
avoiding a corresponding roundtrip per file.
Paths must be explicitly added to the service by a trusted context before
they will be served to an untrusted context.
Streaming file server, used to serve both small files like Ansible module
sources, and huge files like ISO images. Paths must be explicitly added to
the service by a trusted context before they will be served to an untrusted
context.
The file service nominally lives on the mitogen.service.Pool() threads
shared with ContextService above; however, for simplicity it also maintains
a dedicated thread from which file chunks are scheduled.
The scheduler thread is responsible for dividing transfer requests up among
the physical streams that connect to those contexts, and for ensuring no
stream ever has an excessive amount of data buffered in RAM at any time.
Transfers proceed one at a time per stream. When multiple contexts are
reachable over the same stream (e.g. one is the SSH account, another is a
sudo account, and a third is a proxied SSH connection), each request is
satisfied in turn before chunks for subsequent requests start flowing. This
ensures that when a connection is contended, preference is given to
completing individual transfers rather than aborting many partially
complete transfers and wasting all the bandwidth already consumed.
Theory of operation:
1. Trusted context (i.e. a WorkerProcess) calls register(), making a
file available to any untrusted context.
2. Untrusted context creates a mitogen.core.Receiver() to receive
file chunks. It then calls fetch(path, recv.to_sender()), which sets
up the transfer. The fetch() method returns the final file size and
notifies the dedicated thread of the transfer request (see the sketch
following this docstring).
3. The dedicated thread wakes from perpetual sleep, looks up the stream
used to communicate with the untrusted context, and begins pumping
128KiB-sized chunks until that stream's output queue reaches a
limit (1MiB).
4. The thread sleeps for 10ms, wakes, and pumps new chunks as necessary
to refill any drained output queues, which are asynchronously emptied
by the Stream implementation running on the Broker thread.
5. Once the last chunk has been pumped for a single transfer,
Sender.close() is called, causing the receive loop in
target.py::_get_file() to exit and allowing that code to compare the
transferred size with the total file size indicated by the return
value of the fetch() method.
6. If the sizes mismatch, the caller is informed, which will discard
the result and log an error.
7. Once all chunks have been pumped for all transfers, the dedicated
thread stops waking at 10ms intervals and resumes perpetual sleep.
Shutdown:
1. process.py calls service.Pool.shutdown(), which arranges for all the
service pool threads to exit and be joined, guaranteeing no new
requests can arrive, before calling Service.on_shutdown() for each
registered service.
2. FileService.on_shutdown() marks the dedicated thread's queue as
closed, causing the dedicated thread to wake immediately. It will
throw an exception that begins shutdown of the main loop.
3. The main loop calls Sender.close() prematurely for every pending
transfer, causing any Receiver loops in the target contexts to exit
early. The file size check fails, the partially downloaded file is
discarded, and an error is logged.
4. Control exits the file transfer function in every target, and
graceful target shutdown can proceed normally, without the
associated thread needing to be forcefully killed.
"""
handle = 501
max_message_size = 1000
unregistered_msg = 'Path is not registered with FileService.'
#: Maximum size of any stream's output queue before we temporarily stop
#: pumping more file chunks. The queue may overspill by up to
#: mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1).
#: pumping more file chunks on that stream. The queue may overspill by up
#: to mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1).
max_queue_size = 1048576
#: Time spent by the scheduler thread asleep when it has no more queues to
#: pump. With max_queue_size=1MiB and a sleep of 10ms, maximum throughput
#: on any single stream is 100MiB/sec, which is 5x what SSH can handle on
#: my laptop.
#: Time spent by the scheduler thread asleep when it has no more data to
#: pump, but while at least one transfer remains active. With
#: max_queue_size=1MiB and a sleep of 10ms, maximum throughput on any
#: single stream is 100MiB/sec, which is 5x what SSH can handle on my
#: laptop.
sleep_delay_ms = 0.01
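A quick check of the arithmetic in the comment above: at most one full queue refill happens per sleep interval, so per-stream throughput is bounded by:

max_queue_size = 1024 * 1024            # 1MiB refilled per wakeup
sleep_delay = 0.01                      # 10ms between scheduler wakeups
mib_per_sec = max_queue_size / sleep_delay / (1024 * 1024)
print('%.0f MiB/sec per stream' % mib_per_sec)  # -> 100 MiB/sec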
def __init__(self, router):
@@ -381,12 +435,20 @@ class FileService(mitogen.service.Service):
self._thread = threading.Thread(target=self._scheduler_main)
self._thread.start()
def on_shutdown(self):
"""
Respond to shutdown of the service pool by marking our queue closed.
This causes :meth:`_sleep_on_queue` to wake immediately and return
:data:`False`, causing the scheduler main thread to exit.
"""
self._queue.close()
def _pending_bytes(self, stream):
"""
Defer a function call to the Broker thread in order to accurately
measure the bytes pending in `stream`'s queue.
This must be done synchronized with the Broker, as scheduler
This must be done synchronized with the Broker, as OS scheduler
uncertainty could cause Sender.send()'s deferred enqueues to be
processed very late, making the output queue look much emptier than it
really is (or is about to become).
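A minimal sketch of that synchronization, assuming queued_bytes() is a hypothetical accessor for the stream's output queue depth:

import mitogen.core

def pending_bytes(broker, stream):
    latch = mitogen.core.Latch()
    # The lambda runs on the Broker thread via defer(), so the
    # measurement cannot race with Sender.send()'s deferred enqueues.
    broker.defer(lambda: latch.put(queued_bytes(stream)))
    return latch.get()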
@@ -438,11 +500,12 @@ class FileService(mitogen.service.Service):
:meth:`on_shutdown` hasn't been called yet, otherwise
:data:`False`.
"""
if self._schedule_pending:
timeout = self.sleep_delay_ms
else:
timeout = None
try:
if self._schedule_pending:
timeout = self.sleep_delay_ms
else:
timeout = None
sender, fp = self._queue.get(timeout=timeout)
except mitogen.core.LatchError:
return False
@@ -513,8 +576,6 @@ class FileService(mitogen.service.Service):
raise mitogen.core.CallError(self.unregistered_msg)
LOG.debug('Serving %r', path)
self._queue.put((
sender,
open(path, 'rb', mitogen.core.CHUNK_SIZE),
))
fp = open(path, 'rb', mitogen.core.CHUNK_SIZE)
self._queue.put((sender, fp))
return self._size_by_path[path]
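For reference, the trusted side of this exchange is just the registration step from the docstring's theory of operation; a sketch, assuming file_service is a FileService instance attached to a running pool and the path is illustrative:

# Trusted context (e.g. the WorkerProcess) exposes one exact path.
file_service.register('/path/to/ansible/module.py')
# Untrusted contexts may now fetch() that path; any other path fails
# with mitogen.core.CallError(FileService.unregistered_msg).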

@@ -441,10 +441,10 @@ class Receiver(object):
def empty(self):
return self._latch.empty()
def get(self, timeout=None, block=True):
def get(self, timeout=None, block=True, throw_dead=True):
_vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
msg = self._latch.get(timeout=timeout, block=block)
if msg.is_dead:
if msg.is_dead and throw_dead:
if msg.src_id == mitogen.context_id:
raise ChannelError(ChannelError.local_msg)
else:

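The new throw_dead flag lets a caller opt out of the ChannelError raise and inspect dead messages directly, as the updated tests below do. A minimal usage sketch with a hypothetical helper:

import logging
import mitogen.core

LOG = logging.getLogger(__name__)

def get_or_none(recv):
    # Returns None instead of raising ChannelError when the reply is a
    # dead message (e.g. no route existed to the destination context).
    msg = recv.get(throw_dead=False)
    if msg.is_dead:
        LOG.warning('dead message from context %d', msg.src_id)
        return None
    return msg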
@@ -0,0 +1 @@
- import_playbook: file_transfer.yml

@@ -0,0 +1,68 @@
- name: bench/file_transfer.yml
hosts: all
any_errors_fatal: true
tasks:
- name: Make 32MiB file
connection: local
shell: openssl rand 33554432 > /tmp/bigfile.in
- name: Make 320MiB file
connection: local
shell: >
cat
/tmp/bigfile.in
/tmp/bigfile.in
/tmp/bigfile.in
/tmp/bigfile.in
/tmp/bigfile.in
/tmp/bigfile.in
/tmp/bigfile.in
/tmp/bigfile.in
/tmp/bigfile.in
/tmp/bigfile.in
> /tmp/bigbigfile.in
- name: Delete SSH file if present.
file:
path: "{{item}}"
state: absent
become: true
with_items:
- /tmp/bigfile.out
- /tmp/bigbigfile.out
- name: Copy 32MiB file via SSH
copy:
src: /tmp/bigfile.in
dest: /tmp/bigfile.out
- name: Copy 320MiB file via SSH
copy:
src: /tmp/bigbigfile.in
dest: /tmp/bigbigfile.out
- name: Delete localhost sudo file if present.
file:
path: "{{item}}"
state: absent
connection: local
become: true
with_items:
- /tmp/bigfile.out
- /tmp/bigbigfile.out
- name: Copy 32MiB file via localhost sudo
connection: local
become: true
copy:
src: /tmp/bigfile.in
dest: /tmp/bigfile.out
- name: Copy 320MiB file via localhost sudo
connection: local
become: true
copy:
src: /tmp/bigbigfile.in
dest: /tmp/bigbigfile.out

@@ -21,6 +21,11 @@ input_json = sys.stdin.read()
print "{"
print " \"changed\": false,"
# v2.5.1. apt.py started depending on this.
# https://github.com/dw/mitogen/issues/210
print " \"__file__\": \"%s\"," % (__file__,)
# Python sets this during a regular import.
print " \"__package__\": \"%s\"," % (__package__,)
print " \"msg\": \"Here is my input\","
print " \"source\": [%s]," % (json.dumps(me),)
print " \"input\": [%s]" % (input_json,)

@@ -223,40 +223,50 @@ class NoRouteTest(testlib.RouterMixin, testlib.TestCase):
# Verify sending a message to an invalid handle yields a dead message
# from the target context.
l1 = self.router.fork()
recv = l1.send_async(mitogen.core.Message(handle=999))
msg = recv.get(throw_dead=False)
self.assertEquals(msg.is_dead, True)
self.assertEquals(msg.src_id, l1.context_id)
recv = l1.send_async(mitogen.core.Message(handle=999))
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get()
)
lambda: recv.get())
self.assertEquals(e.args[0], mitogen.core.ChannelError.remote_msg)
def test_totally_invalid_context_returns_dead(self):
recv = mitogen.core.Receiver(self.router)
self.router.route(
mitogen.core.Message(
dst_id=1234,
handle=1234,
reply_to=recv.handle,
)
msg = mitogen.core.Message(
dst_id=1234,
handle=1234,
reply_to=recv.handle,
)
self.router.route(msg)
rmsg = recv.get(throw_dead=False)
self.assertEquals(rmsg.is_dead, True)
self.assertEquals(rmsg.src_id, mitogen.context_id)
self.router.route(msg)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get()
)
lambda: recv.get())
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg)
def test_previously_alive_context_returns_dead(self):
l1 = self.router.fork()
l1.shutdown(wait=True)
recv = mitogen.core.Receiver(self.router)
self.router.route(
mitogen.core.Message(
dst_id=l1.context_id,
handle=mitogen.core.CALL_FUNCTION,
reply_to=recv.handle,
)
msg = mitogen.core.Message(
dst_id=l1.context_id,
handle=mitogen.core.CALL_FUNCTION,
reply_to=recv.handle,
)
self.router.route(msg)
rmsg = recv.get(throw_dead=False)
self.assertEquals(rmsg.is_dead, True)
self.assertEquals(rmsg.src_id, mitogen.context_id)
self.router.route(msg)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get()
)
lambda: recv.get())
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg)
