Split out and make readable more log messages across both packages

pull/612/head
David Wilson 5 years ago
parent 0f23a90d50
commit 5298e87548

@ -481,7 +481,8 @@ class ClassicWorkerModel(WorkerModel):
communicate with it. This is a simple hash of the inventory name.
"""
mux = self._muxes[abs(hash(name)) % len(self._muxes)]
LOG.debug('Picked worker %d: %s', mux.index, mux.path)
LOG.debug('will use multiplexer %d (%s) to connect to "%s"',
mux.index, mux.path, name)
return mux.path
def _reconnect(self, path):
@ -534,7 +535,7 @@ class ClassicWorkerModel(WorkerModel):
for mux in self._muxes:
_, status = os.waitpid(mux.pid, 0)
status = mitogen.fork._convert_exit_status(status)
LOG.debug('mux %d PID %d %s', mux.index, mux.pid,
LOG.debug('multiplexer %d PID %d %s', mux.index, mux.pid,
mitogen.parent.returncode_to_str(status))
def _test_reset(self):

@ -180,7 +180,7 @@ class ContextService(mitogen.service.Service):
Return a reference, making it eligable for recycling once its reference
count reaches zero.
"""
LOG.debug('%r.put(%r)', self, context)
LOG.debug('decrementing reference count for %r', context)
self._lock.acquire()
try:
if self._refs_by_context.get(context, 0) == 0:

