Given:
- Broker asleep in poll()
- thread B calling Latch.put()
Previously,
- B takes lock,
- B wakes socket by dropping GIL and writing to it
- Broker wakes from poll(), acquires GIL only to find Latch._lock is held
- Broker drops GIL, sleeps on futex() for _lock
- B wakes, acquires GIL, releases _lock
- Broker wakes from futex(), acquires lock
Now,
- B takes lock, updates state, releases lock
- B wakes socket by droppping GIL and writing to it
- Broker wakes from poll(), acquires GIL and _lock
- Everyone lives happily ever after.
Given:
- thread A asleep in Latch._get_sleep()
- thread B calling Latch.put()
Previously,
- B takes lock,
- B wakes socket by dropping GIL and writing to it
- A wakes from poll(), acquires GIL only to find Latch._lock is held
- A drops GIL, sleeps on futex() for _lock
- B wakes, acquires GIL, releases _lock
- A wakes from futex(), acquires lock
Now,
- B takes lock, updates state, releases lock
- B wakes socket by droppping GIL and writing to it
- A wakes from poll(), acquires GIL and _lock
- Everyone lives happily ever after.
Move all details of broker/router setup out of connection.py, instead
deferring it to a WorkerModel class exported by process.py via
get_worker_model(). The running strategy can override the configured
worker model via _get_worker_model().
ClassicWorkerModel is installed by default, which implements the
extension's existing process model.
Add optional support for the third party setproctitle module, so
children have pretty names in ps output.
Add optional support for per-CPU multiplexers to classic runs.
Now it's possible for stream.protocol to not refer to MitogenProtocol,
move the signal handler to a MitogenProtocol subclass instead.
Fixes a crash where CTRL+C during child bootstrap would print
AttributeError.
During early initialization under hackbench, it is possible for Broker
to be in LogHandler._send() while the main thread has already destroyed
_buffer. So we must synchronize them, but only while the handler is
corked.
Fix a race where if Stream.on_receive() detects disconnect, it calls
Stream.on_disconnect(), which fires Stream 'disconnect' event, whereas
if BufferedWriter.on_transmit() detects disconnect, it called
Protocol.on_disconnect(), which did not fire the Stream 'disconnect'
event.
Since mitogen.parent listens on Stream's 'disconnect' event to reap
children, this was causing a very difficult to trigger test failure.
Triggered after <1000 runs on a Xeon E5530 with hyperthreading using
hackbench running at the same priority:
$ hackbench -s 1048576 -l 100000000000 -g 4
To ensure a test process can successfully recreate an Ansible
MuxProcess, reset fork-inherited globals during disconnection.
There is basically no good place for this. Per the comments on #91, it
would be far better if the context's identity was tied to its router,
rather than some global variable.
Split Stream into many, many classes
* mitogen.parent.Connection: Handles connection setup logic only.
* Maintain references to stdout and stderr streams.
* Manages TimerList timer to cancel connection attempt after
deadline
* Blocking setup code replaced by async equivalents running on the
broker
* mitogen.parent.Options: Tracks connection-specific options. This
keeps the connection class small, but more importantly, it is
generic to the future desire to build and execute command lines
without starting a full connection.
* mitogen.core.Protocol: Handles program behaviour relating to events
on a stream. Protocol performs no IO of its own, instead deferring
it to Stream and Side. This makes testing much easier, and means
libssh can reimplement Stream and Side to reuse MitogenProtocol
* mitogen.core.MitogenProtocol: Guts of the old Mitogen stream
implementtion
* mitogen.core.BufferedWriter: Guts of the old Mitogen buffered
transmit implementation, made generic
* mitogen.core.DelineatedProtocol: Guts of the old IoLogger, knows how
to split up input and pass it on to a
on_line_received()/on_partial_line_received() callback.
* mitogen.parent.BootstrapProtocol: Asynchronous equivalent of the old
blocking connect code. Waits for various prompts (MITO001 etc) and
writes the bootstrap using a BufferedWriter. On success, switches
the stream to MitogenProtocol.
* mitogen.core.Message: move encoding parts of MitogenProtocol out to
Message (where it belongs) and write a bunch of new tests for
pickling.
* The bizarre Stream.construct() is gone now, Option.__init__ is its
own constructor. Should fix many LGTM errors.
* Update all connection methods: Every connection method is updated to
use async logic, defining protocols as required to handle interactive
prompts like in SSH or su. Add new real integration tests for at least
doas and su.
* Eliminate manual fd management: File descriptors are trapped in file
objects at their point of origin, and Side is updated to use file
objects rather than raw descriptors. This eliminates a whole class of
bugs where unrelated FDs could be closed by the wrong component. Now
an FD's open/closed status is fused to it everywhere in the library.
* Halve file descriptor usage: now FD open/close state is tracked by
its file object, we don't need to duplicate FDs everywhere so that
receive/transmit side can be closed independently. Instead both sides
back on to the same file object. Closes#26, Closes#470.
* Remove most uses of dup/dup2: Closes#256. File descriptors are
trapped in a common file object and shared among classes. The
remaining few uses for dup/dup2 are as close to minimal as possible.
* Introduce mitogen.parent.Process: uniform interface for subprocesses
created either via mitogen.fork or the subprocess module. Remove all
the crap where we steal a pid from subprocess guts. Now we use
subprocess to manage its processes as it should be. Closes#169 by
using the new Timers facility to poll for a slow-to-exit subprocess.
* Fix su password race: Closes#363. DelineatedProtocol naturally
retries partially received lines, preventing the cause of the original
race.
* Delete old blocking IO utility functions
iter_read()/write_all()/discard_until().
Closes#26Closes#147Closes#169Closes#256Closes#363Closes#419Closes#470
It's used in later commit. This is an os.pipe() wrapper that traps the
file descriptors in a file object, to ensure leaked objects will
eventually be collected, and a central place exists to track open/closed
status.
This is the same problem as used to afflict Stream: large input buffers
containing many small messages cause intense string copying. This
eliminates the worst of it.
Now it's possible to find both packages and modules when the
sys.modules[...] state for the package/module is junk. Previously only
modules were possible.
This also refactors things to make writing better tests for all these
cases much simpler.
There has always been a race in PushFileService since given a parent
asked to forward modules to two children via some intermediary:
interm = router.local()
c1 = router.local(via=interm)
c2 = router.local(via=interm)
service.propagate_to(c1, 'foo/bar.py')
service.propagate_to(c2, 'foo/bar.py')
Two calls will be emitted to 'interm':
PushFileService.store_and_forward(c1, 'foo/bar.py', [blob])
PushFileService.store(c2, 'foo/bar.py')
Which will be processed in-order up to the point where service pool
threads in 'interm' are woken to process the message.
While it is guaranteed store_and_forward() will be processed first, no
guarantee existed that its assigned pool thread would wake and take
_lock first, thus it was possible for forward() to win the race, and for
a request to arrive to forward a file that had not been placed in local
cache yet.
Here we get rid of SerializedInvoker entirely, as it is partially to
blame for hiding the race: SerializedInvoker can only ensure no two
messages are processed simultaneously, it cannot ensure the messages are
processed in their intended order.
Instead, teach forward() that it may be called before
store_and_forward(), and if that is the case, to place the forward
request on to _waiters alongside any local threads blocked in get().
Regardless of the version of simplejson loaded in the master, load up
the ModuleResponder cache with our 2.4-compatible version.
To cope with simplejson being loaded due to modules like ec2_group that
try to import it before importing 'json', also update target.py to
remove it from the whitelist if a local 'json' module import succeeds.
Since 802de6a8d5, sudo on CentOS 5 had
begun failing due to a TTY FD leak in the parent process being fixed.
The old versions of sudo doesn't hang around after starting a child --
they exec the privilege-escalated child process on top of themselves,
meaning no spare copy of the TTY FD is kept alive by sudo.
When the child starts up, it replaces stdio with IoLoggers, including
the inherited stderr FD connected to DiagLogStream/the slave PTY. When
the last process closes a slave PTY, the kernel sends SIGHUP to any
processes still having it as the controlling TTY.
Therefore we must either ignore SIGHUP until the first stage has been
waited on (since the first stage also preserve the FD), or dup the
inherited TTY FD and keep it around forever.
Wasting one FD seems less annoying than modifying process signals for
all potential library users, so that is the approach taken here.
[costapp]
ERROR! [pid 25135] 21:10:56.284733 E mitogen.ctx.ssh.35.200.203.48: mitogen: While calling no-reply method PushFileService.forward
Traceback (most recent call last):
File "master:/home/dmw/src/mitogen/mitogen/service.py", line 260, in _invoke
ret = method(**kwargs)
File "master:/home/dmw/src/mitogen/mitogen/service.py", line 718, in forward
self._forward(path, context)
File "master:/home/dmw/src/mitogen/mitogen/service.py", line 633, in _forward
stream = self.router.stream_by_id(context.context_id)
AttributeError: 'unicode' object has no attribute 'context_id'
^C [ERROR]: User interrupted execution
Minify-safe files are marked with a magical "# !mitogen: minify_safe"
comment anywhere in the file, which activates the minifier. The result
is naturally cached by ModuleResponder, therefore lru_cache is gone too.
Given:
import os, mitogen
@mitogen.main()
def main(router):
c = router.ssh(hostname='k3')
c.call(os.getpid)
router.sudo(via=c)
SSH footprint drops from 56.2 KiB to 42.75 KiB (-23.9%)
Ansible "shell: hostname" drops 149.26 KiB to 117.42 KiB (-21.3%)
When the interpreter is modern enough, use zlib.compressobj() to
pre-compress the unchanging parts of the bootstrap once, then use
compressobj.copy() to append just the context's config during stream
construction.
Before: 100 loops, best of 3: 5.81 msec per loop
After: 10000 loops, best of 3: 35.9 usec per loop
With 100 targets this is enough to knock 6 seconds off startup, at 500
targets it becomes half a minute.
Test 'program':
python -m timeit -s '
import mitogen.parent as p;
import mitogen.master as m;
r=m.Router();
s=p.Stream(r, 0, max_message_size=1);
r.broker.shutdown()'\
\
's.get_preamble()'
Ansible modules were being resent continuously - but only the main
script module, and any custom modutils if any were present.
Wire footprint drops by ~1/3rd for a 500 task run of 'shell: hostname':
-rw-r--r-- 1 root root 584K Jan 31 22:06 500mito-before2
-rw-r--r-- 1 root root 434K Jan 31 22:04 500mito-filesbugonly
Single task 100 SSH target run, before:
3533181 function calls (3533083 primitive calls) in 616.688 seconds
User time (seconds): 32.52
System time (seconds): 2.71
Percent of CPU this job got: 64%
Elapsed (wall clock) time (h:mm:ss or m:ss): 0:54.88
After:
451602 function calls (451504 primitive calls) in 570.746 seconds
User time (seconds): 29.48
System time (seconds): 2.81
Percent of CPU this job got: 67%
Elapsed (wall clock) time (h:mm:ss or m:ss): 0:48.20
Traceback (most recent call last):
File "<stdin>", line 2707, in _invoke
File "<stdin>", line 2480, in _on_del_route
NameError: global name 'target_id' is not defined
os.write() can fail with EINTR due to signals, so wrap it in
io_op(). Closes#483.
Masking EBADF looks like it is/was almost certainly papering over a bug,
remove it and suffer the bug reports. Closes#495.
Fixes:
ERROR! [pid 1096] 23:31:48.363215 E mitogen: _broker_main() crashed
Traceback (most recent call last):
File "/home/dmw/src/mitogen/mitogen/core.py", line 2917, in _broker_main
self._loop_once()
File "/home/dmw/src/mitogen/mitogen/core.py", line 2875, in _loop_once
self._call(side.stream, func)
File "/home/dmw/src/mitogen/mitogen/core.py", line 2860, in _call
stream.on_disconnect(self)
File "/home/dmw/src/mitogen/mitogen/parent.py", line 1161, in on_disconnect
super(Stream, self).on_disconnect(broker)
File "/home/dmw/src/mitogen/mitogen/core.py", line 1534, in on_disconnect
fire(self, 'disconnect')
File "/home/dmw/src/mitogen/mitogen/core.py", line 390, in fire
func(*args, **kwargs)
File "/home/dmw/src/mitogen/mitogen/parent.py", line 1794, in <lambda>
func=lambda: self._on_stream_disconnect(stream),
File "/home/dmw/src/mitogen/mitogen/parent.py", line 1810, in _on_stream_disconnect
routes = self._routes_by_stream.pop(stream)
KeyError: mitogen.ssh.Stream('ssh.localhost:2236')
propagate_up() sends ADD_ROUTE and DEL_ROUTE
propagate_down() sends only DEL_ROUTE, but didn't bother checking if
up() had sent it already.
Fixes:
ERROR! [pid 41060] 17:55:30.739159 E mitogen.ctx.ssh.localhost:
mitogen: RouteMonitor(): received DEL_ROUTE for 6081 from
mitogen.fork.Stream(u'fork.41142'), expected
mitogen.core.Stream('parent')
os._exit() subverted calm shutdown, meaning unix.Listener never had a
chance to cleanup its socket.
Move unix.Listener socket cleanup into its class so it is automatic
during shutdown, rather than cutpasted for each consumer.
Disable the watcher thread in the MuxProcess, it is useless.
Add .sock extension to /tmp/mitogen_unix_*, so we can write a test.