core: #39: don't call logging framework when logging is disabled

It looks ugly as sin, but this nets about a 20% drop in user CPU time,
and close to 15% increase in throughput.

The average log call is around 10 opcodes, prefixing with '_v and' costs
an extra 2, but both are simple operations, and the remaining 10 are
skipped entirely when _v or _vv are False.
pull/87/head
David Wilson 7 years ago
parent d0fbcc0f48
commit 5529a4fba6

@ -55,6 +55,9 @@ LOG = logging.getLogger('mitogen')
IOLOG = logging.getLogger('mitogen.io') IOLOG = logging.getLogger('mitogen.io')
IOLOG.setLevel(logging.INFO) IOLOG.setLevel(logging.INFO)
_v = False
_vv = False
GET_MODULE = 100 GET_MODULE = 100
CALL_FUNCTION = 101 CALL_FUNCTION = 101
FORWARD_LOG = 102 FORWARD_LOG = 102
@ -174,7 +177,7 @@ def io_op(func, *args):
try: try:
return func(*args), False return func(*args), False
except OSError, e: except OSError, e:
IOLOG.debug('io_op(%r) -> OSError: %s', func, e) _vv and IOLOG.debug('io_op(%r) -> OSError: %s', func, e)
if e.errno not in (errno.EIO, errno.ECONNRESET, errno.EPIPE): if e.errno not in (errno.EIO, errno.ECONNRESET, errno.EPIPE):
raise raise
return None, True return None, True
@ -264,7 +267,7 @@ class Message(object):
def unpickle(self, throw=True): def unpickle(self, throw=True):
"""Deserialize `data` into an object.""" """Deserialize `data` into an object."""
IOLOG.debug('%r.unpickle()', self) _vv and IOLOG.debug('%r.unpickle()', self)
fp = cStringIO.StringIO(self.data) fp = cStringIO.StringIO(self.data)
unpickler = cPickle.Unpickler(fp) unpickler = cPickle.Unpickler(fp)
unpickler.find_global = self._find_global unpickler.find_global = self._find_global
@ -300,7 +303,7 @@ class Sender(object):
def close(self): def close(self):
"""Indicate this channel is closed to the remote side.""" """Indicate this channel is closed to the remote side."""
IOLOG.debug('%r.close()', self) _vv and IOLOG.debug('%r.close()', self)
self.context.send( self.context.send(
Message.pickled( Message.pickled(
_DEAD, _DEAD,
@ -310,7 +313,7 @@ class Sender(object):
def put(self, data): def put(self, data):
"""Send `data` to the remote.""" """Send `data` to the remote."""
IOLOG.debug('%r.put(%r..)', self, data[:100]) _vv and IOLOG.debug('%r.put(%r..)', self, data[:100])
self.context.send( self.context.send(
Message.pickled( Message.pickled(
data, data,
@ -335,7 +338,7 @@ class Receiver(object):
def _on_receive(self, msg): def _on_receive(self, msg):
"""Callback from the Stream; appends data to the internal queue.""" """Callback from the Stream; appends data to the internal queue."""
IOLOG.debug('%r._on_receive(%r)', self, msg) _vv and IOLOG.debug('%r._on_receive(%r)', self, msg)
self._latch.put(msg) self._latch.put(msg)
if self.notify: if self.notify:
self.notify(self) self.notify(self)
@ -347,7 +350,7 @@ class Receiver(object):
return self._latch.empty() return self._latch.empty()
def get(self, timeout=None, block=True): def get(self, timeout=None, block=True):
IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
msg = self._latch.get(timeout=timeout, block=block) msg = self._latch.get(timeout=timeout, block=block)
#IOLOG.debug('%r.get() got %r', self, msg) #IOLOG.debug('%r.get() got %r', self, msg)
@ -419,22 +422,22 @@ class Importer(object):
fullname = fullname.rstrip('.') fullname = fullname.rstrip('.')
try: try:
pkgname, _, _ = fullname.rpartition('.') pkgname, _, _ = fullname.rpartition('.')
LOG.debug('%r.find_module(%r)', self, fullname) _v and LOG.debug('%r.find_module(%r)', self, fullname)
if fullname not in self._present.get(pkgname, (fullname,)): if fullname not in self._present.get(pkgname, (fullname,)):
LOG.debug('%r: master doesn\'t know %r', self, fullname) _v and LOG.debug('%r: master doesn\'t know %r', self, fullname)
return None return None
pkg = sys.modules.get(pkgname) pkg = sys.modules.get(pkgname)
if pkg and getattr(pkg, '__loader__', None) is not self: if pkg and getattr(pkg, '__loader__', None) is not self:
LOG.debug('%r: %r is submodule of a package we did not load', _v and LOG.debug('%r: %r is submodule of a package we did not load',
self, fullname) self, fullname)
return None return None
try: try:
__import__(fullname, {}, {}, ['']) __import__(fullname, {}, {}, [''])
LOG.debug('%r: %r is available locally', self, fullname) _v and LOG.debug('%r: %r is available locally', self, fullname)
except ImportError: except ImportError:
LOG.debug('find_module(%r) returning self', fullname) _v and LOG.debug('find_module(%r) returning self', fullname)
return self return self
finally: finally:
del _tls.running del _tls.running
@ -470,7 +473,7 @@ class Importer(object):
def _on_load_module(self, msg): def _on_load_module(self, msg):
tup = msg.unpickle() tup = msg.unpickle()
fullname = tup[0] fullname = tup[0]
LOG.debug('Importer._on_load_module(%r)', fullname) _v and LOG.debug('Importer._on_load_module(%r)', fullname)
self._lock.acquire() self._lock.acquire()
try: try:
@ -489,10 +492,10 @@ class Importer(object):
if not present: if not present:
funcs = self._callbacks.get(fullname) funcs = self._callbacks.get(fullname)
if funcs is not None: if funcs is not None:
LOG.debug('_request_module(%r): in flight', fullname) _v and LOG.debug('_request_module(%r): in flight', fullname)
funcs.append(callback) funcs.append(callback)
else: else:
LOG.debug('_request_module(%r): new request', fullname) _v and LOG.debug('_request_module(%r): new request', fullname)
self._callbacks[fullname] = [callback] self._callbacks[fullname] = [callback]
self._context.send(Message(data=fullname, handle=GET_MODULE)) self._context.send(Message(data=fullname, handle=GET_MODULE))
finally: finally:
@ -502,7 +505,7 @@ class Importer(object):
callback() callback()
def load_module(self, fullname): def load_module(self, fullname):
LOG.debug('Importer.load_module(%r)', fullname) _v and LOG.debug('Importer.load_module(%r)', fullname)
self._load_module_hacks(fullname) self._load_module_hacks(fullname)
event = threading.Event() event = threading.Event()
@ -573,7 +576,7 @@ class Side(object):
def close(self): def close(self):
if self.fd is not None: if self.fd is not None:
IOLOG.debug('%r.close()', self) _vv and IOLOG.debug('%r.close()', self)
os.close(self.fd) os.close(self.fd)
self.fd = None self.fd = None
@ -608,7 +611,7 @@ class BasicStream(object):
fire(self, 'disconnect') fire(self, 'disconnect')
def on_shutdown(self, broker): def on_shutdown(self, broker):
LOG.debug('%r.on_shutdown()', self) _v and LOG.debug('%r.on_shutdown()', self)
fire(self, 'shutdown') fire(self, 'shutdown')
self.on_disconnect(broker) self.on_disconnect(broker)
@ -638,7 +641,7 @@ class Stream(BasicStream):
def on_receive(self, broker): def on_receive(self, broker):
"""Handle the next complete message on the stream. Raise """Handle the next complete message on the stream. Raise
:py:class:`StreamError` on failure.""" :py:class:`StreamError` on failure."""
IOLOG.debug('%r.on_receive()', self) _vv and IOLOG.debug('%r.on_receive()', self)
buf = self.receive_side.read() buf = self.receive_side.read()
if buf is None: if buf is None:
@ -669,7 +672,7 @@ class Stream(BasicStream):
) )
if (len(self._input_buf) - self.HEADER_LEN) < msg_len: if (len(self._input_buf) - self.HEADER_LEN) < msg_len:
IOLOG.debug('%r: Input too short (want %d, got %d)', _vv and IOLOG.debug('%r: Input too short (want %d, got %d)',
self, msg_len, len(self._input_buf) - self.HEADER_LEN) self, msg_len, len(self._input_buf) - self.HEADER_LEN)
return False return False
@ -680,25 +683,25 @@ class Stream(BasicStream):
def on_transmit(self, broker): def on_transmit(self, broker):
"""Transmit buffered messages.""" """Transmit buffered messages."""
IOLOG.debug('%r.on_transmit()', self) _vv and IOLOG.debug('%r.on_transmit()', self)
if self._output_buf: if self._output_buf:
buf = self._output_buf.popleft() buf = self._output_buf.popleft()
written = self.transmit_side.write(buf) written = self.transmit_side.write(buf)
if not written: if not written:
LOG.debug('%r.on_transmit(): disconnection detected', self) _v and LOG.debug('%r.on_transmit(): disconnection detected', self)
self.on_disconnect(broker) self.on_disconnect(broker)
return return
elif written != len(buf): elif written != len(buf):
self._output_buf.appendleft(buf[written:]) self._output_buf.appendleft(buf[written:])
IOLOG.debug('%r.on_transmit() -> len %d', self, written) _vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written)
if not self._output_buf: if not self._output_buf:
broker.stop_transmit(self) broker.stop_transmit(self)
def _send(self, msg): def _send(self, msg):
IOLOG.debug('%r._send(%r)', self, msg) _vv and IOLOG.debug('%r._send(%r)', self, msg)
pkt = struct.pack('>hhhLLL', msg.dst_id, msg.src_id, msg.auth_id, pkt = struct.pack('>hhhLLL', msg.dst_id, msg.src_id, msg.auth_id,
msg.handle, msg.reply_to or 0, len(msg.data) msg.handle, msg.reply_to or 0, len(msg.data)
) + msg.data ) + msg.data
@ -716,7 +719,7 @@ class Stream(BasicStream):
def on_shutdown(self, broker): def on_shutdown(self, broker):
"""Override BasicStream behaviour of immediately disconnecting.""" """Override BasicStream behaviour of immediately disconnecting."""
LOG.debug('%r.on_shutdown(%r)', self, broker) _v and LOG.debug('%r.on_shutdown(%r)', self, broker)
def accept(self, rfd, wfd): def accept(self, rfd, wfd):
# TODO: what is this os.dup for? # TODO: what is this os.dup for?
@ -742,7 +745,7 @@ class Context(object):
return _unpickle_context, (self.context_id, self.name) return _unpickle_context, (self.context_id, self.name)
def on_disconnect(self, broker): def on_disconnect(self, broker):
LOG.debug('Parent stream is gone, dying.') _v and LOG.debug('Parent stream is gone, dying.')
fire(self, 'disconnect') fire(self, 'disconnect')
broker.shutdown() broker.shutdown()
@ -762,7 +765,7 @@ class Context(object):
receiver = Receiver(self.router, persist=persist, respondent=self) receiver = Receiver(self.router, persist=persist, respondent=self)
msg.reply_to = receiver.handle msg.reply_to = receiver.handle
LOG.debug('%r.send_async(%r)', self, msg) _v and LOG.debug('%r.send_async(%r)', self, msg)
self.send(msg) self.send(msg)
return receiver return receiver
@ -771,7 +774,7 @@ class Context(object):
receiver = self.send_async(msg) receiver = self.send_async(msg)
response = receiver.get(deadline) response = receiver.get(deadline)
data = response.unpickle() data = response.unpickle()
IOLOG.debug('%r._send_await() -> %r', self, data) _vv and IOLOG.debug('%r._send_await() -> %r', self, data)
return data return data
def __repr__(self): def __repr__(self):
@ -829,7 +832,7 @@ class Latch(object):
self.lock.release() self.lock.release()
def put(self, obj): def put(self, obj):
IOLOG.debug('%r.put(%r)', self, obj) _vv and IOLOG.debug('%r.put(%r)', self, obj)
self.lock.acquire() self.lock.acquire()
try: try:
self.queue.append(obj) self.queue.append(obj)
@ -838,7 +841,7 @@ class Latch(object):
self._wake(self.wake_socks.pop(0)) self._wake(self.wake_socks.pop(0))
finally: finally:
self.lock.release() self.lock.release()
LOG.debug('put() done. woken? %s', woken) _v and LOG.debug('put() done. woken? %s', woken)
def _wake(self, sock): def _wake(self, sock):
try: try:
@ -878,7 +881,7 @@ class Waker(BasicStream):
Write a byte to the self-pipe, causing the IO multiplexer to wake up. Write a byte to the self-pipe, causing the IO multiplexer to wake up.
Nothing is written if the current thread is the IO multiplexer thread. Nothing is written if the current thread is the IO multiplexer thread.
""" """
IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd) _vv and IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd)
if threading.currentThread() != self._broker._thread: if threading.currentThread() != self._broker._thread:
try: try:
self.transmit_side.write(' ') self.transmit_side.write(' ')
@ -918,13 +921,13 @@ class IoLogger(BasicStream):
def on_shutdown(self, broker): def on_shutdown(self, broker):
"""Shut down the write end of the logging socket.""" """Shut down the write end of the logging socket."""
LOG.debug('%r.on_shutdown()', self) _v and LOG.debug('%r.on_shutdown()', self)
self._wsock.shutdown(socket.SHUT_WR) self._wsock.shutdown(socket.SHUT_WR)
self._wsock.close() self._wsock.close()
self.transmit_side.close() self.transmit_side.close()
def on_receive(self, broker): def on_receive(self, broker):
IOLOG.debug('%r.on_receive()', self) _vv and IOLOG.debug('%r.on_receive()', self)
buf = os.read(self.receive_side.fd, CHUNK_SIZE) buf = os.read(self.receive_side.fd, CHUNK_SIZE)
if not buf: if not buf:
return self.on_disconnect(broker) return self.on_disconnect(broker)
@ -940,6 +943,11 @@ class Router(object):
self.broker = broker self.broker = broker
listen(broker, 'shutdown', self.on_broker_shutdown) listen(broker, 'shutdown', self.on_broker_shutdown)
# Here seems as good a place as any.
global _v, _vv
_v = logging.getLogger().level <= logging.DEBUG
_vv = IOLOG.level <= logging.DEBUG
#: context ID -> Stream #: context ID -> Stream
self._stream_by_id = {} self._stream_by_id = {}
#: List of contexts to notify of shutdown. #: List of contexts to notify of shutdown.
@ -970,7 +978,7 @@ class Router(object):
context.on_shutdown(self.broker) context.on_shutdown(self.broker)
def add_route(self, target_id, via_id): def add_route(self, target_id, via_id):
LOG.debug('%r.add_route(%r, %r)', self, target_id, via_id) _v and LOG.debug('%r.add_route(%r, %r)', self, target_id, via_id)
try: try:
self._stream_by_id[target_id] = self._stream_by_id[via_id] self._stream_by_id[target_id] = self._stream_by_id[via_id]
except KeyError: except KeyError:
@ -983,14 +991,14 @@ class Router(object):
self.add_route(target_id, via_id) self.add_route(target_id, via_id)
def register(self, context, stream): def register(self, context, stream):
LOG.debug('register(%r, %r)', context, stream) _v and LOG.debug('register(%r, %r)', context, stream)
self._stream_by_id[context.context_id] = stream self._stream_by_id[context.context_id] = stream
self._context_by_id[context.context_id] = context self._context_by_id[context.context_id] = context
self.broker.start_receive(stream) self.broker.start_receive(stream)
def add_handler(self, fn, handle=None, persist=True, respondent=None): def add_handler(self, fn, handle=None, persist=True, respondent=None):
handle = handle or self._last_handle.next() handle = handle or self._last_handle.next()
IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) _vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist)
self._handle_map[handle] = persist, fn self._handle_map[handle] = persist, fn
if respondent: if respondent:
@ -1005,10 +1013,10 @@ class Router(object):
def on_shutdown(self, broker): def on_shutdown(self, broker):
"""Called during :py:meth:`Broker.shutdown`, informs callbacks """Called during :py:meth:`Broker.shutdown`, informs callbacks
registered with :py:meth:`add_handle_cb` the connection is dead.""" registered with :py:meth:`add_handle_cb` the connection is dead."""
LOG.debug('%r.on_shutdown(%r)', self, broker) _v and LOG.debug('%r.on_shutdown(%r)', self, broker)
fire(self, 'shutdown') fire(self, 'shutdown')
for handle, (persist, fn) in self._handle_map.iteritems(): for handle, (persist, fn) in self._handle_map.iteritems():
LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn) _v and LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn)
fn(_DEAD) fn(_DEAD)
def _invoke(self, msg): def _invoke(self, msg):
@ -1028,7 +1036,7 @@ class Router(object):
LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn) LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn)
def _async_route(self, msg, stream=None): def _async_route(self, msg, stream=None):
IOLOG.debug('%r._async_route(%r, %r)', self, msg, stream) _vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, stream)
# Perform source verification. # Perform source verification.
if stream is not None: if stream is not None:
expected_stream = self._stream_by_id.get(msg.auth_id, expected_stream = self._stream_by_id.get(msg.auth_id,
@ -1094,7 +1102,7 @@ class Broker(object):
lst.append(value) lst.append(value)
def start_receive(self, stream): def start_receive(self, stream):
IOLOG.debug('%r.start_receive(%r)', self, stream) _vv and IOLOG.debug('%r.start_receive(%r)', self, stream)
assert stream.receive_side and stream.receive_side.fd is not None assert stream.receive_side and stream.receive_side.fd is not None
self.defer(self._list_add, self._readers, stream.receive_side) self.defer(self._list_add, self._readers, stream.receive_side)
@ -1129,7 +1137,7 @@ class Broker(object):
self.shutdown() self.shutdown()
def _loop_once(self, timeout=None): def _loop_once(self, timeout=None):
IOLOG.debug('%r._loop_once(%r)', self, timeout) _vv and IOLOG.debug('%r._loop_once(%r)', self, timeout)
self._run_defer() self._run_defer()
#IOLOG.debug('readers = %r', self._readers) #IOLOG.debug('readers = %r', self._readers)
@ -1137,11 +1145,11 @@ class Broker(object):
rsides, wsides, _ = select.select(self._readers, self._writers, rsides, wsides, _ = select.select(self._readers, self._writers,
(), timeout) (), timeout)
for side in rsides: for side in rsides:
IOLOG.debug('%r: POLLIN for %r', self, side) _vv and IOLOG.debug('%r: POLLIN for %r', self, side)
self._call(side.stream, side.stream.on_receive) self._call(side.stream, side.stream.on_receive)
for side in wsides: for side in wsides:
IOLOG.debug('%r: POLLOUT for %r', self, side) _vv and IOLOG.debug('%r: POLLOUT for %r', self, side)
self._call(side.stream, side.stream.on_transmit) self._call(side.stream, side.stream.on_transmit)
def keep_alive(self): def keep_alive(self):
@ -1178,7 +1186,7 @@ class Broker(object):
fire(self, 'exit') fire(self, 'exit')
def shutdown(self): def shutdown(self):
LOG.debug('%r.shutdown()', self) _v and LOG.debug('%r.shutdown()', self)
self._alive = False self._alive = False
self._waker.wake() self._waker.wake()
@ -1198,7 +1206,7 @@ class ExternalContext(object):
os.kill(os.getpid(), signal.SIGTERM) os.kill(os.getpid(), signal.SIGTERM)
def _on_shutdown_msg(self, msg): def _on_shutdown_msg(self, msg):
LOG.debug('_on_shutdown_msg(%r)', msg) _v and LOG.debug('_on_shutdown_msg(%r)', msg)
if msg.src_id != mitogen.parent_id: if msg.src_id != mitogen.parent_id:
LOG.warning('Ignoring SHUTDOWN from non-parent: %r', msg) LOG.warning('Ignoring SHUTDOWN from non-parent: %r', msg)
return return
@ -1286,7 +1294,7 @@ class ExternalContext(object):
def _dispatch_calls(self): def _dispatch_calls(self):
for msg in self.channel: for msg in self.channel:
data = msg.unpickle(throw=False) data = msg.unpickle(throw=False)
LOG.debug('_dispatch_calls(%r)', data) _v and LOG.debug('_dispatch_calls(%r)', data)
if msg.auth_id not in mitogen.parent_ids: if msg.auth_id not in mitogen.parent_ids:
LOG.warning('CALL_FUNCTION from non-parent %r', msg.auth_id) LOG.warning('CALL_FUNCTION from non-parent %r', msg.auth_id)
@ -1302,7 +1310,7 @@ class ExternalContext(object):
kwargs.setdefault('router', self.router) kwargs.setdefault('router', self.router)
msg.reply(fn(*args, **kwargs)) msg.reply(fn(*args, **kwargs))
except Exception, e: except Exception, e:
LOG.debug('_dispatch_calls: %s', e) _v and LOG.debug('_dispatch_calls: %s', e)
msg.reply(CallError(e)) msg.reply(CallError(e))
self.dispatch_stopped = True self.dispatch_stopped = True
@ -1320,12 +1328,12 @@ class ExternalContext(object):
self.router.register(self.parent, self.stream) self.router.register(self.parent, self.stream)
sys.executable = os.environ.pop('ARGV0', sys.executable) sys.executable = os.environ.pop('ARGV0', sys.executable)
LOG.debug('Connected to %s; my ID is %r, PID is %r', _v and LOG.debug('Connected to %s; my ID is %r, PID is %r',
self.parent, context_id, os.getpid()) self.parent, context_id, os.getpid())
LOG.debug('Recovered sys.executable: %r', sys.executable) _v and LOG.debug('Recovered sys.executable: %r', sys.executable)
_profile_hook('main', self._dispatch_calls) _profile_hook('main', self._dispatch_calls)
LOG.debug('ExternalContext.main() normal exit') _v and LOG.debug('ExternalContext.main() normal exit')
except BaseException: except BaseException:
LOG.exception('ExternalContext.main() crashed') LOG.exception('ExternalContext.main() crashed')
raise raise

Loading…
Cancel
Save