It knows how to dispatch messages from multiple receivers (associated
with multiple services) to multiple threads, where the service
implementation is invoked on the message.
It wakes a maximum of one thread per received message.
It knows how to shut down gracefully.
Implication: due to the latch use, there are 2 file descriptors burned
for every thread. We don't need interruptibility here, so in future, it
might be nice to allow swapping a diferent queueing primitive into
Select (maybe a subclass?) just for this case.
Implication: the entire message remains buffered until its last byte is
transmitted. Not wasting time on it, as there are pieces of work like
issue #6 that might invalidate these problems on the transmit path
entirely.
Rather than slowly build up a Python string over time, we just store a
deque of chunks (which, in a later commit, will now be around 128KB
each), and track the total buffer size in a separate integer.
The tricky loop is there to ensure the header does not need to be sliced
off the full message (which may be huge, causing yet another spike and
copy), but rather only off the much smaller first 128kb-sized chunk
received.
There is one more problem with this code: the ''.join() causes RAM usage
to temporarily double, but that was true of the old solution too. Shall
wait for bug reports before fixing this, as it gets very ugly very fast.
There is no penalty for just passing as much data to the OS as possible,
it is not copied, and for a non-blocking socket, the OS will just keep
buffer as much as it can and tell us how much that was.
Also avoids a rather pointless string slice.
Reduces the number of IO loop iterations required to receive large
messages at a small cost to RAM usage.
Note that when calling read() with a large buffer value like this,
Python must zero-allocate that much RAM. In other words, for even a
single byte received, 128kb of RAM might need to be written.
Consequently CHUNK_SIZE is quite a sensitive value and this might need
further tuning.
accept() (per interface) returns a non-blocking socket because the
listener socket is in non-blocking mode, therefore it is pure scheduling
luck that a connecting-in child has a chance to write anything for the
top-level processs to read during the subsequent .recv().
A higher forks setting in ansible.cfg was enough to cause our luck to
run out, causing the .recv() to crashi with EGAIN, and the multiplexer
to respond to the handler's crash by calling its disconnect method. This
is why some reports mentioned ECONNREFUSED -- the listener really was
gone, because its Stream class had crashed.
Meanwhile since the window where we're waiting for the remote process to
identify itself is tiny, simply flip off O_NONBLOCK for the duration of
the connection handshake. Stream.accept() (via Side.__init__) will
reenable O_NONBLOCK for the descriptors it duplicates, so we don't even
need to bother turning this back off.
A better solution entails splitting Stream up into a state machine and
doing the handshake with non-blocking IO, but that isn't going to be
available until asynchronous connect is implemented. Meanwhile in
reality this solution is probably 100% fine.
There is some insane unidentifiable Mitogen context (the local context?)
that instantly crashes with a higher forks setting. It appears to be
harmless, but meanwhile this naturally shouldn't be happening.
Implement Connection.__del__, which is almost certainly going to trigger
more bugs down the line, because the state of the Connection instance is
not guranteed during __del__. Meanwhile, it is temporarily needed for
deployed-today Ansibles that have a buggy synchronize action that does
not call Connection.close().
A better approach to this would be to virtualize the guts of Connection,
and move its management to one central place where we can guarantee
resource destruction happens reliably, but that may entail another
Ansible monkey-patch to give us such a reliable hook.
When a Broker() is running with install_watcher=True, arrange for only
one watcher thread to exist for each target thread, and to reset the
mapping of watchers to targets after process fork.
This is probably the last change I want to make to the watcher feature
before deciding to rip it out, it may be more trouble than it is worth.
Full output of failed test
```
ERROR: test_okay (__main__.FakeSshTest)
----------------------------------------------------------------------
Traceback (most recent call last):
File "tests/ssh_test.py", line 16, in test_okay
ssh_path=testlib.data_path('fakessh.py'),
File "/home/alex/src/mitogen/mitogen/master.py", line 650, in ssh
return self.connect('ssh', **kwargs)
File "/home/alex/src/mitogen/mitogen/parent.py", line 463, in connect
return self._connect(context_id, klass, name=name, **kwargs)
File "/home/alex/src/mitogen/mitogen/parent.py", line 449, in _connect
stream.connect()
File "/home/alex/src/mitogen/mitogen/ssh.py", line 104, in connect
super(Stream, self).connect()
File "/home/alex/src/mitogen/mitogen/parent.py", line 395, in connect
self._connect_bootstrap()
File "/home/alex/src/mitogen/mitogen/ssh.py", line 116, in
_connect_bootstrap
time.time() + 10.0):
File "/home/alex/src/mitogen/mitogen/parent.py", line 207, in
iter_read
(''.join(bits)[-300:],)
mitogen.core.StreamError: EOF on stream; last 300 bytes received:
'Usage: fakessh.py [options]\n\nfakessh.py: error: no such option: -o\n'
```