Merge remote-tracking branch 'origin/dmw'

- move more docs into source code
- make tests detect leaked threads, FDs
- #405, #406, #417
issue260
David Wilson 6 years ago
commit 7141e9c11f

@ -1,4 +1,5 @@
-r docs/docs-requirements.txt -r docs/docs-requirements.txt
psutil==5.4.8
coverage==4.5.1 coverage==4.5.1
Django==1.6.11 # Last version supporting 2.6. Django==1.6.11 # Last version supporting 2.6.
mock==2.0.0 mock==2.0.0

@ -429,7 +429,7 @@ Temporary Files
Temporary file handling in Ansible is tricky, and the precise behaviour varies Temporary file handling in Ansible is tricky, and the precise behaviour varies
across major versions. A variety of temporary files and directories are across major versions. A variety of temporary files and directories are
created, depending on the operating mode: created, depending on the operating mode.
In the best case when pipelining is enabled and no temporary uploads are In the best case when pipelining is enabled and no temporary uploads are
required, for each task Ansible will create one directory below a required, for each task Ansible will create one directory below a

File diff suppressed because it is too large Load Diff

@ -126,7 +126,7 @@ Core Library
v0.2.4 (2018-??-??) v0.2.4 (2018-??-??)
------------------ -------------------
Mitogen for Ansible Mitogen for Ansible
~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
@ -137,15 +137,14 @@ Enhancements
* `#76 <https://github.com/dw/mitogen/issues/76>`_, * `#76 <https://github.com/dw/mitogen/issues/76>`_,
`#351 <https://github.com/dw/mitogen/issues/351>`_, `#351 <https://github.com/dw/mitogen/issues/351>`_,
`#352 <https://github.com/dw/mitogen/issues/352>`_: disconnect propagation `#352 <https://github.com/dw/mitogen/issues/352>`_: disconnect propagation
has improved, allowing Ansible to cancel waits for responses from targets has improved, allowing Ansible to cancel waits for responses from abruptly
that where abruptly disconnected. This increases the chance a task will fail disconnected targets. This ensures a task will gracefully fail rather than
gracefully, rather than hanging due to the connection being severed, for hang, for example on network failure or EC2 instance maintenance.
example because of network failure or EC2 instance maintenance.
* `#369 <https://github.com/dw/mitogen/issues/369>`_: :meth:`Connection.reset` * `#369 <https://github.com/dw/mitogen/issues/369>`_: :meth:`Connection.reset`
is implemented, allowing `meta: reset_connection is implemented, allowing `meta: reset_connection
<https://docs.ansible.com/ansible/latest/modules/meta_module.html>`_ to shut <https://docs.ansible.com/ansible/latest/modules/meta_module.html>`_ to shut
down the remote interpreter as expected, and improving support for the down the remote interpreter as documented, and improving support for the
`reboot `reboot
<https://docs.ansible.com/ansible/latest/modules/reboot_module.html>`_ <https://docs.ansible.com/ansible/latest/modules/reboot_module.html>`_
module. module.
@ -156,26 +155,22 @@ Fixes
* `#323 <https://github.com/dw/mitogen/issues/323>`_, * `#323 <https://github.com/dw/mitogen/issues/323>`_,
`#333 <https://github.com/dw/mitogen/issues/333>`_: work around a Windows `#333 <https://github.com/dw/mitogen/issues/333>`_: work around a Windows
Subsystem for Linux bug that would cause tracebacks to be rendered during Subsystem for Linux bug that caused tracebacks to appear during shutdown.
shutdown.
* `#334 <https://github.com/dw/mitogen/issues/334>`_: the SSH method * `#334 <https://github.com/dw/mitogen/issues/334>`_: the SSH method
tilde-expands private key paths using Ansible's logic. Previously Mitogen tilde-expands private key paths using Ansible's logic. Previously the path
passed the path unmodified to SSH, which would expand it using was passed unmodified to SSH, which expanded it using :func:`os.getpwent`.
:func:`os.getpwent`. This differs from :func:`os.path.expanduser`, which uses the ``HOME``
This differs from :func:`os.path.expanduser`, which prefers the ``HOME``
environment variable if it is set, causing behaviour to diverge when Ansible environment variable if it is set, causing behaviour to diverge when Ansible
was invoked using sudo without appropriate flags to cause the ``HOME`` was invoked across user accounts via ``sudo``.
environment variable to be reset to match the target account.
* `#370 <https://github.com/dw/mitogen/issues/370>`_: the Ansible * `#370 <https://github.com/dw/mitogen/issues/370>`_: the Ansible
`reboot <https://docs.ansible.com/ansible/latest/modules/reboot_module.html>`_ `reboot <https://docs.ansible.com/ansible/latest/modules/reboot_module.html>`_
module is supported. module is supported.
* `#373 <https://github.com/dw/mitogen/issues/373>`_: the LXC and LXD methods * `#373 <https://github.com/dw/mitogen/issues/373>`_: the LXC and LXD methods
now print a useful hint when Python fails to start, as no useful error is print a useful hint on failure, as no useful error is normally logged to the
normally logged to the console by these tools. console by these tools.
* `#400 <https://github.com/dw/mitogen/issues/400>`_: work around a threading * `#400 <https://github.com/dw/mitogen/issues/400>`_: work around a threading
bug in the AWX display callback when running with high verbosity setting. bug in the AWX display callback when running with high verbosity setting.
@ -195,21 +190,33 @@ Fixes
Core Library Core Library
~~~~~~~~~~~~ ~~~~~~~~~~~~
* `#76 <https://github.com/dw/mitogen/issues/76>`_: routing maintains the set * `#76 <https://github.com/dw/mitogen/issues/76>`_: routing records the
of destination context ID ever received on each stream, and when destination context IDs ever received on each stream, and when disconnection
disconnection occurs, propagates ``DEL_ROUTE`` messages downwards towards occurs, propagates :data:`mitogen.core.DEL_ROUTE` messages towards every
every stream that ever communicated with a disappearing peer, rather than stream that ever communicated with the disappearing peer, rather than simply
simply toward parents. towards parents.
Conversations between nodes anywhere in the tree receive
:data:`mitogen.core.DEL_ROUTE` when either participant disconnects, allowing
receivers to wake with :class:`mitogen.core.ChannelError`, even when one
participant is not a parent of the other.
* `#405 <https://github.com/dw/mitogen/issues/405>`_: if an oversized message
is rejected, and it has a ``reply_to`` set, a dead message is returned to the
sender. This ensures function calls exceeding the configured maximum size
crash rather than hang.
* `#406 <https://github.com/dw/mitogen/issues/406>`_:
:class:`mitogen.core.Broker` did not call :meth:`mitogen.core.Poller.close`
during shutdown, leaking the underlying poller FD in masters and parents.
Conversations between nodes in any level of the tree receive ``DEL_ROUTE`` * `#406 <https://github.com/dw/mitogen/issues/406>`_: connections could leak
messages when a participant disconnects, allowing receivers to be woken with FDs when a child process failed to start.
:class:`mitogen.core.ChannelError` to signal the connection has broken, even
when one participant is not a parent of the other.
* `#405 <https://github.com/dw/mitogen/issues/405>`_: if a message is rejected * `#406 <https://github.com/dw/mitogen/issues/406>`_,
due to being too large, and it has a ``reply_to`` set, a dead message is `#417 <https://github.com/dw/mitogen/issues/417>`_: connections could leave
returned to the sender. This ensures function calls exceeding the configured FD wrapper objects that had not been closed lying around to be closed during
maximum size crash rather than hang. garbage collection, causing reused FD numbers to be closed at random moments.
* `#411 <https://github.com/dw/mitogen/issues/411>`_: the SSH method typed * `#411 <https://github.com/dw/mitogen/issues/411>`_: the SSH method typed
"``y``" rather than the requisite "``yes``" when `check_host_keys="accept"` "``y``" rather than the requisite "``yes``" when `check_host_keys="accept"`
@ -227,7 +234,7 @@ Thanks!
~~~~~~~ ~~~~~~~
Mitogen would not be possible without the support of users. A huge thanks for Mitogen would not be possible without the support of users. A huge thanks for
bug reports, features and fixes in this release contributed by bug reports, testing, features and fixes in this release contributed by
`Berend De Schouwer <https://github.com/berenddeschouwer>`_, `Berend De Schouwer <https://github.com/berenddeschouwer>`_,
`Brian Candler <https://github.com/candlerb>`_, `Brian Candler <https://github.com/candlerb>`_,
`Duane Zamrok <https://github.com/dewthefifth>`_, `Duane Zamrok <https://github.com/dewthefifth>`_,
@ -441,7 +448,7 @@ Thanks!
~~~~~~~ ~~~~~~~
Mitogen would not be possible without the support of users. A huge thanks for Mitogen would not be possible without the support of users. A huge thanks for
bug reports, features and fixes in this release contributed by bug reports, testing, features and fixes in this release contributed by
`Alex Russu <https://github.com/alexrussu>`_, `Alex Russu <https://github.com/alexrussu>`_,
`Alex Willmer <https://github.com/moreati>`_, `Alex Willmer <https://github.com/moreati>`_,
`atoom <https://github.com/atoom>`_, `atoom <https://github.com/atoom>`_,

