diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index 7e74c36a..93b72f3f 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -481,7 +481,8 @@ class ClassicWorkerModel(WorkerModel): communicate with it. This is a simple hash of the inventory name. """ mux = self._muxes[abs(hash(name)) % len(self._muxes)] - LOG.debug('Picked worker %d: %s', mux.index, mux.path) + LOG.debug('will use multiplexer %d (%s) to connect to "%s"', + mux.index, mux.path, name) return mux.path def _reconnect(self, path): @@ -534,7 +535,7 @@ class ClassicWorkerModel(WorkerModel): for mux in self._muxes: _, status = os.waitpid(mux.pid, 0) status = mitogen.fork._convert_exit_status(status) - LOG.debug('mux %d PID %d %s', mux.index, mux.pid, + LOG.debug('multiplexer %d PID %d %s', mux.index, mux.pid, mitogen.parent.returncode_to_str(status)) def _test_reset(self): diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index a8fde265..fa55f2ec 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -180,7 +180,7 @@ class ContextService(mitogen.service.Service): Return a reference, making it eligable for recycling once its reference count reaches zero. """ - LOG.debug('%r.put(%r)', self, context) + LOG.debug('decrementing reference count for %r', context) self._lock.acquire() try: if self._refs_by_context.get(context, 0) == 0: diff --git a/mitogen/core.py b/mitogen/core.py index 4dc3e7ef..aca7972f 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -875,7 +875,8 @@ class Message(object): if msg.handle: (self.router or router).route(msg) else: - LOG.debug('Message.reply(): discarding due to zero handle: %r', msg) + LOG.debug('dropping reply to message with no return address: %r', + msg) if PY3: UNPICKLER_KWARGS = {'encoding': 'bytes'} @@ -1224,6 +1225,7 @@ class Importer(object): ALWAYS_BLACKLIST += ['cStringIO'] def __init__(self, router, context, core_src, whitelist=(), blacklist=()): + self._log = LOG.getChild('importer') self._context = context self._present = {'mitogen': self.MITOGEN_PKG_CONTENT} self._lock = threading.Lock() @@ -1272,7 +1274,7 @@ class Importer(object): ) def __repr__(self): - return 'Importer()' + return 'Importer' def builtin_find_module(self, fullname): # imp.find_module() will always succeed for __main__, because it is a @@ -1297,18 +1299,18 @@ class Importer(object): _tls.running = True try: - _v and LOG.debug('%r.find_module(%r)', self, fullname) + #_v and self._log.debug('Python requested %r', fullname) fullname = to_text(fullname) pkgname, dot, _ = str_rpartition(fullname, '.') pkg = sys.modules.get(pkgname) if pkgname and getattr(pkg, '__loader__', None) is not self: - LOG.debug('%r: %r is submodule of a package we did not load', - self, fullname) + self._log.debug('%s is submodule of a locally loaded package', + fullname) return None suffix = fullname[len(pkgname+dot):] if pkgname and suffix not in self._present.get(pkgname, ()): - LOG.debug('%r: master doesn\'t know %r', self, fullname) + self._log.debug('%s has no submodule %s', pkgname, suffix) return None # #114: explicitly whitelisted prefixes override any @@ -1319,10 +1321,9 @@ class Importer(object): try: self.builtin_find_module(fullname) - _vv and IOLOG.debug('%r: %r is available locally', - self, fullname) + _vv and self._log.debug('%r is available locally', fullname) except ImportError: - _vv and IOLOG.debug('find_module(%r) returning self', fullname) + _vv and self._log.debug('we will try to load %r', fullname) return self finally: del _tls.running @@ -1373,7 +1374,7 @@ class Importer(object): tup = msg.unpickle() fullname = tup[0] - _v and LOG.debug('importer: received %s', fullname) + _v and self._log.debug('received %s', fullname) self._lock.acquire() try: @@ -1397,12 +1398,12 @@ class Importer(object): if not present: funcs = self._callbacks.get(fullname) if funcs is not None: - _v and LOG.debug('%s: existing request for %s in flight', - self, fullname) + _v and self._log.debug('existing request for %s in flight', + fullname) funcs.append(callback) else: - _v and LOG.debug('%s: requesting %s from parent', - self, fullname) + _v and self._log.debug('sending new %s request to parent', + fullname) self._callbacks[fullname] = [callback] self._context.send( Message(data=b(fullname), handle=GET_MODULE) @@ -1415,7 +1416,7 @@ class Importer(object): def load_module(self, fullname): fullname = to_text(fullname) - _v and LOG.debug('importer: requesting %s', fullname) + _v and self._log.debug('requesting %s', fullname) self._refuse_imports(fullname) event = threading.Event() @@ -1679,7 +1680,7 @@ class DelimitedProtocol(Protocol): _trailer = b('') def on_receive(self, broker, buf): - IOLOG.debug('%r.on_receive()', self) + _vv and IOLOG.debug('%r.on_receive()', self) self._trailer, cont = mitogen.core.iter_split( buf=self._trailer + buf, delim=self.delimiter, @@ -1743,13 +1744,13 @@ class BufferedWriter(object): buf = self._buf.popleft() written = self._protocol.stream.transmit_side.write(buf) if not written: - _v and LOG.debug('%r.on_transmit(): disconnection detected', self) + _v and LOG.debug('disconnected during write to %r', self) self._protocol.stream.on_disconnect(broker) return elif written != len(buf): self._buf.appendleft(BufferType(buf, written)) - _vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written) + _vv and IOLOG.debug('transmitted %d bytes to %r', written, self) self._len -= written if not self._buf: @@ -2068,13 +2069,13 @@ class Context(object): msg.dst_id = self.context_id msg.reply_to = receiver.handle - _v and LOG.debug('%r.send_async(%r)', self, msg) + _v and LOG.debug('sending message to %r: %r', self, msg) self.send(msg) return receiver def call_service_async(self, service_name, method_name, **kwargs): - _v and LOG.debug('%r.call_service_async(%r, %r, %r)', - self, service_name, method_name, kwargs) + _v and LOG.debug('calling service %s.%s of %r, args: %r', + service_name, method_name, self, kwargs) if isinstance(service_name, BytesType): service_name = service_name.encode('utf-8') elif not isinstance(service_name, UnicodeType): diff --git a/mitogen/master.py b/mitogen/master.py index 909c3cef..cb4452a1 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -796,6 +796,7 @@ class ModuleFinder(object): class ModuleResponder(object): def __init__(self, router): + self._log = logging.getLogger('mitogen.responder') self._router = router self._finder = ModuleFinder() self._cache = {} # fullname -> pickled @@ -863,7 +864,7 @@ class ModuleResponder(object): if b('mitogen.main(') in src: return src - LOG.error(self.main_guard_msg, path) + self._log.error(self.main_guard_msg, path) raise ImportError('refused') def _make_negative_response(self, fullname): @@ -882,8 +883,7 @@ class ModuleResponder(object): if path and is_stdlib_path(path): # Prevent loading of 2.x<->3.x stdlib modules! This costs one # RTT per hit, so a client-side solution is also required. - LOG.debug('%r: refusing to serve stdlib module %r', - self, fullname) + self._log.debug('refusing to serve stdlib module %r', fullname) tup = self._make_negative_response(fullname) self._cache[fullname] = tup return tup @@ -891,7 +891,7 @@ class ModuleResponder(object): if source is None: # TODO: make this .warning() or similar again once importer has its # own logging category. - LOG.debug('_build_tuple(%r): could not locate source', fullname) + self._log.debug('could not find source for %r', fullname) tup = self._make_negative_response(fullname) self._cache[fullname] = tup return tup @@ -904,8 +904,8 @@ class ModuleResponder(object): if is_pkg: pkg_present = get_child_modules(path) - LOG.debug('_build_tuple(%r, %r) -> %r', - path, fullname, pkg_present) + self._log.debug('%s is a package at %s with submodules %r', + fullname, path, pkg_present) else: pkg_present = None @@ -936,8 +936,8 @@ class ModuleResponder(object): dst_id=stream.protocol.remote_id, handle=mitogen.core.LOAD_MODULE, ) - LOG.debug('%s: sending %s (%.2f KiB) to %s', - self, fullname, len(msg.data) / 1024.0, stream.name) + self._log.debug('sending %s (%.2f KiB) to %s', + fullname, len(msg.data) / 1024.0, stream.name) self._router._async_route(msg) stream.protocol.sent_modules.add(fullname) if tup[2] is not None: @@ -983,7 +983,7 @@ class ModuleResponder(object): return fullname = msg.data.decode() - LOG.debug('%s requested module %s', stream.name, fullname) + self._log.debug('%s requested module %s', stream.name, fullname) self.get_module_count += 1 if fullname in stream.protocol.sent_modules: LOG.warning('_on_get_module(): dup request for %r from %r', diff --git a/mitogen/parent.py b/mitogen/parent.py index f8c5d95f..851882da 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -41,6 +41,7 @@ import getpass import heapq import inspect import logging +import logging import os import re import signal @@ -65,9 +66,12 @@ except ImportError: import mitogen.core from mitogen.core import b from mitogen.core import bytes_partition -from mitogen.core import LOG from mitogen.core import IOLOG + +LOG = logging.getLogger(__name__) + + try: next except NameError: @@ -663,7 +667,7 @@ def _upgrade_broker(broker): root.setLevel(old_level) broker.timers = TimerList() - LOG.debug('replaced %r with %r (new: %d readers, %d writers; ' + LOG.debug('upgraded %r with %r (new: %d readers, %d writers; ' 'old: %d readers, %d writers)', old, new, len(new.readers), len(new.writers), len(old.readers), len(old.writers)) @@ -1141,7 +1145,7 @@ class BootstrapProtocol(RegexProtocol): self._writer.write(self.stream.conn.get_preamble()) def _on_ec1_received(self, line, match): - LOG.debug('%r: first stage received bootstrap', self) + LOG.debug('%r: first stage received mitogen.core source', self) def _on_ec2_received(self, line, match): LOG.debug('%r: new child booted successfully', self) @@ -1454,8 +1458,8 @@ class Connection(object): """ Fail the connection attempt. """ - LOG.debug('%s: failing connection due to %r', - self.stdio_stream.name, exc) + LOG.debug('failing connection %s due to %r', + self.stdio_stream and self.stdio_stream.name, exc) if self.exception is None: self._adorn_eof_error(exc) self.exception = exc @@ -1558,9 +1562,10 @@ class Connection(object): return stream def _async_connect(self): + LOG.debug('creating connection to context %d using %s', + self.context.context_id, self.__class__.__module__) mitogen.core.listen(self._router.broker, 'shutdown', self._on_broker_shutdown) - self._start_timer() self.stdio_stream = self._setup_stdio_stream() if self.context.name is None: @@ -1570,7 +1575,6 @@ class Connection(object): self.stderr_stream = self._setup_stderr_stream() def connect(self, context): - LOG.debug('%r.connect()', self) self.context = context self.proc = self.start_child() LOG.debug('%r.connect(): pid:%r stdin:%r stdout:%r stderr:%r', @@ -1759,7 +1763,9 @@ class CallChain(object): pipelining is disabled, the exception will be logged to the target context's logging framework. """ - LOG.debug('%r.call_no_reply(): %r', self, CallSpec(fn, args, kwargs)) + LOG.debug('starting no-reply function call to %r: %r', + self.context.name or self.context.context_id, + CallSpec(fn, args, kwargs)) self.context.send(self.make_msg(fn, *args, **kwargs)) def call_async(self, fn, *args, **kwargs): @@ -1815,7 +1821,9 @@ class CallChain(object): contexts and consumed as they complete using :class:`mitogen.select.Select`. """ - LOG.debug('%r.call_async(): %r', self, CallSpec(fn, args, kwargs)) + LOG.debug('starting function call to %s: %r', + self.context.name or self.context.context_id, + CallSpec(fn, args, kwargs)) return self.context.send_async(self.make_msg(fn, *args, **kwargs)) def call(self, fn, *args, **kwargs): @@ -1946,6 +1954,7 @@ class RouteMonitor(object): def __init__(self, router, parent=None): self.router = router self.parent = parent + self._log = logging.getLogger('mitogen.route_monitor') #: Mapping of Stream instance to integer context IDs reachable via the #: stream; used to cleanup routes during disconnection. self._routes_by_stream = {} @@ -2066,8 +2075,8 @@ class RouteMonitor(object): if routes is None: return - LOG.debug('%r: %r is gone; propagating DEL_ROUTE for %r', - self, stream, routes) + self._log.debug('stream %s is gone; propagating DEL_ROUTE for %r', + stream.name, routes) for target_id in routes: self.router.del_route(target_id) self._propagate_up(mitogen.core.DEL_ROUTE, target_id) @@ -2093,12 +2102,12 @@ class RouteMonitor(object): stream = self.router.stream_by_id(msg.auth_id) current = self.router.stream_by_id(target_id) if current and current.protocol.remote_id != mitogen.parent_id: - LOG.error('Cannot add duplicate route to %r via %r, ' - 'already have existing route via %r', - target_id, stream, current) + self._log.error('Cannot add duplicate route to %r via %r, ' + 'already have existing route via %r', + target_id, stream, current) return - LOG.debug('Adding route to %d via %r', target_id, stream) + self._log.debug('Adding route to %d via %r', target_id, stream) self._routes_by_stream[stream].add(target_id) self.router.add_route(target_id, stream) self._propagate_up(mitogen.core.ADD_ROUTE, target_id, target_name) @@ -2120,16 +2129,16 @@ class RouteMonitor(object): stream = self.router.stream_by_id(msg.auth_id) if registered_stream != stream: - LOG.error('%r: received DEL_ROUTE for %d from %r, expected %r', - self, target_id, stream, registered_stream) + self._log.error('received DEL_ROUTE for %d from %r, expected %r', + target_id, stream, registered_stream) return context = self.router.context_by_id(target_id, create=False) if context: - LOG.debug('%r: firing local disconnect for %r', self, context) + self._log.debug('firing local disconnect signal for %r', context) mitogen.core.fire(context, 'disconnect') - LOG.debug('%r: deleting route to %d via %r', self, target_id, stream) + self._log.debug('deleting route to %d via %r', target_id, stream) routes = self._routes_by_stream.get(stream) if routes: routes.discard(target_id) @@ -2151,7 +2160,7 @@ class Router(mitogen.core.Router): route_monitor = None def upgrade(self, importer, parent): - LOG.debug('%r.upgrade()', self) + LOG.debug('upgrading %r with capabilities to start new children', self) self.id_allocator = ChildIdAllocator(router=self) self.responder = ModuleForwarder( router=self, @@ -2211,7 +2220,8 @@ class Router(mitogen.core.Router): but remains public while the design has not yet settled, and situations may arise where routing is not fully automatic. """ - LOG.debug('%r.add_route(%r, %r)', self, target_id, stream) + LOG.debug('%r: adding route to context %r via %r', + self, target_id, stream) assert isinstance(target_id, int) assert isinstance(stream, mitogen.core.Stream) @@ -2480,7 +2490,7 @@ class ModuleForwarder(object): return fullname = msg.data.decode('utf-8') - LOG.debug('%r: %s requested by %d', self, fullname, msg.src_id) + LOG.debug('%r: %s requested by context %d', self, fullname, msg.src_id) callback = lambda: self._on_cache_callback(msg, fullname) self.importer._request_module(fullname, callback) diff --git a/mitogen/service.py b/mitogen/service.py index b9332780..da48521f 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -688,10 +688,12 @@ class PushFileService(Service): def _forward(self, context, path): stream = self.router.stream_by_id(context.context_id) - child = mitogen.core.Context(self.router, stream.protocol.remote_id) + child = self.router.context_by_id(stream.protocol.remote_id) sent = self._sent_by_stream.setdefault(stream, set()) if path in sent: if child.context_id != context.context_id: + LOG.debug('requesting %s forward small file to %s: %s', + child, context, path) child.call_service_async( service_name=self.name(), method_name='forward', @@ -699,6 +701,8 @@ class PushFileService(Service): context=context ).close() else: + LOG.debug('requesting %s cache and forward small file to %s: %s', + child, context, path) child.call_service_async( service_name=self.name(), method_name='store_and_forward', @@ -729,8 +733,8 @@ class PushFileService(Service): 'path': mitogen.core.FsPathTypes, }) def propagate_to(self, context, path): - LOG.debug('%r.propagate_to(%r, %r)', self, context, path) if path not in self._cache: + LOG.debug('caching small file %s', path) fp = open(path, 'rb') try: self._cache[path] = mitogen.core.Blob(fp.read()) diff --git a/mitogen/unix.py b/mitogen/unix.py index c34dc064..645b061d 100644 --- a/mitogen/unix.py +++ b/mitogen/unix.py @@ -36,6 +36,7 @@ have the same privilege (auth_id) as the current process. """ import errno +import logging import os import socket import struct @@ -45,7 +46,8 @@ import tempfile import mitogen.core import mitogen.master -from mitogen.core import LOG + +LOG = logging.getLogger(__name__) class Error(mitogen.core.Error): @@ -143,8 +145,8 @@ class Listener(mitogen.core.Protocol): try: pid, = struct.unpack('>L', sock.recv(4)) except (struct.error, socket.error): - LOG.error('%r: failed to read remote identity: %s', - self, sys.exc_info()[1]) + LOG.error('listener: failed to read remote identity: %s', + sys.exc_info()[1]) return context_id = self._router.id_allocator.allocate() @@ -152,8 +154,8 @@ class Listener(mitogen.core.Protocol): sock.send(struct.pack('>LLL', context_id, mitogen.context_id, os.getpid())) except socket.error: - LOG.error('%r: failed to assign identity to PID %d: %s', - self, pid, sys.exc_info()[1]) + LOG.error('listener: failed to assign identity to PID %d: %s', + pid, sys.exc_info()[1]) return context = mitogen.parent.Context(self._router, context_id) @@ -165,7 +167,8 @@ class Listener(mitogen.core.Protocol): stream.protocol.auth_id = mitogen.context_id stream.protocol.is_privileged = True stream.accept(sock, sock) - LOG.debug('%r: accepted %r', self, stream) + LOG.debug('listener: accepted connection from PID %d: %s', + pid, stream.name) self._router.register(context, stream) @@ -186,7 +189,7 @@ def _connect(path, broker, sock): mitogen.parent_id = remote_id mitogen.parent_ids = [remote_id] - LOG.debug('unix.connect(): local ID is %r, remote is %r', + LOG.debug('client: local ID is %r, remote is %r', mitogen.context_id, remote_id) router = mitogen.master.Router(broker=broker) @@ -204,7 +207,7 @@ def _connect(path, broker, sock): def connect(path, broker=None): - LOG.debug('unix.connect(path=%r)', path) + LOG.debug('client: connecting to %s', path) sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: return _connect(path, broker, sock)