diff --git a/mitogen/core.py b/mitogen/core.py index b9462d6d..497e59a3 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -55,6 +55,9 @@ LOG = logging.getLogger('mitogen') IOLOG = logging.getLogger('mitogen.io') IOLOG.setLevel(logging.INFO) +_v = False +_vv = False + GET_MODULE = 100 CALL_FUNCTION = 101 FORWARD_LOG = 102 @@ -174,7 +177,7 @@ def io_op(func, *args): try: return func(*args), False except OSError, e: - IOLOG.debug('io_op(%r) -> OSError: %s', func, e) + _vv and IOLOG.debug('io_op(%r) -> OSError: %s', func, e) if e.errno not in (errno.EIO, errno.ECONNRESET, errno.EPIPE): raise return None, True @@ -264,7 +267,7 @@ class Message(object): def unpickle(self, throw=True): """Deserialize `data` into an object.""" - IOLOG.debug('%r.unpickle()', self) + _vv and IOLOG.debug('%r.unpickle()', self) fp = cStringIO.StringIO(self.data) unpickler = cPickle.Unpickler(fp) unpickler.find_global = self._find_global @@ -300,7 +303,7 @@ class Sender(object): def close(self): """Indicate this channel is closed to the remote side.""" - IOLOG.debug('%r.close()', self) + _vv and IOLOG.debug('%r.close()', self) self.context.send( Message.pickled( _DEAD, @@ -310,7 +313,7 @@ class Sender(object): def put(self, data): """Send `data` to the remote.""" - IOLOG.debug('%r.put(%r..)', self, data[:100]) + _vv and IOLOG.debug('%r.put(%r..)', self, data[:100]) self.context.send( Message.pickled( data, @@ -335,7 +338,7 @@ class Receiver(object): def _on_receive(self, msg): """Callback from the Stream; appends data to the internal queue.""" - IOLOG.debug('%r._on_receive(%r)', self, msg) + _vv and IOLOG.debug('%r._on_receive(%r)', self, msg) self._latch.put(msg) if self.notify: self.notify(self) @@ -347,7 +350,7 @@ class Receiver(object): return self._latch.empty() def get(self, timeout=None, block=True): - IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) + _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) msg = self._latch.get(timeout=timeout, block=block) #IOLOG.debug('%r.get() got %r', self, msg) @@ -419,22 +422,22 @@ class Importer(object): fullname = fullname.rstrip('.') try: pkgname, _, _ = fullname.rpartition('.') - LOG.debug('%r.find_module(%r)', self, fullname) + _v and LOG.debug('%r.find_module(%r)', self, fullname) if fullname not in self._present.get(pkgname, (fullname,)): - LOG.debug('%r: master doesn\'t know %r', self, fullname) + _v and LOG.debug('%r: master doesn\'t know %r', self, fullname) return None pkg = sys.modules.get(pkgname) if pkg and getattr(pkg, '__loader__', None) is not self: - LOG.debug('%r: %r is submodule of a package we did not load', + _v and LOG.debug('%r: %r is submodule of a package we did not load', self, fullname) return None try: __import__(fullname, {}, {}, ['']) - LOG.debug('%r: %r is available locally', self, fullname) + _v and LOG.debug('%r: %r is available locally', self, fullname) except ImportError: - LOG.debug('find_module(%r) returning self', fullname) + _v and LOG.debug('find_module(%r) returning self', fullname) return self finally: del _tls.running @@ -470,7 +473,7 @@ class Importer(object): def _on_load_module(self, msg): tup = msg.unpickle() fullname = tup[0] - LOG.debug('Importer._on_load_module(%r)', fullname) + _v and LOG.debug('Importer._on_load_module(%r)', fullname) self._lock.acquire() try: @@ -489,10 +492,10 @@ class Importer(object): if not present: funcs = self._callbacks.get(fullname) if funcs is not None: - LOG.debug('_request_module(%r): in flight', fullname) + _v and LOG.debug('_request_module(%r): in flight', fullname) funcs.append(callback) else: - LOG.debug('_request_module(%r): new request', fullname) + _v and LOG.debug('_request_module(%r): new request', fullname) self._callbacks[fullname] = [callback] self._context.send(Message(data=fullname, handle=GET_MODULE)) finally: @@ -502,7 +505,7 @@ class Importer(object): callback() def load_module(self, fullname): - LOG.debug('Importer.load_module(%r)', fullname) + _v and LOG.debug('Importer.load_module(%r)', fullname) self._load_module_hacks(fullname) event = threading.Event() @@ -573,7 +576,7 @@ class Side(object): def close(self): if self.fd is not None: - IOLOG.debug('%r.close()', self) + _vv and IOLOG.debug('%r.close()', self) os.close(self.fd) self.fd = None @@ -608,7 +611,7 @@ class BasicStream(object): fire(self, 'disconnect') def on_shutdown(self, broker): - LOG.debug('%r.on_shutdown()', self) + _v and LOG.debug('%r.on_shutdown()', self) fire(self, 'shutdown') self.on_disconnect(broker) @@ -638,7 +641,7 @@ class Stream(BasicStream): def on_receive(self, broker): """Handle the next complete message on the stream. Raise :py:class:`StreamError` on failure.""" - IOLOG.debug('%r.on_receive()', self) + _vv and IOLOG.debug('%r.on_receive()', self) buf = self.receive_side.read() if buf is None: @@ -669,7 +672,7 @@ class Stream(BasicStream): ) if (len(self._input_buf) - self.HEADER_LEN) < msg_len: - IOLOG.debug('%r: Input too short (want %d, got %d)', + _vv and IOLOG.debug('%r: Input too short (want %d, got %d)', self, msg_len, len(self._input_buf) - self.HEADER_LEN) return False @@ -680,25 +683,25 @@ class Stream(BasicStream): def on_transmit(self, broker): """Transmit buffered messages.""" - IOLOG.debug('%r.on_transmit()', self) + _vv and IOLOG.debug('%r.on_transmit()', self) if self._output_buf: buf = self._output_buf.popleft() written = self.transmit_side.write(buf) if not written: - LOG.debug('%r.on_transmit(): disconnection detected', self) + _v and LOG.debug('%r.on_transmit(): disconnection detected', self) self.on_disconnect(broker) return elif written != len(buf): self._output_buf.appendleft(buf[written:]) - IOLOG.debug('%r.on_transmit() -> len %d', self, written) + _vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written) if not self._output_buf: broker.stop_transmit(self) def _send(self, msg): - IOLOG.debug('%r._send(%r)', self, msg) + _vv and IOLOG.debug('%r._send(%r)', self, msg) pkt = struct.pack('>hhhLLL', msg.dst_id, msg.src_id, msg.auth_id, msg.handle, msg.reply_to or 0, len(msg.data) ) + msg.data @@ -716,7 +719,7 @@ class Stream(BasicStream): def on_shutdown(self, broker): """Override BasicStream behaviour of immediately disconnecting.""" - LOG.debug('%r.on_shutdown(%r)', self, broker) + _v and LOG.debug('%r.on_shutdown(%r)', self, broker) def accept(self, rfd, wfd): # TODO: what is this os.dup for? @@ -742,7 +745,7 @@ class Context(object): return _unpickle_context, (self.context_id, self.name) def on_disconnect(self, broker): - LOG.debug('Parent stream is gone, dying.') + _v and LOG.debug('Parent stream is gone, dying.') fire(self, 'disconnect') broker.shutdown() @@ -762,7 +765,7 @@ class Context(object): receiver = Receiver(self.router, persist=persist, respondent=self) msg.reply_to = receiver.handle - LOG.debug('%r.send_async(%r)', self, msg) + _v and LOG.debug('%r.send_async(%r)', self, msg) self.send(msg) return receiver @@ -771,7 +774,7 @@ class Context(object): receiver = self.send_async(msg) response = receiver.get(deadline) data = response.unpickle() - IOLOG.debug('%r._send_await() -> %r', self, data) + _vv and IOLOG.debug('%r._send_await() -> %r', self, data) return data def __repr__(self): @@ -829,7 +832,7 @@ class Latch(object): self.lock.release() def put(self, obj): - IOLOG.debug('%r.put(%r)', self, obj) + _vv and IOLOG.debug('%r.put(%r)', self, obj) self.lock.acquire() try: self.queue.append(obj) @@ -838,7 +841,7 @@ class Latch(object): self._wake(self.wake_socks.pop(0)) finally: self.lock.release() - LOG.debug('put() done. woken? %s', woken) + _v and LOG.debug('put() done. woken? %s', woken) def _wake(self, sock): try: @@ -878,7 +881,7 @@ class Waker(BasicStream): Write a byte to the self-pipe, causing the IO multiplexer to wake up. Nothing is written if the current thread is the IO multiplexer thread. """ - IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd) + _vv and IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd) if threading.currentThread() != self._broker._thread: try: self.transmit_side.write(' ') @@ -918,13 +921,13 @@ class IoLogger(BasicStream): def on_shutdown(self, broker): """Shut down the write end of the logging socket.""" - LOG.debug('%r.on_shutdown()', self) + _v and LOG.debug('%r.on_shutdown()', self) self._wsock.shutdown(socket.SHUT_WR) self._wsock.close() self.transmit_side.close() def on_receive(self, broker): - IOLOG.debug('%r.on_receive()', self) + _vv and IOLOG.debug('%r.on_receive()', self) buf = os.read(self.receive_side.fd, CHUNK_SIZE) if not buf: return self.on_disconnect(broker) @@ -940,6 +943,11 @@ class Router(object): self.broker = broker listen(broker, 'shutdown', self.on_broker_shutdown) + # Here seems as good a place as any. + global _v, _vv + _v = logging.getLogger().level <= logging.DEBUG + _vv = IOLOG.level <= logging.DEBUG + #: context ID -> Stream self._stream_by_id = {} #: List of contexts to notify of shutdown. @@ -970,7 +978,7 @@ class Router(object): context.on_shutdown(self.broker) def add_route(self, target_id, via_id): - LOG.debug('%r.add_route(%r, %r)', self, target_id, via_id) + _v and LOG.debug('%r.add_route(%r, %r)', self, target_id, via_id) try: self._stream_by_id[target_id] = self._stream_by_id[via_id] except KeyError: @@ -983,14 +991,14 @@ class Router(object): self.add_route(target_id, via_id) def register(self, context, stream): - LOG.debug('register(%r, %r)', context, stream) + _v and LOG.debug('register(%r, %r)', context, stream) self._stream_by_id[context.context_id] = stream self._context_by_id[context.context_id] = context self.broker.start_receive(stream) def add_handler(self, fn, handle=None, persist=True, respondent=None): handle = handle or self._last_handle.next() - IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) + _vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) self._handle_map[handle] = persist, fn if respondent: @@ -1005,10 +1013,10 @@ class Router(object): def on_shutdown(self, broker): """Called during :py:meth:`Broker.shutdown`, informs callbacks registered with :py:meth:`add_handle_cb` the connection is dead.""" - LOG.debug('%r.on_shutdown(%r)', self, broker) + _v and LOG.debug('%r.on_shutdown(%r)', self, broker) fire(self, 'shutdown') for handle, (persist, fn) in self._handle_map.iteritems(): - LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn) + _v and LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn) fn(_DEAD) def _invoke(self, msg): @@ -1028,7 +1036,7 @@ class Router(object): LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn) def _async_route(self, msg, stream=None): - IOLOG.debug('%r._async_route(%r, %r)', self, msg, stream) + _vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, stream) # Perform source verification. if stream is not None: expected_stream = self._stream_by_id.get(msg.auth_id, @@ -1094,7 +1102,7 @@ class Broker(object): lst.append(value) def start_receive(self, stream): - IOLOG.debug('%r.start_receive(%r)', self, stream) + _vv and IOLOG.debug('%r.start_receive(%r)', self, stream) assert stream.receive_side and stream.receive_side.fd is not None self.defer(self._list_add, self._readers, stream.receive_side) @@ -1129,7 +1137,7 @@ class Broker(object): self.shutdown() def _loop_once(self, timeout=None): - IOLOG.debug('%r._loop_once(%r)', self, timeout) + _vv and IOLOG.debug('%r._loop_once(%r)', self, timeout) self._run_defer() #IOLOG.debug('readers = %r', self._readers) @@ -1137,11 +1145,11 @@ class Broker(object): rsides, wsides, _ = select.select(self._readers, self._writers, (), timeout) for side in rsides: - IOLOG.debug('%r: POLLIN for %r', self, side) + _vv and IOLOG.debug('%r: POLLIN for %r', self, side) self._call(side.stream, side.stream.on_receive) for side in wsides: - IOLOG.debug('%r: POLLOUT for %r', self, side) + _vv and IOLOG.debug('%r: POLLOUT for %r', self, side) self._call(side.stream, side.stream.on_transmit) def keep_alive(self): @@ -1178,7 +1186,7 @@ class Broker(object): fire(self, 'exit') def shutdown(self): - LOG.debug('%r.shutdown()', self) + _v and LOG.debug('%r.shutdown()', self) self._alive = False self._waker.wake() @@ -1198,7 +1206,7 @@ class ExternalContext(object): os.kill(os.getpid(), signal.SIGTERM) def _on_shutdown_msg(self, msg): - LOG.debug('_on_shutdown_msg(%r)', msg) + _v and LOG.debug('_on_shutdown_msg(%r)', msg) if msg.src_id != mitogen.parent_id: LOG.warning('Ignoring SHUTDOWN from non-parent: %r', msg) return @@ -1286,7 +1294,7 @@ class ExternalContext(object): def _dispatch_calls(self): for msg in self.channel: data = msg.unpickle(throw=False) - LOG.debug('_dispatch_calls(%r)', data) + _v and LOG.debug('_dispatch_calls(%r)', data) if msg.auth_id not in mitogen.parent_ids: LOG.warning('CALL_FUNCTION from non-parent %r', msg.auth_id) @@ -1302,7 +1310,7 @@ class ExternalContext(object): kwargs.setdefault('router', self.router) msg.reply(fn(*args, **kwargs)) except Exception, e: - LOG.debug('_dispatch_calls: %s', e) + _v and LOG.debug('_dispatch_calls: %s', e) msg.reply(CallError(e)) self.dispatch_stopped = True @@ -1320,12 +1328,12 @@ class ExternalContext(object): self.router.register(self.parent, self.stream) sys.executable = os.environ.pop('ARGV0', sys.executable) - LOG.debug('Connected to %s; my ID is %r, PID is %r', + _v and LOG.debug('Connected to %s; my ID is %r, PID is %r', self.parent, context_id, os.getpid()) - LOG.debug('Recovered sys.executable: %r', sys.executable) + _v and LOG.debug('Recovered sys.executable: %r', sys.executable) _profile_hook('main', self._dispatch_calls) - LOG.debug('ExternalContext.main() normal exit') + _v and LOG.debug('ExternalContext.main() normal exit') except BaseException: LOG.exception('ExternalContext.main() crashed') raise