@ -16,17 +16,17 @@ The UNIX First Stage
To allow delivery of the bootstrap compressed using :py:mod:`zlib`, it is To allow delivery of the bootstrap compressed using :py:mod:`zlib`, it is
necessary for something on the remote to be prepared to decompress the payload necessary for something on the remote to be prepared to decompress the payload
and feed it to a Python interpreter. Since we would like to avoid writing an and feed it to a Python interpreter [#f1]_. Since we would like to avoid
error-prone shell fragment to implement this, and since we must avoid writing writing an error-prone shell fragment to implement this, and since we must
to the remote machine's disk in case it is read-only, the Python process avoid writing to the remote machine's disk in case it is read-only, the Python
started on the remote machine by Mitogen immediately forks in order to process started on the remote machine by Mitogen immediately forks in order to
implement the decompression. implement the decompression.
Python Command Line Python Command Line
################### ###################
The Python command line sent to the host is a :mod:`zlib`-compressed [#f1]_ and The Python command line sent to the host is a :mod:`zlib`-compressed [#f2]_ and
base64-encoded copy of the :py:meth:`mitogen.master.Stream._first_stage` base64-encoded copy of the :py:meth:`mitogen.master.Stream._first_stage`
function, which has been carefully optimized to reduce its size. Prior to function, which has been carefully optimized to reduce its size. Prior to
compression and encoding, ``CONTEXT_NAME`` is replaced with the desired context compression and encoding, ``CONTEXT_NAME`` is replaced with the desired context
@ -65,10 +65,10 @@ allowing reading by the first stage of exactly the required bytes.
Configuring argv[0] Configuring argv[0]
################### ###################
Forking provides us with an excellent opportunity for tidying up the eventual Forking provides an excellent opportunity to tidy up the eventual Python
Python interpreter, in particular, restarting it using a fresh command-line to interpreter, in particular, restarting it using a fresh command-line to get rid
get rid of the large base64-encoded first stage parameter, and to replace of the large base64-encoded first stage parameter, and to replace **argv[0]**
**argv[0]** with something descriptive. with something descriptive.
After configuring its ``stdin`` to point to the read end of the pipe, the After configuring its ``stdin`` to point to the read end of the pipe, the
parent half of the fork re-executes Python, with **argv[0]** taken from the parent half of the fork re-executes Python, with **argv[0]** taken from the
@ -1018,7 +1018,13 @@ receive items in the order they are requested, as they become available.
.. rubric:: Footnotes .. rubric:: Footnotes
.. [#f1] Compression may seem redundant, however it is basically free and reducing IO .. [#f1] Although some connection methods such as SSH support compression, and
Mitogen enables SSH compression by default, there are circumstances where
disabling SSH compression is desirable, and many scenarios for future
connection methods where transport-layer compression is not supported at
all.
.. [#f2] Compression may seem redundant, however it is basically free and reducing IO
is always a good idea. The 33% / 200 byte saving may mean the presence or is always a good idea. The 33% / 200 byte saving may mean the presence or
absence of an additional frame on the network, or in real world terms after absence of an additional frame on the network, or in real world terms after
accounting for SSH overhead, around a 2% reduced chance of a stall during accounting for SSH overhead, around a 2% reduced chance of a stall during

@ -35,162 +35,33 @@ Side Class
========== ==========
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autoclass:: Side
.. class:: Side (stream, fd, keep_alive=True) :members:
Represent a single side of a :py:class:`BasicStream`. This exists to allow
streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional
(e.g. UNIX socket) file descriptors to operate identically.
:param mitogen.core.Stream stream:
The stream this side is associated with.
:param int fd:
Underlying file descriptor.
:param bool keep_alive:
Value for :py:attr:`keep_alive`
During construction, the file descriptor has its :py:data:`os.O_NONBLOCK`
flag enabled using :py:func:`fcntl.fcntl`.
.. attribute:: stream
The :py:class:`Stream` for which this is a read or write side.
.. attribute:: fd
Integer file descriptor to perform IO on, or :data:`None` if
:py:meth:`close` has been called.
.. attribute:: keep_alive
If :data:`True`, causes presence of this side in :py:class:`Broker`'s
active reader set to defer shutdown until the side is disconnected.
.. method:: fileno
Return :py:attr:`fd` if it is not :data:`None`, otherwise raise
:py:class:`StreamError`. This method is implemented so that
:py:class:`Side` can be used directly by :py:func:`select.select`.
.. method:: close
Call :py:func:`os.close` on :py:attr:`fd` if it is not :data:`None`,
then set it to :data:`None`.
.. method:: read (n=CHUNK_SIZE)
Read up to `n` bytes from the file descriptor, wrapping the underlying
:py:func:`os.read` call with :py:func:`io_op` to trap common
disconnection conditions.
:py:meth:`read` always behaves as if it is reading from a regular UNIX
file; socket, pipe, and TTY disconnection errors are masked and result
in a 0-sized read just like a regular file.
:returns:
Bytes read, or the empty to string to indicate disconnection was
detected.
.. method:: write (s)
Write as much of the bytes from `s` as possible to the file descriptor,
wrapping the underlying :py:func:`os.write` call with :py:func:`io_op`
to trap common disconnection connditions.
:returns:
Number of bytes written, or :data:`None` if disconnection was
detected.
Stream Classes Stream Classes
============== ==============
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autoclass:: BasicStream
.. class:: BasicStream :members:
.. attribute:: receive_side
A :py:class:`Side` representing the stream's receive file descriptor.
.. attribute:: transmit_side
A :py:class:`Side` representing the stream's transmit file descriptor.
.. method:: on_disconnect (broker)
Called by :py:class:`Broker` to force disconnect the stream. The base
implementation simply closes :py:attr:`receive_side` and
:py:attr:`transmit_side` and unregisters the stream from the broker.
.. method:: on_receive (broker)
Called by :py:class:`Broker` when the stream's :py:attr:`receive_side` has
been marked readable using :py:meth:`Broker.start_receive` and the
broker has detected the associated file descriptor is ready for
reading.
Subclasses must implement this method if
:py:meth:`Broker.start_receive` is ever called on them, and the method
must call :py:meth:`on_disconect` if reading produces an empty string.
.. method:: on_transmit (broker)
Called by :py:class:`Broker` when the stream's :py:attr:`transmit_side`
has been marked writeable using :py:meth:`Broker._start_transmit` and
the broker has detected the associated file descriptor is ready for
writing.
Subclasses must implement this method if
:py:meth:`Broker._start_transmit` is ever called on them.
.. method:: on_shutdown (broker)
Called by :py:meth:`Broker.shutdown` to allow the stream time to
gracefully shutdown. The base implementation simply called
:py:meth:`on_disconnect`.
.. autoclass:: Stream .. autoclass:: Stream
:members: :members:
.. method:: pending_bytes ()
Returns the number of bytes queued for transmission on this stream.
This can be used to limit the amount of data buffered in RAM by an
otherwise unlimited consumer.
For an accurate result, this method should be called from the Broker
thread, using a wrapper like:
::
def get_pending_bytes(self, stream):
latch = mitogen.core.Latch()
self.broker.defer(
lambda: latch.put(stream.pending_bytes())
)
return latch.get()
.. currentmodule:: mitogen.fork .. currentmodule:: mitogen.fork
.. autoclass:: Stream .. autoclass:: Stream
:members: :members:
.. currentmodule:: mitogen.parent .. currentmodule:: mitogen.parent
.. autoclass:: Stream .. autoclass:: Stream
:members: :members:
.. currentmodule:: mitogen.ssh .. currentmodule:: mitogen.ssh
.. autoclass:: Stream .. autoclass:: Stream
:members: :members:
.. currentmodule:: mitogen.sudo .. currentmodule:: mitogen.sudo
.. autoclass:: Stream .. autoclass:: Stream
:members: :members:
@ -212,6 +83,7 @@ Poller Class
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autoclass:: Poller .. autoclass:: Poller
:members:
.. currentmodule:: mitogen.parent .. currentmodule:: mitogen.parent
.. autoclass:: KqueuePoller .. autoclass:: KqueuePoller
@ -256,64 +128,16 @@ ExternalContext Class
===================== =====================
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autoclass:: ExternalContext
:members:
.. class:: ExternalContext
External context implementation.
.. attribute:: broker
The :py:class:`mitogen.core.Broker` instance.
.. attribute:: context
The :py:class:`mitogen.core.Context` instance.
.. attribute:: channel
The :py:class:`mitogen.core.Channel` over which
:py:data:`CALL_FUNCTION` requests are received.
.. attribute:: stdout_log
The :py:class:`mitogen.core.IoLogger` connected to ``stdout``.
.. attribute:: importer
The :py:class:`mitogen.core.Importer` instance.
.. attribute:: stdout_log
The :py:class:`IoLogger` connected to ``stdout``.
.. attribute:: stderr_log
The :py:class:`IoLogger` connected to ``stderr``.
.. method:: _dispatch_calls
Implementation for the main thread in every child context.
mitogen.master mitogen.master
============== ==============
.. currentmodule:: mitogen.master .. currentmodule:: mitogen.parent
.. autoclass:: ProcessMonitor
.. class:: ProcessMonitor :members:
Install a :py:data:`signal.SIGCHLD` handler that generates callbacks when a
specific child process has exitted.
.. method:: add (pid, callback)
Add a callback function to be notified of the exit status of a process.
:param int pid:
Process ID to be notified of.
:param callback:
Function invoked as `callback(status)`, where `status` is the raw
exit status of the child process.
Blocking I/O Functions Blocking I/O Functions

@ -100,7 +100,7 @@ CALL_SERVICE = 110
#: * a remote receiver is disconnected or explicitly closed. #: * a remote receiver is disconnected or explicitly closed.
#: * a related message could not be delivered due to no route existing for it. #: * a related message could not be delivered due to no route existing for it.
#: * a router is being torn down, as a sentinel value to notify #: * a router is being torn down, as a sentinel value to notify
#: :py:meth:`mitogen.core.Router.add_handler` callbacks to clean up. #: :meth:`mitogen.core.Router.add_handler` callbacks to clean up.
IS_DEAD = 999 IS_DEAD = 999
try: try:
@ -187,7 +187,7 @@ class Error(Exception):
class LatchError(Error): class LatchError(Error):
"""Raised when an attempt is made to use a :py:class:`mitogen.core.Latch` """Raised when an attempt is made to use a :class:`mitogen.core.Latch`
that has been marked closed.""" that has been marked closed."""
pass pass
@ -239,7 +239,7 @@ class Kwargs(dict):
class CallError(Error): class CallError(Error):
"""Serializable :class:`Error` subclass raised when """Serializable :class:`Error` subclass raised when
:py:meth:`Context.call() <mitogen.parent.Context.call>` fails. A copy of :meth:`Context.call() <mitogen.parent.Context.call>` fails. A copy of
the traceback from the external context is appended to the exception the traceback from the external context is appended to the exception
message.""" message."""
def __init__(self, fmt=None, *args): def __init__(self, fmt=None, *args):
@ -872,6 +872,15 @@ class Receiver(object):
class Channel(Sender, Receiver): class Channel(Sender, Receiver):
"""
A channel inherits from :class:`mitogen.core.Sender` and
`mitogen.core.Receiver` to provide bidirectional functionality.
Since all handles aren't known until after both ends are constructed, for
both ends to communicate through a channel, it is necessary for one end to
retrieve the handle allocated to the other and reconfigure its own channel
to match. Currently this is a manual task.
"""
def __init__(self, router, context, dst_handle, handle=None): def __init__(self, router, context, dst_handle, handle=None):
Sender.__init__(self, context, dst_handle) Sender.__init__(self, context, dst_handle)
Receiver.__init__(self, router, handle) Receiver.__init__(self, router, handle)
@ -1160,12 +1169,35 @@ class LogHandler(logging.Handler):
class Side(object): class Side(object):
"""
Represent a single side of a :class:`BasicStream`. This exists to allow
streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional
(e.g. UNIX socket) file descriptors to operate identically.
:param mitogen.core.Stream stream:
The stream this side is associated with.
:param int fd:
Underlying file descriptor.
:param bool keep_alive:
Value for :attr:`keep_alive`
During construction, the file descriptor has its :data:`os.O_NONBLOCK` flag
enabled using :func:`fcntl.fcntl`.
"""
_fork_refs = weakref.WeakValueDictionary() _fork_refs = weakref.WeakValueDictionary()
def __init__(self, stream, fd, cloexec=True, keep_alive=True, blocking=False): def __init__(self, stream, fd, cloexec=True, keep_alive=True, blocking=False):
#: The :class:`Stream` for which this is a read or write side.
self.stream = stream self.stream = stream
#: Integer file descriptor to perform IO on, or :data:`None` if
#: :meth:`close` has been called.
self.fd = fd self.fd = fd
self.closed = False self.closed = False
#: If :data:`True`, causes presence of this side in
#: :class:`Broker`'s active reader set to defer shutdown until the
#: side is disconnected.
self.keep_alive = keep_alive self.keep_alive = keep_alive
self._fork_refs[id(self)] = self self._fork_refs[id(self)] = self
if cloexec: if cloexec:
@ -1182,12 +1214,29 @@ class Side(object):
side.close() side.close()
def close(self): def close(self):
"""
Call :func:`os.close` on :attr:`fd` if it is not :data:`None`,
then set it to :data:`None`.
"""
if not self.closed: if not self.closed:
_vv and IOLOG.debug('%r.close()', self) _vv and IOLOG.debug('%r.close()', self)
self.closed = True self.closed = True
os.close(self.fd) os.close(self.fd)
def read(self, n=CHUNK_SIZE): def read(self, n=CHUNK_SIZE):
"""
Read up to `n` bytes from the file descriptor, wrapping the underlying
:func:`os.read` call with :func:`io_op` to trap common disconnection
conditions.
:meth:`read` always behaves as if it is reading from a regular UNIX
file; socket, pipe, and TTY disconnection errors are masked and result
in a 0-sized read like a regular file.
:returns:
Bytes read, or the empty to string to indicate disconnection was
detected.
"""
if self.closed: if self.closed:
# Refuse to touch the handle after closed, it may have been reused # Refuse to touch the handle after closed, it may have been reused
# by another thread. TODO: synchronize read()/write()/close(). # by another thread. TODO: synchronize read()/write()/close().
@ -1198,6 +1247,15 @@ class Side(object):
return s return s
def write(self, s): def write(self, s):
"""
Write as much of the bytes from `s` as possible to the file descriptor,
wrapping the underlying :func:`os.write` call with :func:`io_op` to
trap common disconnection connditions.
:returns:
Number of bytes written, or :data:`None` if disconnection was
detected.
"""
if self.closed or self.fd is None: if self.closed or self.fd is None:
# Refuse to touch the handle after closed, it may have been reused # Refuse to touch the handle after closed, it may have been reused
# by another thread. # by another thread.
@ -1210,10 +1268,52 @@ class Side(object):
class BasicStream(object): class BasicStream(object):
#: A :class:`Side` representing the stream's receive file descriptor.
receive_side = None receive_side = None
#: A :class:`Side` representing the stream's transmit file descriptor.
transmit_side = None transmit_side = None
def on_receive(self, broker):
"""
Called by :class:`Broker` when the stream's :attr:`receive_side` has
been marked readable using :meth:`Broker.start_receive` and the broker
has detected the associated file descriptor is ready for reading.
Subclasses must implement this if :meth:`Broker.start_receive` is ever
called on them, and the method must call :meth:`on_disconect` if
reading produces an empty string.
"""
pass
def on_transmit(self, broker):
"""
Called by :class:`Broker` when the stream's :attr:`transmit_side`
has been marked writeable using :meth:`Broker._start_transmit` and
the broker has detected the associated file descriptor is ready for
writing.
Subclasses must implement this if :meth:`Broker._start_transmit` is
ever called on them.
"""
pass
def on_shutdown(self, broker):
"""
Called by :meth:`Broker.shutdown` to allow the stream time to
gracefully shutdown. The base implementation simply called
:meth:`on_disconnect`.
"""
_v and LOG.debug('%r.on_shutdown()', self)
fire(self, 'shutdown')
self.on_disconnect(broker)
def on_disconnect(self, broker): def on_disconnect(self, broker):
"""
Called by :class:`Broker` to force disconnect the stream. The base
implementation simply closes :attr:`receive_side` and
:attr:`transmit_side` and unregisters the stream from the broker.
"""
LOG.debug('%r.on_disconnect()', self) LOG.debug('%r.on_disconnect()', self)
if self.receive_side: if self.receive_side:
broker.stop_receive(self) broker.stop_receive(self)
@ -1223,19 +1323,14 @@ class BasicStream(object):
self.transmit_side.close() self.transmit_side.close()
fire(self, 'disconnect') fire(self, 'disconnect')
def on_shutdown(self, broker):
_v and LOG.debug('%r.on_shutdown()', self)
fire(self, 'shutdown')
self.on_disconnect(broker)
class Stream(BasicStream): class Stream(BasicStream):
""" """
:py:class:`BasicStream` subclass implementing mitogen's :ref:`stream :class:`BasicStream` subclass implementing mitogen's :ref:`stream
protocol <stream-protocol>`. protocol <stream-protocol>`.
""" """
#: If not :data:`None`, :py:class:`Router` stamps this into #: If not :data:`None`, :class:`Router` stamps this into
#: :py:attr:`Message.auth_id` of every message received on this stream. #: :attr:`Message.auth_id` of every message received on this stream.
auth_id = None auth_id = None
#: If not :data:`False`, indicates the stream has :attr:`auth_id` set and #: If not :data:`False`, indicates the stream has :attr:`auth_id` set and
@ -1272,7 +1367,7 @@ class Stream(BasicStream):
def on_receive(self, broker): def on_receive(self, broker):
"""Handle the next complete message on the stream. Raise """Handle the next complete message on the stream. Raise
:py:class:`StreamError` on failure.""" :class:`StreamError` on failure."""
_vv and IOLOG.debug('%r.on_receive()', self) _vv and IOLOG.debug('%r.on_receive()', self)
buf = self.receive_side.read() buf = self.receive_side.read()
@ -1329,6 +1424,14 @@ class Stream(BasicStream):
return True return True
def pending_bytes(self): def pending_bytes(self):
"""
Return the number of bytes queued for transmission on this stream. This
can be used to limit the amount of data buffered in RAM by an otherwise
unlimited consumer.
For an accurate result, this method should be called from the Broker
thread, for example by using :meth:`Broker.defer_sync`.
"""
return self._output_buf_len return self._output_buf_len
def on_transmit(self, broker): def on_transmit(self, broker):
@ -1572,15 +1675,15 @@ class Poller(object):
class Latch(object): class Latch(object):
""" """
A latch is a :py:class:`Queue.Queue`-like object that supports mutation and A latch is a :class:`Queue.Queue`-like object that supports mutation and
waiting from multiple threads, however unlike :py:class:`Queue.Queue`, waiting from multiple threads, however unlike :class:`Queue.Queue`,
waiting threads always remain interruptible, so CTRL+C always succeeds, and waiting threads always remain interruptible, so CTRL+C always succeeds, and
waits where a timeout is set experience no wake up latency. These waits where a timeout is set experience no wake up latency. These
properties are not possible in combination using the built-in threading properties are not possible in combination using the built-in threading
primitives available in Python 2.x. primitives available in Python 2.x.
Latches implement queues using the UNIX self-pipe trick, and a per-thread Latches implement queues using the UNIX self-pipe trick, and a per-thread
:py:func:`socket.socketpair` that is lazily created the first time any :func:`socket.socketpair` that is lazily created the first time any
latch attempts to sleep on a thread, and dynamically associated with the latch attempts to sleep on a thread, and dynamically associated with the
waiting Latch only for duration of the wait. waiting Latch only for duration of the wait.
@ -1626,7 +1729,7 @@ class Latch(object):
def close(self): def close(self):
""" """
Mark the latch as closed, and cause every sleeping thread to be woken, Mark the latch as closed, and cause every sleeping thread to be woken,
with :py:class:`mitogen.core.LatchError` raised in each thread. with :class:`mitogen.core.LatchError` raised in each thread.
""" """
self._lock.acquire() self._lock.acquire()
try: try:
@ -1640,17 +1743,17 @@ class Latch(object):
def empty(self): def empty(self):
""" """
Return :py:data:`True` if calling :py:meth:`get` would block. Return :data:`True` if calling :meth:`get` would block.
As with :py:class:`Queue.Queue`, :py:data:`True` may be returned even As with :class:`Queue.Queue`, :data:`True` may be returned even
though a subsequent call to :py:meth:`get` will succeed, since a though a subsequent call to :meth:`get` will succeed, since a
message may be posted at any moment between :py:meth:`empty` and message may be posted at any moment between :meth:`empty` and
:py:meth:`get`. :meth:`get`.
As with :py:class:`Queue.Queue`, :py:data:`False` may be returned even As with :class:`Queue.Queue`, :data:`False` may be returned even
though a subsequent call to :py:meth:`get` will block, since another though a subsequent call to :meth:`get` will block, since another
waiting thread may be woken at any moment between :py:meth:`empty` and waiting thread may be woken at any moment between :meth:`empty` and
:py:meth:`get`. :meth:`get`.
""" """
return len(self._queue) == 0 return len(self._queue) == 0
@ -1683,14 +1786,14 @@ class Latch(object):
Return the next enqueued object, or sleep waiting for one. Return the next enqueued object, or sleep waiting for one.
:param float timeout: :param float timeout:
If not :py:data:`None`, specifies a timeout in seconds. If not :data:`None`, specifies a timeout in seconds.
:param bool block: :param bool block:
If :py:data:`False`, immediately raise If :data:`False`, immediately raise
:py:class:`mitogen.core.TimeoutError` if the latch is empty. :class:`mitogen.core.TimeoutError` if the latch is empty.
:raises mitogen.core.LatchError: :raises mitogen.core.LatchError:
:py:meth:`close` has been called, and the object is no longer valid. :meth:`close` has been called, and the object is no longer valid.
:raises mitogen.core.TimeoutError: :raises mitogen.core.TimeoutError:
Timeout was reached. Timeout was reached.
@ -1771,7 +1874,7 @@ class Latch(object):
exists. exists.
:raises mitogen.core.LatchError: :raises mitogen.core.LatchError:
:py:meth:`close` has been called, and the object is no longer valid. :meth:`close` has been called, and the object is no longer valid.
""" """
_vv and IOLOG.debug('%r.put(%r)', self, obj) _vv and IOLOG.debug('%r.put(%r)', self, obj)
self._lock.acquire() self._lock.acquire()
@ -1807,7 +1910,7 @@ class Latch(object):
class Waker(BasicStream): class Waker(BasicStream):
""" """
:py:class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_. :class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_.
Used to wake the multiplexer when another thread needs to modify its state Used to wake the multiplexer when another thread needs to modify its state
(via a cross-thread function call). (via a cross-thread function call).
@ -1827,8 +1930,8 @@ class Waker(BasicStream):
def __repr__(self): def __repr__(self):
return 'Waker(%r rfd=%r, wfd=%r)' % ( return 'Waker(%r rfd=%r, wfd=%r)' % (
self._broker, self._broker,
self.receive_side.fd, self.receive_side and self.receive_side.fd,
self.transmit_side.fd, self.transmit_side and self.transmit_side.fd,
) )
@property @property
@ -1844,17 +1947,14 @@ class Waker(BasicStream):
def on_receive(self, broker): def on_receive(self, broker):
""" """
Drain the pipe and fire callbacks. Reading multiple bytes is safe since Drain the pipe and fire callbacks. Since :attr:`_deferred` is
new bytes corresponding to future .defer() calls are written only after synchronized, :meth:`defer` and :meth:`on_receive` can conspire to
.defer() takes _lock: either a byte we read corresponds to something ensure only one byte needs to be pending regardless of queue length.
already on the queue by the time we take _lock, or a byte remains
buffered, causing another wake up, because it was written after we
released _lock.
""" """
_vv and IOLOG.debug('%r.on_receive()', self) _vv and IOLOG.debug('%r.on_receive()', self)
self.receive_side.read(128)
self._lock.acquire() self._lock.acquire()
try: try:
self.receive_side.read(1)
deferred = self._deferred deferred = self._deferred
self._deferred = [] self._deferred = []
finally: finally:
@ -1868,6 +1968,18 @@ class Waker(BasicStream):
func, args, kwargs) func, args, kwargs)
self._broker.shutdown() self._broker.shutdown()
def _wake(self):
"""
Wake the multiplexer by writing a byte. If Broker is midway through
teardown, the FD may already be closed, so ignore EBADF.
"""
try:
self.transmit_side.write(b(' '))
except OSError:
e = sys.exc_info()[1]
if e.args[0] != errno.EBADF:
raise
def defer(self, func, *args, **kwargs): def defer(self, func, *args, **kwargs):
if threading.currentThread().ident == self.broker_ident: if threading.currentThread().ident == self.broker_ident:
_vv and IOLOG.debug('%r.defer() [immediate]', self) _vv and IOLOG.debug('%r.defer() [immediate]', self)
@ -1876,25 +1988,17 @@ class Waker(BasicStream):
_vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd) _vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd)
self._lock.acquire() self._lock.acquire()
try: try:
if not self._deferred:
self._wake()
self._deferred.append((func, args, kwargs)) self._deferred.append((func, args, kwargs))
finally: finally:
self._lock.release() self._lock.release()
# Wake the multiplexer by writing a byte. If the broker is in the midst
# of tearing itself down, the waker fd may already have been closed, so
# ignore EBADF here.
try:
self.transmit_side.write(b(' '))
except OSError:
e = sys.exc_info()[1]
if e.args[0] != errno.EBADF:
raise
class IoLogger(BasicStream): class IoLogger(BasicStream):
""" """
:py:class:`BasicStream` subclass that sets up redirection of a standard :class:`BasicStream` subclass that sets up redirection of a standard
UNIX file descriptor back into the Python :py:mod:`logging` package. UNIX file descriptor back into the Python :mod:`logging` package.
""" """
_buf = '' _buf = ''
@ -2126,8 +2230,8 @@ class Router(object):
return handle return handle
def on_shutdown(self, broker): def on_shutdown(self, broker):
"""Called during :py:meth:`Broker.shutdown`, informs callbacks """Called during :meth:`Broker.shutdown`, informs callbacks registered
registered with :py:meth:`add_handle_cb` the connection is dead.""" with :meth:`add_handle_cb` the connection is dead."""
_v and LOG.debug('%r.on_shutdown(%r)', self, broker) _v and LOG.debug('%r.on_shutdown(%r)', self, broker)
fire(self, 'shutdown') fire(self, 'shutdown')
for handle, (persist, fn) in self._handle_map.iteritems(): for handle, (persist, fn) in self._handle_map.iteritems():
@ -2249,14 +2353,26 @@ class Router(object):
class Broker(object): class Broker(object):
"""
Responsible for handling I/O multiplexing in a private thread.
**Note:** This is the somewhat limited core version of the Broker class
used by child contexts. The master subclass is documented below.
"""
poller_class = Poller poller_class = Poller
_waker = None _waker = None
_thread = None _thread = None
#: Seconds grace to allow :class:`streams <Stream>` to shutdown gracefully
#: before force-disconnecting them during :meth:`shutdown`.
shutdown_timeout = 3.0 shutdown_timeout = 3.0
def __init__(self, poller_class=None): def __init__(self, poller_class=None):
self._alive = True self._alive = True
self._waker = Waker(self) self._waker = Waker(self)
#: Arrange for `func(\*args, \**kwargs)` to be executed on the broker
#: thread, or immediately if the current thread is the broker thread.
#: Safe to call from any thread.
self.defer = self._waker.defer self.defer = self._waker.defer
self.poller = self.poller_class() self.poller = self.poller_class()
self.poller.start_receive( self.poller.start_receive(
@ -2272,6 +2388,12 @@ class Broker(object):
self._waker.broker_ident = self._thread.ident self._waker.broker_ident = self._thread.ident
def start_receive(self, stream): def start_receive(self, stream):
"""
Mark the :attr:`receive_side <Stream.receive_side>` on `stream` as
ready for reading. Safe to call from any thread. When the associated
file descriptor becomes ready for reading,
:meth:`BasicStream.on_receive` will be called.
"""
_vv and IOLOG.debug('%r.start_receive(%r)', self, stream) _vv and IOLOG.debug('%r.start_receive(%r)', self, stream)
side = stream.receive_side side = stream.receive_side
assert side and side.fd is not None assert side and side.fd is not None
@ -2279,26 +2401,47 @@ class Broker(object):
side.fd, (side, stream.on_receive)) side.fd, (side, stream.on_receive))
def stop_receive(self, stream): def stop_receive(self, stream):
"""
Mark the :attr:`receive_side <Stream.receive_side>` on `stream` as not
ready for reading. Safe to call from any thread.
"""
_vv and IOLOG.debug('%r.stop_receive(%r)', self, stream) _vv and IOLOG.debug('%r.stop_receive(%r)', self, stream)
self.defer(self.poller.stop_receive, stream.receive_side.fd) self.defer(self.poller.stop_receive, stream.receive_side.fd)
def _start_transmit(self, stream): def _start_transmit(self, stream):
"""
Mark the :attr:`transmit_side <Stream.transmit_side>` on `stream` as
ready for writing. Must only be called from the Broker thread. When the
associated file descriptor becomes ready for writing,
:meth:`BasicStream.on_transmit` will be called.
"""
_vv and IOLOG.debug('%r._start_transmit(%r)', self, stream) _vv and IOLOG.debug('%r._start_transmit(%r)', self, stream)
side = stream.transmit_side side = stream.transmit_side
assert side and side.fd is not None assert side and side.fd is not None
self.poller.start_transmit(side.fd, (side, stream.on_transmit)) self.poller.start_transmit(side.fd, (side, stream.on_transmit))
def _stop_transmit(self, stream): def _stop_transmit(self, stream):
"""
Mark the :attr:`transmit_side <Stream.receive_side>` on `stream` as not
ready for writing.
"""
_vv and IOLOG.debug('%r._stop_transmit(%r)', self, stream) _vv and IOLOG.debug('%r._stop_transmit(%r)', self, stream)
self.poller.stop_transmit(stream.transmit_side.fd) self.poller.stop_transmit(stream.transmit_side.fd)
def keep_alive(self): def keep_alive(self):
"""
Return :data:`True` if any reader's :attr:`Side.keep_alive` attribute
is :data:`True`, or any :class:`Context` is still registered that is
not the master. Used to delay shutdown while some important work is in
progress (e.g. log draining).
"""
it = (side.keep_alive for (_, (side, _)) in self.poller.readers) it = (side.keep_alive for (_, (side, _)) in self.poller.readers)
return sum(it, 0) return sum(it, 0)
def defer_sync(self, func): def defer_sync(self, func):
""" """
Block the calling thread while `func` runs on a broker thread. Arrange for `func()` to execute on the broker thread, blocking the
current thread until a result or exception is available.
:returns: :returns:
Return value of `func()`. Return value of `func()`.
@ -2330,40 +2473,61 @@ class Broker(object):
for (side, func) in self.poller.poll(timeout): for (side, func) in self.poller.poll(timeout):
self._call(side.stream, func) self._call(side.stream, func)
def _broker_exit(self):
for _, (side, _) in self.poller.readers + self.poller.writers:
LOG.error('_broker_main() force disconnecting %r', side)
side.stream.on_disconnect(self)
self.poller.close()
def _broker_shutdown(self):
for _, (side, _) in self.poller.readers + self.poller.writers:
self._call(side.stream, side.stream.on_shutdown)
deadline = time.time() + self.shutdown_timeout
while self.keep_alive() and time.time() < deadline:
self._loop_once(max(0, deadline - time.time()))
if self.keep_alive():
LOG.error('%r: some streams did not close gracefully. '
'The most likely cause for this is one or '
'more child processes still connected to '
'our stdout/stderr pipes.', self)
def _broker_main(self): def _broker_main(self):
"""
Handle events until :meth:`shutdown`. On shutdown, invoke
:meth:`Stream.on_shutdown` for every active stream, then allow up to
:attr:`shutdown_timeout` seconds for the streams to unregister
themselves before forcefully calling :meth:`Stream.on_disconnect`.
"""
try: try:
while self._alive: while self._alive:
self._loop_once() self._loop_once()
fire(self, 'shutdown') fire(self, 'shutdown')
for _, (side, _) in self.poller.readers + self.poller.writers: self._broker_shutdown()
self._call(side.stream, side.stream.on_shutdown)
deadline = time.time() + self.shutdown_timeout
while self.keep_alive() and time.time() < deadline:
self._loop_once(max(0, deadline - time.time()))
if self.keep_alive():
LOG.error('%r: some streams did not close gracefully. '
'The most likely cause for this is one or '
'more child processes still connected to '
'our stdout/stderr pipes.', self)
for _, (side, _) in self.poller.readers + self.poller.writers:
LOG.error('_broker_main() force disconnecting %r', side)
side.stream.on_disconnect(self)
except Exception: except Exception:
LOG.exception('_broker_main() crashed') LOG.exception('_broker_main() crashed')
self._broker_exit()
fire(self, 'exit') fire(self, 'exit')
def shutdown(self): def shutdown(self):
"""
Request broker gracefully disconnect streams and stop. Safe to call
from any thread.
"""
_v and LOG.debug('%r.shutdown()', self) _v and LOG.debug('%r.shutdown()', self)
def _shutdown(): def _shutdown():
self._alive = False self._alive = False
self.defer(_shutdown) self.defer(_shutdown)
def join(self): def join(self):
"""
Wait for the broker to stop, expected to be called after
:meth:`shutdown`.
"""
self._thread.join() self._thread.join()
def __repr__(self): def __repr__(self):
@ -2435,6 +2599,34 @@ class Dispatcher(object):
class ExternalContext(object): class ExternalContext(object):
"""
External context implementation.
.. attribute:: broker
The :class:`mitogen.core.Broker` instance.
.. attribute:: context
The :class:`mitogen.core.Context` instance.
.. attribute:: channel
The :class:`mitogen.core.Channel` over which :data:`CALL_FUNCTION`
requests are received.
.. attribute:: stdout_log
The :class:`mitogen.core.IoLogger` connected to ``stdout``.
.. attribute:: importer
The :class:`mitogen.core.Importer` instance.
.. attribute:: stdout_log
The :class:`IoLogger` connected to ``stdout``.
.. attribute:: stderr_log
The :class:`IoLogger` connected to ``stderr``.
.. method:: _dispatch_calls
Implementation for the main thread in every child context.
"""
detached = False detached = False
def __init__(self, config): def __init__(self, config):

@ -45,10 +45,6 @@ class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.hybrid_tty_create_child) create_child = staticmethod(mitogen.parent.hybrid_tty_create_child)
child_is_immediate_subprocess = False child_is_immediate_subprocess = False
#: Once connected, points to the corresponding DiagLogStream, allowing it
#: to be disconnected at the same time this stream is being torn down.
tty_stream = None
username = 'root' username = 'root'
password = None password = None
doas_path = 'doas' doas_path = 'doas'
@ -75,10 +71,6 @@ class Stream(mitogen.parent.Stream):
super(Stream, self).connect() super(Stream, self).connect()
self.name = u'doas.' + mitogen.core.to_text(self.username) self.name = u'doas.' + mitogen.core.to_text(self.username)
def on_disconnect(self, broker):
self.tty_stream.on_disconnect(broker)
super(Stream, self).on_disconnect(broker)
def get_boot_command(self): def get_boot_command(self):
bits = [self.doas_path, '-u', self.username, '--'] bits = [self.doas_path, '-u', self.username, '--']
bits = bits + super(Stream, self).get_boot_command() bits = bits + super(Stream, self).get_boot_command()
@ -88,15 +80,13 @@ class Stream(mitogen.parent.Stream):
password_incorrect_msg = 'doas password is incorrect' password_incorrect_msg = 'doas password is incorrect'
password_required_msg = 'doas password is required' password_required_msg = 'doas password is required'
def _connect_bootstrap(self, extra_fd): def _connect_bootstrap(self):
self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self)
password_sent = False
it = mitogen.parent.iter_read( it = mitogen.parent.iter_read(
fds=[self.receive_side.fd, extra_fd], fds=[self.receive_side.fd, self.diag_stream.receive_side.fd],
deadline=self.connect_deadline, deadline=self.connect_deadline,
) )
password_sent = False
for buf in it: for buf in it:
LOG.debug('%r: received %r', self, buf) LOG.debug('%r: received %r', self, buf)
if buf.endswith(self.EC0_MARKER): if buf.endswith(self.EC0_MARKER):
@ -111,7 +101,7 @@ class Stream(mitogen.parent.Stream):
if password_sent: if password_sent:
raise PasswordError(self.password_incorrect_msg) raise PasswordError(self.password_incorrect_msg)
LOG.debug('sending password') LOG.debug('sending password')
self.tty_stream.transmit_side.write( self.diag_stream.transmit_side.write(
mitogen.core.to_text(self.password + '\n').encode('utf-8') mitogen.core.to_text(self.password + '\n').encode('utf-8')
) )
password_sent = True password_sent = True

@ -188,6 +188,6 @@ class Stream(mitogen.parent.Stream):
# Don't trigger atexit handlers, they were copied from the parent. # Don't trigger atexit handlers, they were copied from the parent.
os._exit(0) os._exit(0)
def _connect_bootstrap(self, extra_fd): def _connect_bootstrap(self):
# None required. # None required.
pass pass

@ -754,6 +754,26 @@ class ModuleResponder(object):
class Broker(mitogen.core.Broker): class Broker(mitogen.core.Broker):
"""
.. note::
You may construct as many brokers as desired, and use the same broker
for multiple routers, however usually only one broker need exist.
Multiple brokers may be useful when dealing with sets of children with
differing lifetimes. For example, a subscription service where
non-payment results in termination for one customer.
:param bool install_watcher:
If :data:`True`, an additional thread is started to monitor the
lifetime of the main thread, triggering :meth:`shutdown`
automatically in case the user forgets to call it, or their code
crashed.
You should not rely on this functionality in your program, it is only
intended as a fail-safe and to simplify the API for new users. In
particular, alternative Python implementations may not be able to
support watching the main thread.
"""
shutdown_timeout = 5.0 shutdown_timeout = 5.0
_watcher = None _watcher = None
poller_class = mitogen.parent.PREFERRED_POLLER poller_class = mitogen.parent.PREFERRED_POLLER
@ -773,7 +793,31 @@ class Broker(mitogen.core.Broker):
class Router(mitogen.parent.Router): class Router(mitogen.parent.Router):
"""
Extend :class:`mitogen.core.Router` with functionality useful to masters,
and child contexts who later become masters. Currently when this class is
required, the target context's router is upgraded at runtime.
.. note::
You may construct as many routers as desired, and use the same broker
for multiple routers, however usually only one broker and router need
exist. Multiple routers may be useful when dealing with separate trust
domains, for example, manipulating infrastructure belonging to separate
customers or projects.
:param mitogen.master.Broker broker:
Broker to use. If not specified, a private :class:`Broker` is created.
"""
broker_class = Broker broker_class = Broker
#: When :data:`True`, cause the broker thread and any subsequent broker and
#: main threads existing in any child to write
#: ``/tmp/mitogen.stats.<pid>.<thread_name>.log`` containing a
#: :mod:`cProfile` dump on graceful exit. Must be set prior to construction
#: of any :class:`Broker`, e.g. via::
#:
#: mitogen.master.Router.profiling = True
profiling = False profiling = False
def __init__(self, broker=None, max_message_size=None): def __init__(self, broker=None, max_message_size=None):
@ -796,6 +840,10 @@ class Router(mitogen.parent.Router):
) )
def enable_debug(self): def enable_debug(self):
"""
Cause this context and any descendant child contexts to write debug
logs to ``/tmp/mitogen.<pid>.log``.
"""
mitogen.core.enable_debug_logging() mitogen.core.enable_debug_logging()
self.debug = True self.debug = True
@ -830,6 +878,12 @@ class IdAllocator(object):
BLOCK_SIZE = 1000 BLOCK_SIZE = 1000
def allocate(self): def allocate(self):
"""
Arrange for a unique context ID to be allocated and associated with a
route leading to the active context. In masters, the ID is generated
directly, in children it is forwarded to the master via a
:data:`mitogen.core.ALLOCATE_ID` message.
"""
self.lock.acquire() self.lock.acquire()
try: try:
id_ = self.next_id id_ = self.next_id

@ -211,7 +211,7 @@ def create_socketpair():
return parentfp, childfp return parentfp, childfp
def detach_popen(*args, **kwargs): def detach_popen(**kwargs):
""" """
Use :class:`subprocess.Popen` to construct a child process, then hack the Use :class:`subprocess.Popen` to construct a child process, then hack the
Popen so that it forgets the child it created, allowing it to survive a Popen so that it forgets the child it created, allowing it to survive a
@ -223,6 +223,8 @@ def detach_popen(*args, **kwargs):
delivered to this process, causing later 'legitimate' calls to fail with delivered to this process, causing later 'legitimate' calls to fail with
ECHILD. ECHILD.
:param list close_on_error:
Array of integer file descriptors to close on exception.
:returns: :returns:
Process ID of the new child. Process ID of the new child.
""" """
@ -230,7 +232,7 @@ def detach_popen(*args, **kwargs):
# handling, without tying the surrounding code into managing a Popen # handling, without tying the surrounding code into managing a Popen
# object, which isn't possible for at least :mod:`mitogen.fork`. This # object, which isn't possible for at least :mod:`mitogen.fork`. This
# should be replaced by a swappable helper class in a future version. # should be replaced by a swappable helper class in a future version.
proc = subprocess.Popen(*args, **kwargs) proc = subprocess.Popen(**kwargs)
proc._child_created = False proc._child_created = False
return proc.pid return proc.pid
@ -271,14 +273,23 @@ def create_child(args, merge_stdio=False, stderr_pipe=False, preexec_fn=None):
mitogen.core.set_cloexec(stderr_w) mitogen.core.set_cloexec(stderr_w)
extra = {'stderr': stderr_w} extra = {'stderr': stderr_w}
pid = detach_popen( try:
args=args, pid = detach_popen(
stdin=childfp, args=args,
stdout=childfp, stdin=childfp,
close_fds=True, stdout=childfp,
preexec_fn=preexec_fn, close_fds=True,
**extra preexec_fn=preexec_fn,
) **extra
)
except Exception:
childfp.close()
parentfp.close()
if stderr_pipe:
os.close(stderr_r)
os.close(stderr_w)
raise
if stderr_pipe: if stderr_pipe:
os.close(stderr_w) os.close(stderr_w)
childfp.close() childfp.close()
@ -338,14 +349,19 @@ def tty_create_child(args):
disable_echo(master_fd) disable_echo(master_fd)
disable_echo(slave_fd) disable_echo(slave_fd)
pid = detach_popen( try:
args=args, pid = detach_popen(
stdin=slave_fd, args=args,
stdout=slave_fd, stdin=slave_fd,
stderr=slave_fd, stdout=slave_fd,
preexec_fn=_acquire_controlling_tty, stderr=slave_fd,
close_fds=True, preexec_fn=_acquire_controlling_tty,
) close_fds=True,
)
except Exception:
os.close(master_fd)
os.close(slave_fd)
raise
os.close(slave_fd) os.close(slave_fd)
LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s', LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s',
@ -372,14 +388,22 @@ def hybrid_tty_create_child(args):
mitogen.core.set_block(childfp) mitogen.core.set_block(childfp)
disable_echo(master_fd) disable_echo(master_fd)
disable_echo(slave_fd) disable_echo(slave_fd)
pid = detach_popen(
args=args, try:
stdin=childfp, pid = detach_popen(
stdout=childfp, args=args,
stderr=slave_fd, stdin=childfp,
preexec_fn=_acquire_controlling_tty, stdout=childfp,
close_fds=True, stderr=slave_fd,
) preexec_fn=_acquire_controlling_tty,
close_fds=True,
)
except Exception:
os.close(master_fd)
os.close(slave_fd)
parentfp.close()
childfp.close()
raise
os.close(slave_fd) os.close(slave_fd)
childfp.close() childfp.close()
@ -915,6 +939,33 @@ class Stream(mitogen.core.Stream):
#: ExternalContext.main(). #: ExternalContext.main().
max_message_size = None max_message_size = None
#: If :attr:`create_child` supplied a diag_fd, references the corresponding
#: :class:`DiagLogStream`, allowing it to be disconnected when this stream
#: is disconnected. Set to :data:`None` if no `diag_fd` was present.
diag_stream = None
#: Function with the semantics of :func:`create_child` used to create the
#: child process.
create_child = staticmethod(create_child)
#: Dictionary of extra kwargs passed to :attr:`create_child`.
create_child_args = {}
#: :data:`True` if the remote has indicated that it intends to detach, and
#: should not be killed on disconnect.
detached = False
#: If :data:`True`, indicates the child should not be killed during
#: graceful detachment, as it the actual process implementing the child
#: context. In all other cases, the subprocess is SSH, sudo, or a similar
#: tool that should be reminded to quit during disconnection.
child_is_immediate_subprocess = True
#: Prefix given to default names generated by :meth:`connect`.
name_prefix = u'local'
_reaped = False
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(Stream, self).__init__(*args, **kwargs) super(Stream, self).__init__(*args, **kwargs)
self.sent_modules = set(['mitogen', 'mitogen.core']) self.sent_modules = set(['mitogen', 'mitogen.core'])
@ -952,15 +1003,6 @@ class Stream(mitogen.core.Stream):
) )
) )
#: If :data:`True`, indicates the subprocess managed by us should not be
#: killed during graceful detachment, as it the actual process implementing
#: the child context. In all other cases, the subprocess is SSH, sudo, or a
#: similar tool that should be reminded to quit during disconnection.
child_is_immediate_subprocess = True
detached = False
_reaped = False
def _reap_child(self): def _reap_child(self):
""" """
Reap the child process during disconnection. Reap the child process during disconnection.
@ -1000,8 +1042,10 @@ class Stream(mitogen.core.Stream):
raise raise
def on_disconnect(self, broker): def on_disconnect(self, broker):
self._reap_child()
super(Stream, self).on_disconnect(broker) super(Stream, self).on_disconnect(broker)
if self.diag_stream is not None:
self.diag_stream.on_disconnect(broker)
self._reap_child()
# Minimised, gzipped, base64'd and passed to 'python -c'. It forks, dups # Minimised, gzipped, base64'd and passed to 'python -c'. It forks, dups
# file descriptor 0 as 100, creates a pipe, then execs a new interpreter # file descriptor 0 as 100, creates a pipe, then execs a new interpreter
@ -1105,10 +1149,6 @@ class Stream(mitogen.core.Stream):
) )
return zlib.compress(source.encode('utf-8'), 9) return zlib.compress(source.encode('utf-8'), 9)
create_child = staticmethod(create_child)
create_child_args = {}
name_prefix = u'local'
def start_child(self): def start_child(self):
args = self.get_boot_command() args = self.get_boot_command()
try: try:
@ -1130,20 +1170,28 @@ class Stream(mitogen.core.Stream):
def connect(self): def connect(self):
LOG.debug('%r.connect()', self) LOG.debug('%r.connect()', self)
self.pid, fd, extra_fd = self.start_child() self.pid, fd, diag_fd = self.start_child()
self.name = u'%s.%s' % (self.name_prefix, self.pid) self.name = u'%s.%s' % (self.name_prefix, self.pid)
self.receive_side = mitogen.core.Side(self, fd) self.receive_side = mitogen.core.Side(self, fd)
self.transmit_side = mitogen.core.Side(self, os.dup(fd)) self.transmit_side = mitogen.core.Side(self, os.dup(fd))
LOG.debug('%r.connect(): child process stdin/stdout=%r', if diag_fd is not None:
self, self.receive_side.fd) self.diag_stream = DiagLogStream(diag_fd, self)
else:
self.diag_stream = None
LOG.debug('%r.connect(): stdin=%r, stdout=%r, diag=%r',
self, self.receive_side.fd, self.transmit_side.fd,
self.diag_stream and self.diag_stream.receive_side.fd)
try: try:
self._connect_bootstrap(extra_fd) self._connect_bootstrap()
except EofError: except EofError:
self.on_disconnect(self._router.broker)
e = sys.exc_info()[1] e = sys.exc_info()[1]
self._adorn_eof_error(e) self._adorn_eof_error(e)
raise raise
except Exception: except Exception:
self.on_disconnect(self._router.broker)
self._reap_child() self._reap_child()
raise raise
@ -1158,8 +1206,10 @@ class Stream(mitogen.core.Stream):
write_all(self.transmit_side.fd, self.get_preamble()) write_all(self.transmit_side.fd, self.get_preamble())
discard_until(self.receive_side.fd, self.EC1_MARKER, discard_until(self.receive_side.fd, self.EC1_MARKER,
self.connect_deadline) self.connect_deadline)
if self.diag_stream:
self._router.broker.start_receive(self.diag_stream)
def _connect_bootstrap(self, extra_fd): def _connect_bootstrap(self):
discard_until(self.receive_side.fd, self.EC0_MARKER, discard_until(self.receive_side.fd, self.EC0_MARKER,
self.connect_deadline) self.connect_deadline)
self._ec0_received() self._ec0_received()
@ -1822,6 +1872,10 @@ class Router(mitogen.core.Router):
class ProcessMonitor(object): class ProcessMonitor(object):
"""
Install a :data:`signal.SIGCHLD` handler that generates callbacks when a
specific child process has exitted. This class is obsolete, do not use.
"""
def __init__(self): def __init__(self):
# pid -> callback() # pid -> callback()
self.callback_by_pid = {} self.callback_by_pid = {}
@ -1835,6 +1889,16 @@ class ProcessMonitor(object):
del self.callback_by_pid[pid] del self.callback_by_pid[pid]
def add(self, pid, callback): def add(self, pid, callback):
"""
Add a callback function to be notified of the exit status of a process.
:param int pid:
Process ID to be notified of.
:param callback:
Function invoked as `callback(status)`, where `status` is the raw
exit status of the child process.
"""
self.callback_by_pid[pid] = callback self.callback_by_pid[pid] = callback
_instance = None _instance = None

@ -127,10 +127,6 @@ class Stream(mitogen.parent.Stream):
#: Number of -v invocations to pass on command line. #: Number of -v invocations to pass on command line.
ssh_debug_level = 0 ssh_debug_level = 0
#: If batch_mode=False, points to the corresponding DiagLogStream, allowing
#: it to be disconnected at the same time this stream is being torn down.
tty_stream = None
#: The path to the SSH binary. #: The path to the SSH binary.
ssh_path = 'ssh' ssh_path = 'ssh'
@ -195,11 +191,6 @@ class Stream(mitogen.parent.Stream):
'stderr_pipe': True, 'stderr_pipe': True,
} }
def on_disconnect(self, broker):
if self.tty_stream is not None:
self.tty_stream.on_disconnect(broker)
super(Stream, self).on_disconnect(broker)
def get_boot_command(self): def get_boot_command(self):
bits = [self.ssh_path] bits = [self.ssh_path]
if self.ssh_debug_level: if self.ssh_debug_level:
@ -265,7 +256,7 @@ class Stream(mitogen.parent.Stream):
def _host_key_prompt(self): def _host_key_prompt(self):
if self.check_host_keys == 'accept': if self.check_host_keys == 'accept':
LOG.debug('%r: accepting host key', self) LOG.debug('%r: accepting host key', self)
self.tty_stream.transmit_side.write(b('yes\n')) self.diag_stream.transmit_side.write(b('yes\n'))
return return
# _host_key_prompt() should never be reached with ignore or enforce # _host_key_prompt() should never be reached with ignore or enforce
@ -273,16 +264,10 @@ class Stream(mitogen.parent.Stream):
# with ours. # with ours.
raise HostKeyError(self.hostkey_config_msg) raise HostKeyError(self.hostkey_config_msg)
def _ec0_received(self): def _connect_bootstrap(self):
if self.tty_stream is not None:
self._router.broker.start_receive(self.tty_stream)
return super(Stream, self)._ec0_received()
def _connect_bootstrap(self, extra_fd):
fds = [self.receive_side.fd] fds = [self.receive_side.fd]
if extra_fd is not None: if self.diag_stream is not None:
self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self) fds.append(self.diag_stream.receive_side.fd)
fds.append(extra_fd)
it = mitogen.parent.iter_read(fds=fds, deadline=self.connect_deadline) it = mitogen.parent.iter_read(fds=fds, deadline=self.connect_deadline)
@ -311,7 +296,7 @@ class Stream(mitogen.parent.Stream):
if self.password is None: if self.password is None:
raise PasswordError(self.password_required_msg) raise PasswordError(self.password_required_msg)
LOG.debug('%r: sending password', self) LOG.debug('%r: sending password', self)
self.tty_stream.transmit_side.write( self.diag_stream.transmit_side.write(
(self.password + '\n').encode() (self.password + '\n').encode()
) )
password_sent = True password_sent = True

@ -80,9 +80,6 @@ class Stream(mitogen.parent.Stream):
super(Stream, self).connect() super(Stream, self).connect()
self.name = u'su.' + mitogen.core.to_text(self.username) self.name = u'su.' + mitogen.core.to_text(self.username)
def on_disconnect(self, broker):
super(Stream, self).on_disconnect(broker)
def get_boot_command(self): def get_boot_command(self):
argv = mitogen.parent.Argv(super(Stream, self).get_boot_command()) argv = mitogen.parent.Argv(super(Stream, self).get_boot_command())
return [self.su_path, self.username, '-c', str(argv)] return [self.su_path, self.username, '-c', str(argv)]
@ -90,7 +87,7 @@ class Stream(mitogen.parent.Stream):
password_incorrect_msg = 'su password is incorrect' password_incorrect_msg = 'su password is incorrect'
password_required_msg = 'su password is required' password_required_msg = 'su password is required'
def _connect_bootstrap(self, extra_fd): def _connect_bootstrap(self):
password_sent = False password_sent = False
it = mitogen.parent.iter_read( it = mitogen.parent.iter_read(
fds=[self.receive_side.fd], fds=[self.receive_side.fd],

@ -150,10 +150,6 @@ class Stream(mitogen.parent.Stream):
super(Stream, self).connect() super(Stream, self).connect()
self.name = u'sudo.' + mitogen.core.to_text(self.username) self.name = u'sudo.' + mitogen.core.to_text(self.username)
def on_disconnect(self, broker):
self.tty_stream.on_disconnect(broker)
super(Stream, self).on_disconnect(broker)
def get_boot_command(self): def get_boot_command(self):
# Note: sudo did not introduce long-format option processing until July # Note: sudo did not introduce long-format option processing until July
# 2013, so even though we parse long-format options, supply short-form # 2013, so even though we parse long-format options, supply short-form
@ -177,12 +173,14 @@ class Stream(mitogen.parent.Stream):
password_incorrect_msg = 'sudo password is incorrect' password_incorrect_msg = 'sudo password is incorrect'
password_required_msg = 'sudo password is required' password_required_msg = 'sudo password is required'
def _connect_bootstrap(self, extra_fd): def _connect_bootstrap(self):
self.tty_stream = mitogen.parent.DiagLogStream(extra_fd, self) fds = [self.receive_side.fd]
if self.diag_stream is not None:
fds.append(self.diag_stream.receive_side.fd)
password_sent = False password_sent = False
it = mitogen.parent.iter_read( it = mitogen.parent.iter_read(
fds=[self.receive_side.fd, extra_fd], fds=fds,
deadline=self.connect_deadline, deadline=self.connect_deadline,
) )

@ -49,10 +49,13 @@ from mitogen.core import LOG
def is_path_dead(path): def is_path_dead(path):
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try: try:
s.connect(path) try:
except socket.error: s.connect(path)
e = sys.exc_info()[1] except socket.error:
return e.args[0] in (errno.ECONNREFUSED, errno.ENOENT) e = sys.exc_info()[1]
return e.args[0] in (errno.ECONNREFUSED, errno.ENOENT)
finally:
s.close()
return False return False
@ -78,6 +81,11 @@ class Listener(mitogen.core.BasicStream):
self.receive_side = mitogen.core.Side(self, self._sock.fileno()) self.receive_side = mitogen.core.Side(self, self._sock.fileno())
router.broker.start_receive(self) router.broker.start_receive(self)
def on_shutdown(self, broker):
broker.stop_receive(self)
self._sock.close()
self.receive_side.closed = True
def _accept_client(self, sock): def _accept_client(self, sock):
sock.setblocking(True) sock.setblocking(True)
try: try:

@ -24,7 +24,7 @@
that: that:
- async_out.changed == True - async_out.changed == True
- async_out.cmd == "echo hi" - async_out.cmd == "echo hi"
- 'async_out.delta.startswith("0:00:00")' - 'async_out.delta.startswith("0:00:")'
- async_out.end.startswith("20") - async_out.end.startswith("20")
- async_out.invocation.module_args._raw_params == "echo hi" - async_out.invocation.module_args._raw_params == "echo hi"
- async_out.invocation.module_args._uses_shell == True - async_out.invocation.module_args._uses_shell == True

@ -1,6 +1,7 @@
import threading import threading
import mock
import unittest2 import unittest2
import testlib import testlib
@ -8,6 +9,19 @@ import testlib
import mitogen.core import mitogen.core
class ShutdownTest(testlib.TestCase):
klass = mitogen.core.Broker
def test_poller_closed(self):
broker = self.klass()
actual_close = broker.poller.close
broker.poller.close = mock.Mock()
broker.shutdown()
broker.join()
self.assertEquals(1, len(broker.poller.close.mock_calls))
actual_close()
class DeferSyncTest(testlib.TestCase): class DeferSyncTest(testlib.TestCase):
klass = mitogen.core.Broker klass = mitogen.core.Broker
@ -18,6 +32,7 @@ class DeferSyncTest(testlib.TestCase):
self.assertEquals(th, broker._thread) self.assertEquals(th, broker._thread)
finally: finally:
broker.shutdown() broker.shutdown()
broker.join()
def test_exception(self): def test_exception(self):
broker = self.klass() broker = self.klass()
@ -26,6 +41,7 @@ class DeferSyncTest(testlib.TestCase):
broker.defer_sync, lambda: int('dave')) broker.defer_sync, lambda: int('dave'))
finally: finally:
broker.shutdown() broker.shutdown()
broker.join()
if __name__ == '__main__': if __name__ == '__main__':

@ -10,7 +10,7 @@ import testlib
import plain_old_module import plain_old_module
class ConstructorTest(unittest2.TestCase): class ConstructorTest(testlib.TestCase):
klass = mitogen.core.CallError klass = mitogen.core.CallError
def test_string_noargs(self): def test_string_noargs(self):
@ -44,7 +44,7 @@ class ConstructorTest(unittest2.TestCase):
self.assertTrue('test_from_exc_tb' in e.args[0]) self.assertTrue('test_from_exc_tb' in e.args[0])
class PickleTest(unittest2.TestCase): class PickleTest(testlib.TestCase):
klass = mitogen.core.CallError klass = mitogen.core.CallError
def test_string_noargs(self): def test_string_noargs(self):

@ -2,8 +2,13 @@
import json import json
import os import os
import subprocess
import sys import sys
os.environ['ORIGINAL_ARGV'] = json.dumps(sys.argv) os.environ['ORIGINAL_ARGV'] = json.dumps(sys.argv)
os.environ['THIS_IS_STUB_DOAS'] = '1' os.environ['THIS_IS_STUB_DOAS'] = '1'
os.execv(sys.executable, sys.argv[sys.argv.index('--') + 1:])
# This must be a child process and not exec() since Mitogen replaces its stderr
# descriptor, causing the last user of the slave PTY to close it, resulting in
# the master side indicating EIO.
subprocess.check_call(sys.argv[sys.argv.index('--') + 1:])

@ -2,8 +2,13 @@
import json import json
import os import os
import subprocess
import sys import sys
os.environ['ORIGINAL_ARGV'] = json.dumps(sys.argv) os.environ['ORIGINAL_ARGV'] = json.dumps(sys.argv)
os.environ['THIS_IS_STUB_SUDO'] = '1' os.environ['THIS_IS_STUB_SUDO'] = '1'
os.execv(sys.executable, sys.argv[sys.argv.index('--') + 1:])
# This must be a child process and not exec() since Mitogen replaces its stderr
# descriptor, causing the last user of the slave PTY to close it, resulting in
# the master side indicating EIO.
subprocess.check_call(sys.argv[sys.argv.index('--') + 1:])

@ -7,7 +7,7 @@ import unittest2
import testlib import testlib
class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
def test_okay(self): def test_okay(self):
docker_path = testlib.data_path('stubs/stub-docker.py') docker_path = testlib.data_path('stubs/stub-docker.py')
context = self.router.docker( context = self.router.docker(

@ -10,7 +10,7 @@ import mitogen.fakessh
import testlib import testlib
class RsyncTest(testlib.DockerMixin, unittest2.TestCase): class RsyncTest(testlib.DockerMixin, testlib.TestCase):
@timeoutcontext.timeout(5) @timeoutcontext.timeout(5)
@unittest2.skip('broken') @unittest2.skip('broken')
def test_rsync_from_master(self): def test_rsync_from_master(self):

@ -1,4 +1,5 @@
import _ssl
import ctypes import ctypes
import os import os
import random import random
@ -13,21 +14,29 @@ import testlib
import plain_old_module import plain_old_module
IS_64BIT = struct.calcsize('P') == 8 def _find_ssl_linux():
PLATFORM_TO_PATH = { s = testlib.subprocess__check_output(['ldd', _ssl.__file__])
('darwin', False): '/usr/lib/libssl.dylib', for line in s.splitlines():
('darwin', True): '/usr/lib/libssl.dylib', bits = line.split()
('linux2', False): '/usr/lib/libssl.so', if bits[0].startswith('libssl'):
('linux2', True): '/usr/lib/x86_64-linux-gnu/libssl.so', return bits[2]
# Python 2.6
('linux3', False): '/usr/lib/libssl.so', def _find_ssl_darwin():
('linux3', True): '/usr/lib/x86_64-linux-gnu/libssl.so', s = testlib.subprocess__check_output(['otool', '-l', _ssl.__file__])
# Python 3 for line in s.splitlines():
('linux', False): '/usr/lib/libssl.so', bits = line.split()
('linux', True): '/usr/lib/x86_64-linux-gnu/libssl.so', if bits[0] == 'name' and 'libssl' in bits[1]:
} return bits[1]
c_ssl = ctypes.CDLL(PLATFORM_TO_PATH[sys.platform, IS_64BIT])
if sys.platform.startswith('linux'):
LIBSSL_PATH = _find_ssl_linux()
elif sys.platform == 'darwin':
LIBSSL_PATH = _find_ssl_darwin()
else:
assert 0, "Don't know how to find libssl on this platform"
c_ssl = ctypes.CDLL(LIBSSL_PATH)
c_ssl.RAND_pseudo_bytes.argtypes = [ctypes.c_char_p, ctypes.c_int] c_ssl.RAND_pseudo_bytes.argtypes = [ctypes.c_char_p, ctypes.c_int]
c_ssl.RAND_pseudo_bytes.restype = ctypes.c_int c_ssl.RAND_pseudo_bytes.restype = ctypes.c_int
@ -55,7 +64,7 @@ def exercise_importer(n):
return simple_pkg.a.subtract_one_add_two(n) return simple_pkg.a.subtract_one_add_two(n)
class ForkTest(testlib.RouterMixin, unittest2.TestCase): class ForkTest(testlib.RouterMixin, testlib.TestCase):
def test_okay(self): def test_okay(self):
context = self.router.fork() context = self.router.fork()
self.assertNotEqual(context.call(os.getpid), os.getpid()) self.assertNotEqual(context.call(os.getpid), os.getpid())
@ -84,7 +93,8 @@ class ForkTest(testlib.RouterMixin, unittest2.TestCase):
context = self.router.fork(on_start=on_start) context = self.router.fork(on_start=on_start)
self.assertEquals(123, recv.get().unpickle()) self.assertEquals(123, recv.get().unpickle())
class DoubleChildTest(testlib.RouterMixin, unittest2.TestCase):
class DoubleChildTest(testlib.RouterMixin, testlib.TestCase):
def test_okay(self): def test_okay(self):
# When forking from the master process, Mitogen had nothing to do with # When forking from the master process, Mitogen had nothing to do with
# setting up stdio -- that was inherited wherever the Master is running # setting up stdio -- that was inherited wherever the Master is running

@ -20,7 +20,7 @@ def get_os_environ():
return dict(os.environ) return dict(os.environ)
class LocalTest(testlib.RouterMixin, unittest2.TestCase): class LocalTest(testlib.RouterMixin, testlib.TestCase):
stream_class = mitogen.ssh.Stream stream_class = mitogen.ssh.Stream
def test_stream_name(self): def test_stream_name(self):
@ -29,7 +29,7 @@ class LocalTest(testlib.RouterMixin, unittest2.TestCase):
self.assertEquals('local.%d' % (pid,), context.name) self.assertEquals('local.%d' % (pid,), context.name)
class PythonPathTest(testlib.RouterMixin, unittest2.TestCase): class PythonPathTest(testlib.RouterMixin, testlib.TestCase):
stream_class = mitogen.ssh.Stream stream_class = mitogen.ssh.Stream
def test_inherited(self): def test_inherited(self):

@ -6,7 +6,7 @@ import testlib
import mitogen.master import mitogen.master
class ScanCodeImportsTest(unittest2.TestCase): class ScanCodeImportsTest(testlib.TestCase):
func = staticmethod(mitogen.master.scan_code_imports) func = staticmethod(mitogen.master.scan_code_imports)
if mitogen.core.PY3: if mitogen.core.PY3:

@ -16,7 +16,7 @@ def read_sample(fname):
return sample return sample
class MinimizeSourceTest(unittest2.TestCase): class MinimizeSourceTest(testlib.TestCase):
func = staticmethod(mitogen.minify.minimize_source) func = staticmethod(mitogen.minify.minimize_source)
def test_class(self): def test_class(self):
@ -55,7 +55,7 @@ class MinimizeSourceTest(unittest2.TestCase):
self.assertEqual(expected, self.func(original)) self.assertEqual(expected, self.func(original))
class MitogenCoreTest(unittest2.TestCase): class MitogenCoreTest(testlib.TestCase):
# Verify minimize_source() succeeds for all built-in modules. # Verify minimize_source() succeeds for all built-in modules.
func = staticmethod(mitogen.minify.minimize_source) func = staticmethod(mitogen.minify.minimize_source)
@ -95,7 +95,11 @@ class MitogenCoreTest(unittest2.TestCase):
def test_minify_all(self): def test_minify_all(self):
for name in glob.glob('mitogen/*.py'): for name in glob.glob('mitogen/*.py'):
original = self.read_source(name) original = self.read_source(name)
minified = self.func(original) try:
minified = self.func(original)
except Exception:
print('file was: ' + name)
raise
self._test_syntax_valid(minified, name) self._test_syntax_valid(minified, name)
self._test_line_counts_match(original, minified) self._test_line_counts_match(original, minified)

@ -153,7 +153,7 @@ class StreamErrorTest(testlib.RouterMixin, testlib.TestCase):
self.assertTrue(s in e.args[0]) self.assertTrue(s in e.args[0])
class ContextTest(testlib.RouterMixin, unittest2.TestCase): class ContextTest(testlib.RouterMixin, testlib.TestCase):
def test_context_shutdown(self): def test_context_shutdown(self):
local = self.router.local() local = self.router.local()
pid = local.call(os.getpid) pid = local.call(os.getpid)
@ -181,7 +181,7 @@ class OpenPtyTest(testlib.TestCase):
self.assertEquals(e.args[0], msg) self.assertEquals(e.args[0], msg)
class TtyCreateChildTest(unittest2.TestCase): class TtyCreateChildTest(testlib.TestCase):
func = staticmethod(mitogen.parent.tty_create_child) func = staticmethod(mitogen.parent.tty_create_child)
def test_dev_tty_open_succeeds(self): def test_dev_tty_open_succeeds(self):
@ -207,11 +207,12 @@ class TtyCreateChildTest(unittest2.TestCase):
self.assertEquals(pid, waited_pid) self.assertEquals(pid, waited_pid)
self.assertEquals(0, status) self.assertEquals(0, status)
self.assertEquals(mitogen.core.b(''), tf.read()) self.assertEquals(mitogen.core.b(''), tf.read())
os.close(fd)
finally: finally:
tf.close() tf.close()
class IterReadTest(unittest2.TestCase): class IterReadTest(testlib.TestCase):
func = staticmethod(mitogen.parent.iter_read) func = staticmethod(mitogen.parent.iter_read)
def make_proc(self): def make_proc(self):
@ -230,6 +231,7 @@ class IterReadTest(unittest2.TestCase):
break break
finally: finally:
proc.terminate() proc.terminate()
proc.stdout.close()
def test_deadline_exceeded_before_call(self): def test_deadline_exceeded_before_call(self):
proc = self.make_proc() proc = self.make_proc()
@ -244,6 +246,7 @@ class IterReadTest(unittest2.TestCase):
self.assertEqual(len(got), 0) self.assertEqual(len(got), 0)
finally: finally:
proc.terminate() proc.terminate()
proc.stdout.close()
def test_deadline_exceeded_during_call(self): def test_deadline_exceeded_during_call(self):
proc = self.make_proc() proc = self.make_proc()
@ -261,9 +264,10 @@ class IterReadTest(unittest2.TestCase):
self.assertLess(len(got), 5) self.assertLess(len(got), 5)
finally: finally:
proc.terminate() proc.terminate()
proc.stdout.close()
class WriteAllTest(unittest2.TestCase): class WriteAllTest(testlib.TestCase):
func = staticmethod(mitogen.parent.write_all) func = staticmethod(mitogen.parent.write_all)
def make_proc(self): def make_proc(self):
@ -280,6 +284,7 @@ class WriteAllTest(unittest2.TestCase):
self.func(proc.stdin.fileno(), self.ten_ms_chunk) self.func(proc.stdin.fileno(), self.ten_ms_chunk)
finally: finally:
proc.terminate() proc.terminate()
proc.stdin.close()
def test_deadline_exceeded_before_call(self): def test_deadline_exceeded_before_call(self):
proc = self.make_proc() proc = self.make_proc()
@ -289,6 +294,7 @@ class WriteAllTest(unittest2.TestCase):
)) ))
finally: finally:
proc.terminate() proc.terminate()
proc.stdin.close()
def test_deadline_exceeded_during_call(self): def test_deadline_exceeded_during_call(self):
proc = self.make_proc() proc = self.make_proc()
@ -301,6 +307,7 @@ class WriteAllTest(unittest2.TestCase):
)) ))
finally: finally:
proc.terminate() proc.terminate()
proc.stdin.close()
class DisconnectTest(testlib.RouterMixin, testlib.TestCase): class DisconnectTest(testlib.RouterMixin, testlib.TestCase):

@ -13,7 +13,7 @@ import plain_old_module
import simple_pkg.a import simple_pkg.a
class NeutralizeMainTest(testlib.RouterMixin, unittest2.TestCase): class NeutralizeMainTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.ModuleResponder klass = mitogen.master.ModuleResponder
def call(self, *args, **kwargs): def call(self, *args, **kwargs):
@ -67,7 +67,7 @@ class NeutralizeMainTest(testlib.RouterMixin, unittest2.TestCase):
class GoodModulesTest(testlib.RouterMixin, unittest2.TestCase): class GoodModulesTest(testlib.RouterMixin, testlib.TestCase):
def test_plain_old_module(self): def test_plain_old_module(self):
# The simplest case: a top-level module with no interesting imports or # The simplest case: a top-level module with no interesting imports or
# package machinery damage. # package machinery damage.
@ -89,7 +89,7 @@ class GoodModulesTest(testlib.RouterMixin, unittest2.TestCase):
self.assertEquals(output, "['__main__', 50]\n") self.assertEquals(output, "['__main__', 50]\n")
class BrokenModulesTest(unittest2.TestCase): class BrokenModulesTest(testlib.TestCase):
def test_obviously_missing(self): def test_obviously_missing(self):
# Ensure we don't crash in the case of a module legitimately being # Ensure we don't crash in the case of a module legitimately being
# unavailable. Should never happen in the real world. # unavailable. Should never happen in the real world.
@ -144,7 +144,7 @@ class BrokenModulesTest(unittest2.TestCase):
self.assertIsInstance(msg.unpickle(), tuple) self.assertIsInstance(msg.unpickle(), tuple)
class BlacklistTest(unittest2.TestCase): class BlacklistTest(testlib.TestCase):
@unittest2.skip('implement me') @unittest2.skip('implement me')
def test_whitelist_no_blacklist(self): def test_whitelist_no_blacklist(self):
assert 0 assert 0

@ -36,7 +36,7 @@ def send_n_sized_reply(sender, n):
return 123 return 123
class SourceVerifyTest(testlib.RouterMixin, unittest2.TestCase): class SourceVerifyTest(testlib.RouterMixin, testlib.TestCase):
def setUp(self): def setUp(self):
super(SourceVerifyTest, self).setUp() super(SourceVerifyTest, self).setUp()
# Create some children, ping them, and store what their messages look # Create some children, ping them, and store what their messages look
@ -149,7 +149,7 @@ class PolicyTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(e.args[0], self.router.refused_msg) self.assertEquals(e.args[0], self.router.refused_msg)
class CrashTest(testlib.BrokerMixin, unittest2.TestCase): class CrashTest(testlib.BrokerMixin, testlib.TestCase):
# This is testing both Broker's ability to crash nicely, and Router's # This is testing both Broker's ability to crash nicely, and Router's
# ability to respond to the crash event. # ability to respond to the crash event.
klass = mitogen.master.Router klass = mitogen.master.Router
@ -178,8 +178,7 @@ class CrashTest(testlib.BrokerMixin, unittest2.TestCase):
self.assertTrue(expect in log.stop()) self.assertTrue(expect in log.stop())
class AddHandlerTest(testlib.TestCase):
class AddHandlerTest(unittest2.TestCase):
klass = mitogen.master.Router klass = mitogen.master.Router
def test_invoked_at_shutdown(self): def test_invoked_at_shutdown(self):

@ -20,7 +20,7 @@ def roundtrip(v):
return mitogen.core.Message(data=msg.data).unpickle() return mitogen.core.Message(data=msg.data).unpickle()
class BlobTest(unittest2.TestCase): class BlobTest(testlib.TestCase):
klass = mitogen.core.Blob klass = mitogen.core.Blob
# Python 3 pickle protocol 2 does weird stuff depending on whether an empty # Python 3 pickle protocol 2 does weird stuff depending on whether an empty
@ -36,7 +36,7 @@ class BlobTest(unittest2.TestCase):
self.assertEquals(b(''), roundtrip(v)) self.assertEquals(b(''), roundtrip(v))
class ContextTest(testlib.RouterMixin, unittest2.TestCase): class ContextTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.core.Context klass = mitogen.core.Context
# Ensure Context can be round-tripped by regular pickle in addition to # Ensure Context can be round-tripped by regular pickle in addition to

@ -29,7 +29,7 @@ class StubSshMixin(testlib.RouterMixin):
del os.environ['STUBSSH_MODE'] del os.environ['STUBSSH_MODE']
class ConstructorTest(testlib.RouterMixin, unittest2.TestCase): class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
def test_okay(self): def test_okay(self):
context = self.router.ssh( context = self.router.ssh(
hostname='hostname', hostname='hostname',
@ -165,7 +165,7 @@ class SshTest(testlib.DockerMixin, testlib.TestCase):
fp.close() fp.close()
class BannerTest(testlib.DockerMixin, unittest2.TestCase): class BannerTest(testlib.DockerMixin, testlib.TestCase):
# Verify the ability to disambiguate random spam appearing in the SSHd's # Verify the ability to disambiguate random spam appearing in the SSHd's
# login banner from a legitimate password prompt. # login banner from a legitimate password prompt.
stream_class = mitogen.ssh.Stream stream_class = mitogen.ssh.Stream

@ -6,11 +6,14 @@ import re
import socket import socket
import subprocess import subprocess
import sys import sys
import threading
import time import time
import traceback
import unittest2 import unittest2
import mitogen.core import mitogen.core
import mitogen.fork
import mitogen.master import mitogen.master
import mitogen.utils import mitogen.utils
@ -41,6 +44,14 @@ if faulthandler is not None:
faulthandler.enable() faulthandler.enable()
def get_fd_count():
"""
Return the number of FDs open by this process.
"""
import psutil
return psutil.Process().num_fds()
def data_path(suffix): def data_path(suffix):
path = os.path.join(DATA_DIR, suffix) path = os.path.join(DATA_DIR, suffix)
if path.endswith('.key'): if path.endswith('.key'):
@ -166,6 +177,53 @@ def sync_with_broker(broker, timeout=10.0):
sem.get(timeout=10.0) sem.get(timeout=10.0)
def log_fd_calls():
mypid = os.getpid()
l = threading.Lock()
real_pipe = os.pipe
def pipe():
with l:
rv = real_pipe()
if mypid == os.getpid():
sys.stdout.write('\n%s\n' % (rv,))
traceback.print_stack(limit=3)
sys.stdout.write('\n')
return rv
os.pipe = pipe
real_socketpair = socket.socketpair
def socketpair(*args):
with l:
rv = real_socketpair(*args)
if mypid == os.getpid():
sys.stdout.write('\n%s -> %s\n' % (args, rv))
traceback.print_stack(limit=3)
sys.stdout.write('\n')
return rv
socket.socketpair = socketpair
real_dup2 = os.dup2
def dup2(*args):
with l:
real_dup2(*args)
if mypid == os.getpid():
sys.stdout.write('\n%s\n' % (args,))
traceback.print_stack(limit=3)
sys.stdout.write('\n')
os.dup2 = dup2
real_dup = os.dup
def dup(*args):
with l:
rv = real_dup(*args)
if mypid == os.getpid():
sys.stdout.write('\n%s -> %s\n' % (args, rv))
traceback.print_stack(limit=3)
sys.stdout.write('\n')
return rv
os.dup = dup
class CaptureStreamHandler(logging.StreamHandler): class CaptureStreamHandler(logging.StreamHandler):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
logging.StreamHandler.__init__(self, *args, **kwargs) logging.StreamHandler.__init__(self, *args, **kwargs)
@ -211,6 +269,46 @@ class LogCapturer(object):
class TestCase(unittest2.TestCase): class TestCase(unittest2.TestCase):
@classmethod
def setUpClass(cls):
# This is done in setUpClass() so we have a chance to run before any
# Broker() instantiations in setUp() etc.
mitogen.fork.on_fork()
cls._fd_count_before = get_fd_count()
super(TestCase, cls).setUpClass()
ALLOWED_THREADS = set([
'MainThread',
'mitogen.master.join_thread_async'
])
@classmethod
def _teardown_check_threads(cls):
counts = {}
for thread in threading.enumerate():
assert thread.name in cls.ALLOWED_THREADS, \
'Found thread %r still running after tests.' % (thread.name,)
counts[thread.name] = counts.get(thread.name, 0) + 1
for name in counts:
assert counts[name] == 1, \
'Found %d copies of thread %r running after tests.' % (name,)
@classmethod
def _teardown_check_fds(cls):
mitogen.core.Latch._on_fork()
if get_fd_count() != cls._fd_count_before:
import os; os.system('lsof -p %s' % (os.getpid(),))
assert 0, "%s leaked FDs. Count before: %s, after: %s" % (
cls, cls._fd_count_before, get_fd_count(),
)
@classmethod
def tearDownClass(cls):
super(TestCase, cls).tearDownClass()
cls._teardown_check_threads()
cls._teardown_check_fds()
def assertRaises(self, exc, func, *args, **kwargs): def assertRaises(self, exc, func, *args, **kwargs):
"""Like regular assertRaises, except return the exception that was """Like regular assertRaises, except return the exception that was
raised. Can't use context manager because tests must run on Python2.4""" raised. Can't use context manager because tests must run on Python2.4"""

@ -11,8 +11,10 @@ import unittest2
import mitogen.core import mitogen.core
from mitogen.core import b from mitogen.core import b
import testlib
class BlobTest(unittest2.TestCase):
class BlobTest(testlib.TestCase):
klass = mitogen.core.Blob klass = mitogen.core.Blob
def make(self): def make(self):
@ -43,7 +45,7 @@ class BlobTest(unittest2.TestCase):
mitogen.core.BytesType(blob2)) mitogen.core.BytesType(blob2))
class SecretTest(unittest2.TestCase): class SecretTest(testlib.TestCase):
klass = mitogen.core.Secret klass = mitogen.core.Secret
def make(self): def make(self):

@ -30,7 +30,7 @@ class MyService(mitogen.service.Service):
} }
class IsPathDeadTest(unittest2.TestCase): class IsPathDeadTest(testlib.TestCase):
func = staticmethod(mitogen.unix.is_path_dead) func = staticmethod(mitogen.unix.is_path_dead)
path = '/tmp/stale-socket' path = '/tmp/stale-socket'
@ -57,7 +57,7 @@ class IsPathDeadTest(unittest2.TestCase):
os.unlink(self.path) os.unlink(self.path)
class ListenerTest(testlib.RouterMixin, unittest2.TestCase): class ListenerTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.unix.Listener klass = mitogen.unix.Listener
def test_constructor_basic(self): def test_constructor_basic(self):
@ -66,7 +66,7 @@ class ListenerTest(testlib.RouterMixin, unittest2.TestCase):
os.unlink(listener.path) os.unlink(listener.path)
class ClientTest(unittest2.TestCase): class ClientTest(testlib.TestCase):
klass = mitogen.unix.Listener klass = mitogen.unix.Listener
def _try_connect(self, path): def _try_connect(self, path):
@ -87,6 +87,8 @@ class ClientTest(unittest2.TestCase):
resp = context.call_service(service_name=MyService, method_name='ping') resp = context.call_service(service_name=MyService, method_name='ping')
self.assertEquals(mitogen.context_id, resp['src_id']) self.assertEquals(mitogen.context_id, resp['src_id'])
self.assertEquals(0, resp['auth_id']) self.assertEquals(0, resp['auth_id'])
router.broker.shutdown()
router.broker.join()
def _test_simple_server(self, path): def _test_simple_server(self, path):
router = mitogen.master.Router() router = mitogen.master.Router()
@ -102,7 +104,9 @@ class ClientTest(unittest2.TestCase):
time.sleep(0.1) time.sleep(0.1)
finally: finally:
pool.shutdown() pool.shutdown()
pool.join()
router.broker.shutdown() router.broker.shutdown()
router.broker.join()
finally: finally:
os._exit(0) os._exit(0)

@ -6,6 +6,8 @@ import mitogen.core
import mitogen.master import mitogen.master
import mitogen.utils import mitogen.utils
import testlib
def func0(router): def func0(router):
return router return router
@ -16,7 +18,7 @@ def func(router):
return router return router
class RunWithRouterTest(unittest2.TestCase): class RunWithRouterTest(testlib.TestCase):
# test_shutdown_on_exception # test_shutdown_on_exception
# test_shutdown_on_success # test_shutdown_on_success
@ -26,7 +28,7 @@ class RunWithRouterTest(unittest2.TestCase):
self.assertFalse(router.broker._thread.isAlive()) self.assertFalse(router.broker._thread.isAlive())
class WithRouterTest(unittest2.TestCase): class WithRouterTest(testlib.TestCase):
def test_with_broker(self): def test_with_broker(self):
router = func() router = func()
self.assertIsInstance(router, mitogen.master.Router) self.assertIsInstance(router, mitogen.master.Router)
@ -40,7 +42,7 @@ class Unicode(mitogen.core.UnicodeType): pass
class Bytes(mitogen.core.BytesType): pass class Bytes(mitogen.core.BytesType): pass
class CastTest(unittest2.TestCase): class CastTest(testlib.TestCase):
def test_dict(self): def test_dict(self):
self.assertEqual(type(mitogen.utils.cast({})), dict) self.assertEqual(type(mitogen.utils.cast({})), dict)
self.assertEqual(type(mitogen.utils.cast(Dict())), dict) self.assertEqual(type(mitogen.utils.cast(Dict())), dict)

Loading…
Cancel
Save