Merge remote-tracking branch 'origin/dmw'

* origin/dmw:
  ci: fix incorrect partition/rpartition from 8a4caea84f
  issue #260: hide force-disconnect messages.
  issue #498: fix shutdown crash
  issue #260: avoid start_transmit()/on_transmit()/stop_transmit()
  core: ensure broker profiling output reaches disk
  master: keep is_stdlib_path() result as negative cache entry
  ci: Allow DISTROS="debian*32" variable, and KEEP=1
issue510
David Wilson 5 years ago
commit d763570ca2

@ -158,23 +158,37 @@ def make_containers():
firstbit = lambda s: (s+'-').split('-')[0]
secondbit = lambda s: (s+'-').split('-')[1]
return [
{
"distro": firstbit(distro),
"name": "target-%s-%s" % (distro, i),
"hostname": docker_hostname,
"port": BASE_PORT + i,
"python_path": (
'/usr/bin/python3'
if secondbit(distro) == 'py3'
else '/usr/bin/python'
)
}
for i, distro in enumerate(DISTROS, 1)
]
i = 1
lst = []
for distro in DISTROS:
distro, star, count = distro.partition('*')
if star:
count = int(count)
else:
count = 1
for x in range(count):
lst.append({
"distro": firstbit(distro),
"name": "target-%s-%s" % (distro, i),
"hostname": docker_hostname,
"port": BASE_PORT + i,
"python_path": (
'/usr/bin/python3'
if secondbit(distro) == 'py3'
else '/usr/bin/python'
)
})
i += 1
return lst
def start_containers(containers):
if os.environ.get('KEEP'):
return
run_batches([
[
"docker rm -f %(name)s || true" % container,

@ -540,7 +540,7 @@ def enable_profiling():
try:
return func(*args)
finally:
profiler.dump_stats('/tmp/mitogen.%d.%s.pstat' % (os.getpid(), name))
profiler.dump_stats('/tmp/mitogen.stats.%d.%s.pstat' % (os.getpid(), name))
profiler.create_stats()
fp = open('/tmp/mitogen.stats.%d.%s.log' % (os.getpid(), name), 'w')
try:
@ -1682,6 +1682,22 @@ class Stream(BasicStream):
pkt = struct.pack(self.HEADER_FMT, self.HEADER_MAGIC, msg.dst_id,
msg.src_id, msg.auth_id, msg.handle,
msg.reply_to or 0, len(msg.data)) + msg.data
if not self._output_buf_len:
# Modifying epoll/Kqueue state is expensive, as is needless broker
# loop iterations. Rather than wait for writeability, simply
# attempt to write immediately, and only fall back to
# start_transmit()/on_transmit() if an error occurred or the socket
# buffer was full.
try:
n = self.transmit_side.write(pkt)
if n:
if n == len(pkt):
return
pkt = pkt[n:]
except OSError:
pass
if not self._output_buf_len:
self._router.broker._start_transmit(self)
self._output_buf.append(pkt)
@ -2457,7 +2473,8 @@ class Router(object):
return
target_id_s, _, name = bytes_partition(msg.data, b(':'))
context = self._context_by_id.get(int(target_id_s, 10))
target_id = int(target_id_s, 10)
context = self._context_by_id.get(target_id)
if context:
fire(context, 'disconnect')
else:
@ -2798,8 +2815,7 @@ class Broker(object):
(self._waker.receive_side, self._waker.on_receive)
)
self._thread = threading.Thread(
target=_profile_hook,
args=('broker', self._broker_main),
target=self._broker_main,
name='mitogen-broker'
)
self._thread.start()
@ -2907,7 +2923,7 @@ class Broker(object):
to shut down gracefully, then discard the :class:`Poller`.
"""
for _, (side, _) in self.poller.readers + self.poller.writers:
LOG.error('_broker_main() force disconnecting %r', side)
LOG.debug('_broker_main() force disconnecting %r', side)
side.stream.on_disconnect(self)
self.poller.close()
@ -2932,7 +2948,7 @@ class Broker(object):
'more child processes still connected to '
'our stdout/stderr pipes.', self)
def _broker_main(self):
def _do_broker_main(self):
"""
Broker thread main function. Dispatches IO events until
:meth:`shutdown` is called.
@ -2950,6 +2966,9 @@ class Broker(object):
self._exitted = True
self._broker_exit()
def _broker_main(self):
_profile_hook('mitogen-broker', self._do_broker_main)
fire(self, 'exit')
def shutdown(self):

@ -768,13 +768,22 @@ class ModuleResponder(object):
return (fullname, None, None, None, ())
def _build_tuple(self, fullname):
if mitogen.core.is_blacklisted_import(self, fullname):
raise ImportError('blacklisted')
if fullname in self._cache:
return self._cache[fullname]
if mitogen.core.is_blacklisted_import(self, fullname):
raise ImportError('blacklisted')
path, source, is_pkg = self._finder.get_module_source(fullname)
if path and is_stdlib_path(path):
# Prevent loading of 2.x<->3.x stdlib modules! This costs one
# RTT per hit, so a client-side solution is also required.
LOG.debug('%r: refusing to serve stdlib module %r',
self, fullname)
tup = self._make_negative_response(fullname)
self._cache[fullname] = tup
return tup
if source is None:
# TODO: make this .warning() or similar again once importer has its
# own logging category.
@ -839,14 +848,6 @@ class ModuleResponder(object):
def _send_module_and_related(self, stream, fullname):
try:
tup = self._build_tuple(fullname)
if tup[2] and is_stdlib_path(tup[2]):
# Prevent loading of 2.x<->3.x stdlib modules! This costs one
# RTT per hit, so a client-side solution is also required.
LOG.debug('%r: refusing to serve stdlib module %r',
self, fullname)
self._send_module_load_failed(stream, fullname)
return
for name in tup[4]: # related
parent, _, _ = str_partition(name, '.')
if parent != fullname and parent not in stream.sent_modules:
@ -893,8 +894,8 @@ class ModuleResponder(object):
path.append(fullname)
fullname, _, _ = str_rpartition(fullname, u'.')
stream = self._router.stream_by_id(context.context_id)
for fullname in reversed(path):
stream = self._router.stream_by_id(context.context_id)
self._send_module_and_related(stream, fullname)
self._send_forward_module(stream, context, fullname)

Loading…
Cancel
Save