|
|
|
@ -1342,7 +1342,7 @@ class Importer(object):
|
|
|
|
|
|
|
|
|
|
tup = msg.unpickle()
|
|
|
|
|
fullname = tup[0]
|
|
|
|
|
_v and LOG.debug('Importer._on_load_module(%r)', fullname)
|
|
|
|
|
_v and LOG.debug('importer: received %s', fullname)
|
|
|
|
|
|
|
|
|
|
self._lock.acquire()
|
|
|
|
|
try:
|
|
|
|
@ -1382,7 +1382,7 @@ class Importer(object):
|
|
|
|
|
|
|
|
|
|
def load_module(self, fullname):
|
|
|
|
|
fullname = to_text(fullname)
|
|
|
|
|
_v and LOG.debug('Importer.load_module(%r)', fullname)
|
|
|
|
|
_v and LOG.debug('importer: requesting %s', fullname)
|
|
|
|
|
self._refuse_imports(fullname)
|
|
|
|
|
|
|
|
|
|
event = threading.Event()
|
|
|
|
@ -1507,18 +1507,17 @@ class Protocol(object):
|
|
|
|
|
return stream
|
|
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
|
return '%s.%s(%s)' % (
|
|
|
|
|
self.__class__.__module__,
|
|
|
|
|
return '%s(%s)' % (
|
|
|
|
|
self.__class__.__name__,
|
|
|
|
|
self.stream and self.stream.name,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def on_shutdown(self, broker):
|
|
|
|
|
_v and LOG.debug('%r.on_shutdown()', self)
|
|
|
|
|
_v and LOG.debug('%r: shutting down', self)
|
|
|
|
|
self.stream.on_disconnect(broker)
|
|
|
|
|
|
|
|
|
|
def on_disconnect(self, broker):
|
|
|
|
|
LOG.debug('%r.on_disconnect()', self)
|
|
|
|
|
LOG.debug('%r: disconnecting', self)
|
|
|
|
|
if self.stream.receive_side:
|
|
|
|
|
broker.stop_receive(self.stream)
|
|
|
|
|
self.stream.receive_side.close()
|
|
|
|
@ -1624,21 +1623,29 @@ class BufferedWriter(object):
|
|
|
|
|
|
|
|
|
|
class Side(object):
|
|
|
|
|
"""
|
|
|
|
|
Represent a single side of a :class:`BasicStream`. This exists to allow
|
|
|
|
|
streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional
|
|
|
|
|
(e.g. UNIX socket) file descriptors to operate identically.
|
|
|
|
|
Represent one side of a :class:`Stream`. This allows unidirectional (e.g.
|
|
|
|
|
pipe) and bidirectional (e.g. socket) streams to operate identically.
|
|
|
|
|
|
|
|
|
|
Sides are also responsible for tracking the open/closed state of the
|
|
|
|
|
underlying FD, preventing erroneous duplicate calls to :func:`os.close` due
|
|
|
|
|
to duplicate :meth:`Stream.on_disconnect` calls, which would otherwise risk
|
|
|
|
|
silently succeeding by closing an unrelated descriptor. For this reason, it
|
|
|
|
|
is crucial only one :class:`Side` exists per unique descriptor.
|
|
|
|
|
|
|
|
|
|
:param mitogen.core.Stream stream:
|
|
|
|
|
The stream this side is associated with.
|
|
|
|
|
|
|
|
|
|
:param int fd:
|
|
|
|
|
Underlying file descriptor.
|
|
|
|
|
|
|
|
|
|
:param object fp:
|
|
|
|
|
The file or socket object managing the underlying file descriptor. Any
|
|
|
|
|
object may be used that supports `fileno()` and `close()` methods.
|
|
|
|
|
:param bool cloexec:
|
|
|
|
|
If :data:`True`, the descriptor has its :data:`fcntl.FD_CLOEXEC` flag
|
|
|
|
|
enabled using :func:`fcntl.fcntl`.
|
|
|
|
|
:param bool keep_alive:
|
|
|
|
|
Value for :attr:`keep_alive`
|
|
|
|
|
|
|
|
|
|
During construction, the file descriptor has its :data:`os.O_NONBLOCK` flag
|
|
|
|
|
enabled using :func:`fcntl.fcntl`.
|
|
|
|
|
If :data:`True`, the continued existence of this side will extend the
|
|
|
|
|
shutdown grace period until it has been unregistered from the broker.
|
|
|
|
|
:param bool blocking:
|
|
|
|
|
If :data:`False`, the descriptor has its :data:`os.O_NONBLOCK` flag
|
|
|
|
|
enabled using :func:`fcntl.fcntl`.
|
|
|
|
|
"""
|
|
|
|
|
_fork_refs = weakref.WeakValueDictionary()
|
|
|
|
|
|
|
|
|
@ -1674,8 +1681,8 @@ class Side(object):
|
|
|
|
|
Call :func:`os.close` on :attr:`fd` if it is not :data:`None`,
|
|
|
|
|
then set it to :data:`None`.
|
|
|
|
|
"""
|
|
|
|
|
_vv and IOLOG.debug('%r.close()', self)
|
|
|
|
|
if not self.closed:
|
|
|
|
|
_vv and IOLOG.debug('%r.close()', self)
|
|
|
|
|
self.closed = True
|
|
|
|
|
os.close(self.fd)
|
|
|
|
|
|
|
|
|
@ -1699,7 +1706,7 @@ class Side(object):
|
|
|
|
|
return b('')
|
|
|
|
|
s, disconnected = io_op(os.read, self.fd, n)
|
|
|
|
|
if disconnected:
|
|
|
|
|
LOG.debug('%r.read(): disconnected: %s', self, disconnected)
|
|
|
|
|
LOG.debug('%r: disconnected during read: %s', self, disconnected)
|
|
|
|
|
return b('')
|
|
|
|
|
return s
|
|
|
|
|
|
|
|
|
@ -1720,7 +1727,7 @@ class Side(object):
|
|
|
|
|
|
|
|
|
|
written, disconnected = io_op(os.write, self.fd, s)
|
|
|
|
|
if disconnected:
|
|
|
|
|
LOG.debug('%r.write(): disconnected: %s', self, disconnected)
|
|
|
|
|
LOG.debug('%r: disconnected during write: %s', self, disconnected)
|
|
|
|
|
return None
|
|
|
|
|
return written
|
|
|
|
|
|
|
|
|
@ -2004,7 +2011,7 @@ class Context(object):
|
|
|
|
|
return _unpickle_context, (self.context_id, name)
|
|
|
|
|
|
|
|
|
|
def on_disconnect(self):
|
|
|
|
|
_v and LOG.debug('%r.on_disconnect()', self)
|
|
|
|
|
_v and LOG.debug('%r: disconnecting', self)
|
|
|
|
|
fire(self, 'disconnect')
|
|
|
|
|
|
|
|
|
|
def send_async(self, msg, persist=False):
|
|
|
|
@ -2416,7 +2423,7 @@ class Latch(object):
|
|
|
|
|
:meth:`put` to write a byte to our socket pair.
|
|
|
|
|
"""
|
|
|
|
|
_vv and IOLOG.debug(
|
|
|
|
|
'%r._get_sleep(timeout=%r, block=%r, rfd=%d, wfd=%d)',
|
|
|
|
|
'%r._get_sleep(timeout=%r, block=%r, fd=%d/%d)',
|
|
|
|
|
self, timeout, block, rsock.fileno(), wsock.fileno()
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
@ -2514,10 +2521,9 @@ class Waker(BasicStream):
|
|
|
|
|
self.transmit_side = Side(self, wfd)
|
|
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
|
return 'Waker(%r rfd=%r, wfd=%r)' % (
|
|
|
|
|
self._broker,
|
|
|
|
|
self.receive_side and self.receive_side.fd,
|
|
|
|
|
self.transmit_side and self.transmit_side.fd,
|
|
|
|
|
return 'Waker(fd=%r/%r)' % (
|
|
|
|
|
self.stream.receive_side and self.stream.receive_side.fd,
|
|
|
|
|
self.stream.transmit_side and self.stream.transmit_side.fd,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
@ -2626,7 +2632,7 @@ class IoLogger(BasicStream):
|
|
|
|
|
|
|
|
|
|
def on_shutdown(self, broker):
|
|
|
|
|
"""Shut down the write end of the logging socket."""
|
|
|
|
|
_v and LOG.debug('%r.on_shutdown()', self)
|
|
|
|
|
_v and LOG.debug('%r: shutting down', self)
|
|
|
|
|
if not IS_WSL:
|
|
|
|
|
# #333: WSL generates invalid readiness indication on shutdown()
|
|
|
|
|
self._wsock.shutdown(socket.SHUT_WR)
|
|
|
|
@ -2713,12 +2719,13 @@ class Router(object):
|
|
|
|
|
corresponding :attr:`_context_by_id` member. This is replaced by
|
|
|
|
|
:class:`mitogen.parent.RouteMonitor` in an upgraded context.
|
|
|
|
|
"""
|
|
|
|
|
LOG.error('%r._on_del_route() %r', self, msg)
|
|
|
|
|
if msg.is_dead:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
target_id_s, _, name = bytes_partition(msg.data, b(':'))
|
|
|
|
|
target_id = int(target_id_s, 10)
|
|
|
|
|
LOG.error('%r: deleting route to %s (%d)',
|
|
|
|
|
self, to_text(name), target_id)
|
|
|
|
|
context = self._context_by_id.get(target_id)
|
|
|
|
|
if context:
|
|
|
|
|
fire(context, 'disconnect')
|
|
|
|
@ -2790,7 +2797,8 @@ class Router(object):
|
|
|
|
|
the stream's receive side to the I/O multiplexer. This method remains
|
|
|
|
|
public while the design has not yet settled.
|
|
|
|
|
"""
|
|
|
|
|
_v and LOG.debug('register(%r, %r)', context, stream)
|
|
|
|
|
_v and LOG.debug('%s: registering %r to stream %r',
|
|
|
|
|
self, context, stream)
|
|
|
|
|
self._write_lock.acquire()
|
|
|
|
|
try:
|
|
|
|
|
self._stream_by_id[context.context_id] = stream
|
|
|
|
@ -2916,7 +2924,7 @@ class Router(object):
|
|
|
|
|
def on_shutdown(self, broker):
|
|
|
|
|
"""Called during :meth:`Broker.shutdown`, informs callbacks registered
|
|
|
|
|
with :meth:`add_handle_cb` the connection is dead."""
|
|
|
|
|
_v and LOG.debug('%r.on_shutdown(%r)', self, broker)
|
|
|
|
|
_v and LOG.debug('%r: shutting down', self, broker)
|
|
|
|
|
fire(self, 'shutdown')
|
|
|
|
|
for handle, (persist, fn) in self._handle_map.iteritems():
|
|
|
|
|
_v and LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn)
|
|
|
|
@ -3199,7 +3207,7 @@ class Broker(object):
|
|
|
|
|
to shut down gracefully, then discard the :class:`Poller`.
|
|
|
|
|
"""
|
|
|
|
|
for _, (side, _) in self.poller.readers + self.poller.writers:
|
|
|
|
|
LOG.debug('_broker_main() force disconnecting %r', side)
|
|
|
|
|
LOG.debug('%r: force disconnecting %r', self, side)
|
|
|
|
|
side.stream.on_disconnect(self)
|
|
|
|
|
|
|
|
|
|
self.poller.close()
|
|
|
|
@ -3253,7 +3261,7 @@ class Broker(object):
|
|
|
|
|
Request broker gracefully disconnect streams and stop. Safe to call
|
|
|
|
|
from any thread.
|
|
|
|
|
"""
|
|
|
|
|
_v and LOG.debug('%r.shutdown()', self)
|
|
|
|
|
_v and LOG.debug('%r: shutting down', self)
|
|
|
|
|
def _shutdown():
|
|
|
|
|
self._alive = False
|
|
|
|
|
if self._alive and not self._exitted:
|
|
|
|
@ -3267,7 +3275,7 @@ class Broker(object):
|
|
|
|
|
self._thread.join()
|
|
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
|
return 'Broker(%#x)' % (id(self),)
|
|
|
|
|
return 'Broker(%04x)' % (id(self) & 0xffff,)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Dispatcher(object):
|
|
|
|
@ -3281,6 +3289,9 @@ class Dispatcher(object):
|
|
|
|
|
mode, any exception that occurs is recorded, and causes all subsequent
|
|
|
|
|
calls with the same `chain_id` to fail with the same exception.
|
|
|
|
|
"""
|
|
|
|
|
def __repr__(self):
|
|
|
|
|
return 'Dispatcher'
|
|
|
|
|
|
|
|
|
|
def __init__(self, econtext):
|
|
|
|
|
self.econtext = econtext
|
|
|
|
|
#: Chain ID -> CallError if prior call failed.
|
|
|
|
@ -3297,7 +3308,7 @@ class Dispatcher(object):
|
|
|
|
|
|
|
|
|
|
def _parse_request(self, msg):
|
|
|
|
|
data = msg.unpickle(throw=False)
|
|
|
|
|
_v and LOG.debug('_dispatch_one(%r)', data)
|
|
|
|
|
_v and LOG.debug('%r: dispatching %r', self, data)
|
|
|
|
|
|
|
|
|
|
chain_id, modname, klass, func, args, kwargs = data
|
|
|
|
|
obj = import_module(modname)
|
|
|
|
@ -3331,7 +3342,7 @@ class Dispatcher(object):
|
|
|
|
|
def _dispatch_calls(self):
|
|
|
|
|
for msg in self.recv:
|
|
|
|
|
chain_id, ret = self._dispatch_one(msg)
|
|
|
|
|
_v and LOG.debug('_dispatch_calls: %r -> %r', msg, ret)
|
|
|
|
|
_v and LOG.debug('%r: %r -> %r', self, msg, ret)
|
|
|
|
|
if msg.reply_to:
|
|
|
|
|
msg.reply(ret)
|
|
|
|
|
elif isinstance(ret, CallError) and chain_id is None:
|
|
|
|
@ -3411,8 +3422,8 @@ class ExternalContext(object):
|
|
|
|
|
th.start()
|
|
|
|
|
|
|
|
|
|
def _on_shutdown_msg(self, msg):
|
|
|
|
|
_v and LOG.debug('_on_shutdown_msg(%r)', msg)
|
|
|
|
|
if not msg.is_dead:
|
|
|
|
|
_v and LOG.debug('shutdown request from context %d', msg.src_id)
|
|
|
|
|
self.broker.shutdown()
|
|
|
|
|
|
|
|
|
|
def _on_parent_disconnect(self):
|
|
|
|
@ -3421,7 +3432,7 @@ class ExternalContext(object):
|
|
|
|
|
mitogen.parent_id = None
|
|
|
|
|
LOG.info('Detachment complete')
|
|
|
|
|
else:
|
|
|
|
|
_v and LOG.debug('%r: parent stream is gone, dying.', self)
|
|
|
|
|
_v and LOG.debug('parent stream is gone, dying.')
|
|
|
|
|
self.broker.shutdown()
|
|
|
|
|
|
|
|
|
|
def detach(self):
|
|
|
|
|