@ -875,7 +875,8 @@ class Message(object):
if msg.handle:
(self.router or router).route(msg)
else:
LOG.debug('Message.reply(): discarding due to zero handle: %r', msg)
LOG.debug('dropping reply to message with no return address: %r',
msg)
if PY3:
UNPICKLER_KWARGS = {'encoding': 'bytes'}
@ -1224,6 +1225,7 @@ class Importer(object):
ALWAYS_BLACKLIST += ['cStringIO']
def __init__(self, router, context, core_src, whitelist=(), blacklist=()):
self._log = LOG.getChild('importer')
self._context = context
self._present = {'mitogen': self.MITOGEN_PKG_CONTENT}
self._lock = threading.Lock()
@ -1272,7 +1274,7 @@ class Importer(object):
)
def __repr__(self):
return 'Importer()'
return 'Importer'
def builtin_find_module(self, fullname):
# imp.find_module() will always succeed for __main__, because it is a
@ -1297,18 +1299,18 @@ class Importer(object):
_tls.running = True
try:
_v and LOG.debug('%r.find_module(%r)', self, fullname)
#_v and self._log.debug('Python requested %r', fullname)
fullname = to_text(fullname)
pkgname, dot, _ = str_rpartition(fullname, '.')
pkg = sys.modules.get(pkgname)
if pkgname and getattr(pkg, '__loader__', None) is not self:
LOG.debug('%r: %r is submodule of a package we did not load',
self, fullname)
self._log.debug('%s is submodule of a locally loaded package',
fullname)
return None
suffix = fullname[len(pkgname+dot):]
if pkgname and suffix not in self._present.get(pkgname, ()):
LOG.debug('%r: master doesn\'t know %r', self, fullname)
self._log.debug('%s has no submodule %s', pkgname, suffix)
return None
# #114: explicitly whitelisted prefixes override any
@ -1319,10 +1321,9 @@ class Importer(object):
try:
self.builtin_find_module(fullname)
_vv and IOLOG.debug('%r: %r is available locally',
self, fullname)
_vv and self._log.debug('%r is available locally', fullname)
except ImportError:
_vv and IOLOG.debug('find_module(%r) returning self', fullname)
_vv and self._log.debug('we will try to load %r', fullname)
return self
finally:
del _tls.running
@ -1373,7 +1374,7 @@ class Importer(object):
tup = msg.unpickle()
fullname = tup[0]
_v and LOG.debug('importer: received %s', fullname)
_v and self._log.debug('received %s', fullname)
self._lock.acquire()
try:
@ -1397,12 +1398,12 @@ class Importer(object):
if not present:
funcs = self._callbacks.get(fullname)
if funcs is not None:
_v and LOG.debug('%s: existing request for %s in flight',
self, fullname)
_v and self._log.debug('existing request for %s in flight',
fullname)
funcs.append(callback)
else:
_v and LOG.debug('%s: requesting %s from parent',
self, fullname)
_v and self._log.debug('sending new %s request to parent',
fullname)
self._callbacks[fullname] = [callback]
self._context.send(
Message(data=b(fullname), handle=GET_MODULE)
@ -1415,7 +1416,7 @@ class Importer(object):
def load_module(self, fullname):
fullname = to_text(fullname)
_v and LOG.debug('importer: requesting %s', fullname)
_v and self._log.debug('requesting %s', fullname)
self._refuse_imports(fullname)
event = threading.Event()
@ -1679,7 +1680,7 @@ class DelimitedProtocol(Protocol):
_trailer = b('')
def on_receive(self, broker, buf):
IOLOG.debug('%r.on_receive()', self)
_vv and IOLOG.debug('%r.on_receive()', self)
self._trailer, cont = mitogen.core.iter_split(
buf=self._trailer + buf,
delim=self.delimiter,
@ -1743,13 +1744,13 @@ class BufferedWriter(object):
buf = self._buf.popleft()
written = self._protocol.stream.transmit_side.write(buf)
if not written:
_v and LOG.debug('%r.on_transmit(): disconnection detected', self)
_v and LOG.debug('disconnected during write to %r', self)
self._protocol.stream.on_disconnect(broker)
return
elif written != len(buf):
self._buf.appendleft(BufferType(buf, written))
_vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written)
_vv and IOLOG.debug('transmitted %d bytes to %r', written, self)
self._len -= written
if not self._buf:
@ -2068,13 +2069,13 @@ class Context(object):
msg.dst_id = self.context_id
msg.reply_to = receiver.handle
_v and LOG.debug('%r.send_async(%r)', self, msg)
_v and LOG.debug('sending message to %r: %r', self, msg)
self.send(msg)
return receiver
def call_service_async(self, service_name, method_name, **kwargs):
_v and LOG.debug('%r.call_service_async(%r, %r, %r)',
self, service_name, method_name, kwargs)
_v and LOG.debug('calling service %s.%s of %r, args: %r',
service_name, method_name, self, kwargs)
if isinstance(service_name, BytesType):
service_name = service_name.encode('utf-8')
elif not isinstance(service_name, UnicodeType):

@ -796,6 +796,7 @@ class ModuleFinder(object):
class ModuleResponder(object):
def __init__(self, router):
self._log = logging.getLogger('mitogen.responder')
self._router = router
self._finder = ModuleFinder()
self._cache = {} # fullname -> pickled
@ -863,7 +864,7 @@ class ModuleResponder(object):
if b('mitogen.main(') in src:
return src
LOG.error(self.main_guard_msg, path)
self._log.error(self.main_guard_msg, path)
raise ImportError('refused')
def _make_negative_response(self, fullname):
@ -882,8 +883,7 @@ class ModuleResponder(object):
if path and is_stdlib_path(path):
# Prevent loading of 2.x<->3.x stdlib modules! This costs one
# RTT per hit, so a client-side solution is also required.
LOG.debug('%r: refusing to serve stdlib module %r',
self, fullname)
self._log.debug('refusing to serve stdlib module %r', fullname)
tup = self._make_negative_response(fullname)
self._cache[fullname] = tup
return tup
@ -891,7 +891,7 @@ class ModuleResponder(object):
if source is None:
# TODO: make this .warning() or similar again once importer has its
# own logging category.
LOG.debug('_build_tuple(%r): could not locate source', fullname)
self._log.debug('could not find source for %r', fullname)
tup = self._make_negative_response(fullname)
self._cache[fullname] = tup
return tup
@ -904,8 +904,8 @@ class ModuleResponder(object):
if is_pkg:
pkg_present = get_child_modules(path)
LOG.debug('_build_tuple(%r, %r) -> %r',
path, fullname, pkg_present)
self._log.debug('%s is a package at %s with submodules %r',
fullname, path, pkg_present)
else:
pkg_present = None
@ -936,8 +936,8 @@ class ModuleResponder(object):
dst_id=stream.protocol.remote_id,
handle=mitogen.core.LOAD_MODULE,
)
LOG.debug('%s: sending %s (%.2f KiB) to %s',
self, fullname, len(msg.data) / 1024.0, stream.name)
self._log.debug('sending %s (%.2f KiB) to %s',
fullname, len(msg.data) / 1024.0, stream.name)
self._router._async_route(msg)
stream.protocol.sent_modules.add(fullname)
if tup[2] is not None:
@ -983,7 +983,7 @@ class ModuleResponder(object):
return
fullname = msg.data.decode()
LOG.debug('%s requested module %s', stream.name, fullname)
self._log.debug('%s requested module %s', stream.name, fullname)
self.get_module_count += 1
if fullname in stream.protocol.sent_modules:
LOG.warning('_on_get_module(): dup request for %r from %r',

@ -41,6 +41,7 @@ import getpass
import heapq
import inspect
import logging
import logging
import os
import re
import signal
@ -65,9 +66,12 @@ except ImportError:
import mitogen.core
from mitogen.core import b
from mitogen.core import bytes_partition
from mitogen.core import LOG
from mitogen.core import IOLOG
LOG = logging.getLogger(__name__)
try:
next
except NameError:
@ -663,7 +667,7 @@ def _upgrade_broker(broker):
root.setLevel(old_level)
broker.timers = TimerList()
LOG.debug('replaced %r with %r (new: %d readers, %d writers; '
LOG.debug('upgraded %r with %r (new: %d readers, %d writers; '
'old: %d readers, %d writers)', old, new,
len(new.readers), len(new.writers),
len(old.readers), len(old.writers))
@ -1141,7 +1145,7 @@ class BootstrapProtocol(RegexProtocol):
self._writer.write(self.stream.conn.get_preamble())
def _on_ec1_received(self, line, match):
LOG.debug('%r: first stage received bootstrap', self)
LOG.debug('%r: first stage received mitogen.core source', self)
def _on_ec2_received(self, line, match):
LOG.debug('%r: new child booted successfully', self)
@ -1454,8 +1458,8 @@ class Connection(object):
"""
Fail the connection attempt.
"""
LOG.debug('%s: failing connection due to %r',
self.stdio_stream.name, exc)
LOG.debug('failing connection %s due to %r',
self.stdio_stream and self.stdio_stream.name, exc)
if self.exception is None:
self._adorn_eof_error(exc)
self.exception = exc
@ -1558,9 +1562,10 @@ class Connection(object):
return stream
def _async_connect(self):
LOG.debug('creating connection to context %d using %s',
self.context.context_id, self.__class__.__module__)
mitogen.core.listen(self._router.broker, 'shutdown',
self._on_broker_shutdown)
self._start_timer()
self.stdio_stream = self._setup_stdio_stream()
if self.context.name is None:
@ -1570,7 +1575,6 @@ class Connection(object):
self.stderr_stream = self._setup_stderr_stream()
def connect(self, context):
LOG.debug('%r.connect()', self)
self.context = context
self.proc = self.start_child()
LOG.debug('%r.connect(): pid:%r stdin:%r stdout:%r stderr:%r',
@ -1759,7 +1763,9 @@ class CallChain(object):
pipelining is disabled, the exception will be logged to the target
context's logging framework.
"""
LOG.debug('%r.call_no_reply(): %r', self, CallSpec(fn, args, kwargs))
LOG.debug('starting no-reply function call to %r: %r',
self.context.name or self.context.context_id,
CallSpec(fn, args, kwargs))
self.context.send(self.make_msg(fn, *args, **kwargs))
def call_async(self, fn, *args, **kwargs):
@ -1815,7 +1821,9 @@ class CallChain(object):
contexts and consumed as they complete using
:class:`mitogen.select.Select`.
"""
LOG.debug('%r.call_async(): %r', self, CallSpec(fn, args, kwargs))
LOG.debug('starting function call to %s: %r',
self.context.name or self.context.context_id,
CallSpec(fn, args, kwargs))
return self.context.send_async(self.make_msg(fn, *args, **kwargs))
def call(self, fn, *args, **kwargs):
@ -1946,6 +1954,7 @@ class RouteMonitor(object):
def __init__(self, router, parent=None):
self.router = router
self.parent = parent
self._log = logging.getLogger('mitogen.route_monitor')
#: Mapping of Stream instance to integer context IDs reachable via the
#: stream; used to cleanup routes during disconnection.
self._routes_by_stream = {}
@ -2066,8 +2075,8 @@ class RouteMonitor(object):
if routes is None:
return
LOG.debug('%r: %r is gone; propagating DEL_ROUTE for %r',
self, stream, routes)
self._log.debug('stream %s is gone; propagating DEL_ROUTE for %r',
stream.name, routes)
for target_id in routes:
self.router.del_route(target_id)
self._propagate_up(mitogen.core.DEL_ROUTE, target_id)
@ -2093,12 +2102,12 @@ class RouteMonitor(object):
stream = self.router.stream_by_id(msg.auth_id)
current = self.router.stream_by_id(target_id)
if current and current.protocol.remote_id != mitogen.parent_id:
LOG.error('Cannot add duplicate route to %r via %r, '
self._log.error('Cannot add duplicate route to %r via %r, '
'already have existing route via %r',
target_id, stream, current)
return
LOG.debug('Adding route to %d via %r', target_id, stream)
self._log.debug('Adding route to %d via %r', target_id, stream)
self._routes_by_stream[stream].add(target_id)
self.router.add_route(target_id, stream)
self._propagate_up(mitogen.core.ADD_ROUTE, target_id, target_name)
@ -2120,16 +2129,16 @@ class RouteMonitor(object):
stream = self.router.stream_by_id(msg.auth_id)
if registered_stream != stream:
LOG.error('%r: received DEL_ROUTE for %d from %r, expected %r',
self, target_id, stream, registered_stream)
self._log.error('received DEL_ROUTE for %d from %r, expected %r',
target_id, stream, registered_stream)
return
context = self.router.context_by_id(target_id, create=False)
if context:
LOG.debug('%r: firing local disconnect for %r', self, context)
self._log.debug('firing local disconnect signal for %r', context)
mitogen.core.fire(context, 'disconnect')
LOG.debug('%r: deleting route to %d via %r', self, target_id, stream)
self._log.debug('deleting route to %d via %r', target_id, stream)
routes = self._routes_by_stream.get(stream)
if routes:
routes.discard(target_id)
@ -2151,7 +2160,7 @@ class Router(mitogen.core.Router):
route_monitor = None
def upgrade(self, importer, parent):
LOG.debug('%r.upgrade()', self)
LOG.debug('upgrading %r with capabilities to start new children', self)
self.id_allocator = ChildIdAllocator(router=self)
self.responder = ModuleForwarder(
router=self,
@ -2211,7 +2220,8 @@ class Router(mitogen.core.Router):
but remains public while the design has not yet settled, and situations
may arise where routing is not fully automatic.
"""
LOG.debug('%r.add_route(%r, %r)', self, target_id, stream)
LOG.debug('%r: adding route to context %r via %r',
self, target_id, stream)
assert isinstance(target_id, int)
assert isinstance(stream, mitogen.core.Stream)
@ -2480,7 +2490,7 @@ class ModuleForwarder(object):
return
fullname = msg.data.decode('utf-8')
LOG.debug('%r: %s requested by %d', self, fullname, msg.src_id)
LOG.debug('%r: %s requested by context %d', self, fullname, msg.src_id)
callback = lambda: self._on_cache_callback(msg, fullname)
self.importer._request_module(fullname, callback)

@ -688,10 +688,12 @@ class PushFileService(Service):
def _forward(self, context, path):
stream = self.router.stream_by_id(context.context_id)
child = mitogen.core.Context(self.router, stream.protocol.remote_id)
child = self.router.context_by_id(stream.protocol.remote_id)
sent = self._sent_by_stream.setdefault(stream, set())
if path in sent:
if child.context_id != context.context_id:
LOG.debug('requesting %s forward small file to %s: %s',
child, context, path)
child.call_service_async(
service_name=self.name(),
method_name='forward',
@ -699,6 +701,8 @@ class PushFileService(Service):
context=context
).close()
else:
LOG.debug('requesting %s cache and forward small file to %s: %s',
child, context, path)
child.call_service_async(
service_name=self.name(),
method_name='store_and_forward',
@ -729,8 +733,8 @@ class PushFileService(Service):
'path': mitogen.core.FsPathTypes,
})
def propagate_to(self, context, path):
LOG.debug('%r.propagate_to(%r, %r)', self, context, path)
if path not in self._cache:
LOG.debug('caching small file %s', path)
fp = open(path, 'rb')
try:
self._cache[path] = mitogen.core.Blob(fp.read())

@ -36,6 +36,7 @@ have the same privilege (auth_id) as the current process.
"""
import errno
import logging
import os
import socket
import struct
@ -45,7 +46,8 @@ import tempfile
import mitogen.core
import mitogen.master
from mitogen.core import LOG
LOG = logging.getLogger(__name__)
class Error(mitogen.core.Error):
@ -143,8 +145,8 @@ class Listener(mitogen.core.Protocol):
try:
pid, = struct.unpack('>L', sock.recv(4))
except (struct.error, socket.error):
LOG.error('%r: failed to read remote identity: %s',
self, sys.exc_info()[1])
LOG.error('listener: failed to read remote identity: %s',
sys.exc_info()[1])
return
context_id = self._router.id_allocator.allocate()
@ -152,8 +154,8 @@ class Listener(mitogen.core.Protocol):
sock.send(struct.pack('>LLL', context_id, mitogen.context_id,
os.getpid()))
except socket.error:
LOG.error('%r: failed to assign identity to PID %d: %s',
self, pid, sys.exc_info()[1])
LOG.error('listener: failed to assign identity to PID %d: %s',
pid, sys.exc_info()[1])
return
context = mitogen.parent.Context(self._router, context_id)
@ -165,7 +167,8 @@ class Listener(mitogen.core.Protocol):
stream.protocol.auth_id = mitogen.context_id
stream.protocol.is_privileged = True
stream.accept(sock, sock)
LOG.debug('%r: accepted %r', self, stream)
LOG.debug('listener: accepted connection from PID %d: %s',
pid, stream.name)
self._router.register(context, stream)
@ -186,7 +189,7 @@ def _connect(path, broker, sock):
mitogen.parent_id = remote_id
mitogen.parent_ids = [remote_id]
LOG.debug('unix.connect(): local ID is %r, remote is %r',
LOG.debug('client: local ID is %r, remote is %r',
mitogen.context_id, remote_id)
router = mitogen.master.Router(broker=broker)
@ -204,7 +207,7 @@ def _connect(path, broker, sock):
def connect(path, broker=None):
LOG.debug('unix.connect(path=%r)', path)
LOG.debug('client: connecting to %s', path)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
return _connect(path, broker, sock)

Loading…
Cancel
Save