Add mitogen.core.now() and use it everywhere; closes #614.

pull/618/head
David Wilson 5 years ago
parent 379dca90b9
commit 57012e0f72

@ -603,6 +603,14 @@ Fork Safety
Utility Functions Utility Functions
================= =================
.. currentmodule:: mitogen.core
.. function:: now
A reference to :func:`time.time` on Python 2, or :func:`time.monotonic` on
Python >3.3. We prefer :func:`time.monotonic` when available to ensure
timers are not impacted by system clock changes.
.. module:: mitogen.utils .. module:: mitogen.utils
A random assortment of utility functions useful on masters and children. A random assortment of utility functions useful on masters and children.

@ -362,6 +362,10 @@ def to_text(o):
return UnicodeType(o) return UnicodeType(o)
# Documented in api.rst to work around Sphinx limitation.
now = getattr(time, 'monotonic', time.time)
# Python 2.4 # Python 2.4
try: try:
any any
@ -636,7 +640,7 @@ def _real_profile_hook(name, func, *args):
return func(*args) return func(*args)
finally: finally:
path = _profile_fmt % { path = _profile_fmt % {
'now': int(1e6 * time.time()), 'now': int(1e6 * now()),
'identity': name, 'identity': name,
'pid': os.getpid(), 'pid': os.getpid(),
'ext': '%s' 'ext': '%s'
@ -3482,9 +3486,9 @@ class Broker(object):
for _, (side, _) in self.poller.readers + self.poller.writers: for _, (side, _) in self.poller.readers + self.poller.writers:
self._call(side.stream, side.stream.on_shutdown) self._call(side.stream, side.stream.on_shutdown)
deadline = time.time() + self.shutdown_timeout deadline = now() + self.shutdown_timeout
while self.keep_alive() and time.time() < deadline: while self.keep_alive() and now() < deadline:
self._loop_once(max(0, deadline - time.time())) self._loop_once(max(0, deadline - now()))
if self.keep_alive(): if self.keep_alive():
LOG.error('%r: pending work still existed %d seconds after ' LOG.error('%r: pending work still existed %d seconds after '

@ -910,9 +910,9 @@ class ModuleResponder(object):
if self.minify_safe_re.search(source): if self.minify_safe_re.search(source):
# If the module contains a magic marker, it's safe to minify. # If the module contains a magic marker, it's safe to minify.
t0 = time.time() t0 = mitogen.core.now()
source = mitogen.minify.minimize_source(source).encode('utf-8') source = mitogen.minify.minimize_source(source).encode('utf-8')
self.minify_secs += time.time() - t0 self.minify_secs += mitogen.core.now() - t0
if is_pkg: if is_pkg:
pkg_present = get_child_modules(path) pkg_present = get_child_modules(path)
@ -1001,11 +1001,11 @@ class ModuleResponder(object):
LOG.warning('_on_get_module(): dup request for %r from %r', LOG.warning('_on_get_module(): dup request for %r from %r',
fullname, stream) fullname, stream)
t0 = time.time() t0 = mitogen.core.now()
try: try:
self._send_module_and_related(stream, fullname) self._send_module_and_related(stream, fullname)
finally: finally:
self.get_module_secs += time.time() - t0 self.get_module_secs += mitogen.core.now() - t0
def _send_forward_module(self, stream, context, fullname): def _send_forward_module(self, stream, context, fullname):
if stream.protocol.remote_id != context.context_id: if stream.protocol.remote_id != context.context_id:

@ -633,7 +633,7 @@ class TimerList(object):
:meth:`expire`. The main user interface to :class:`TimerList` is :meth:`expire`. The main user interface to :class:`TimerList` is
:meth:`schedule`. :meth:`schedule`.
""" """
_now = time.time _now = mitogen.core.now
def __init__(self): def __init__(self):
self._lst = [] self._lst = []
@ -1124,7 +1124,7 @@ class LineLoggingProtocolMixin(object):
def on_line_received(self, line): def on_line_received(self, line):
self.logged_partial = None self.logged_partial = None
self.logged_lines.append((time.time(), line)) self.logged_lines.append((mitogen.core.now(), line))
self.logged_lines[:] = self.logged_lines[-100:] self.logged_lines[:] = self.logged_lines[-100:]
return super(LineLoggingProtocolMixin, self).on_line_received(line) return super(LineLoggingProtocolMixin, self).on_line_received(line)
@ -1134,7 +1134,7 @@ class LineLoggingProtocolMixin(object):
def on_disconnect(self, broker): def on_disconnect(self, broker):
if self.logged_partial: if self.logged_partial:
self.logged_lines.append((time.time(), self.logged_partial)) self.logged_lines.append((mitogen.core.now(), self.logged_partial))
self.logged_partial = None self.logged_partial = None
super(LineLoggingProtocolMixin, self).on_disconnect(broker) super(LineLoggingProtocolMixin, self).on_disconnect(broker)
@ -1324,7 +1324,7 @@ class Options(object):
self.profiling = profiling self.profiling = profiling
self.unidirectional = unidirectional self.unidirectional = unidirectional
self.max_message_size = max_message_size self.max_message_size = max_message_size
self.connect_deadline = time.time() + self.connect_timeout self.connect_deadline = mitogen.core.now() + self.connect_timeout
class Connection(object): class Connection(object):
@ -1819,7 +1819,7 @@ class CallChain(object):
socket.gethostname(), socket.gethostname(),
os.getpid(), os.getpid(),
thread.get_ident(), thread.get_ident(),
int(1e6 * time.time()), int(1e6 * mitogen.core.now()),
) )
def __repr__(self): def __repr__(self):
@ -2569,7 +2569,7 @@ class Reaper(object):
def _install_timer(self, delay): def _install_timer(self, delay):
new = self._timer is None new = self._timer is None
self._timer = self.broker.timers.schedule( self._timer = self.broker.timers.schedule(
when=time.time() + delay, when=mitogen.core.now() + delay,
func=self.reap, func=self.reap,
) )
if new: if new:

@ -1109,7 +1109,7 @@ class FileService(Service):
:meth:`fetch`. :meth:`fetch`.
""" """
LOG.debug('get_file(): fetching %r from %r', path, context) LOG.debug('get_file(): fetching %r from %r', path, context)
t0 = time.time() t0 = mitogen.core.now()
recv = mitogen.core.Receiver(router=context.router) recv = mitogen.core.Receiver(router=context.router)
metadata = context.call_service( metadata = context.call_service(
service_name=cls.name(), service_name=cls.name(),
@ -1143,5 +1143,6 @@ class FileService(Service):
path, metadata['size'], received_bytes) path, metadata['size'], received_bytes)
LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms', LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms',
metadata['size'], path, context, 1000 * (time.time() - t0)) metadata['size'], path, context,
1000 * (mitogen.core.now() - t0))
return ok, metadata return ok, metadata

@ -3,13 +3,13 @@ Measure latency of .fork() setup/teardown.
""" """
import mitogen import mitogen
import time import mitogen.core
@mitogen.main() @mitogen.main()
def main(router): def main(router):
t0 = time.time() t0 = mitogen.core.now()
for x in xrange(200): for x in xrange(200):
t = time.time() t = mitogen.core.now()
ctx = router.fork() ctx = router.fork()
ctx.shutdown(wait=True) ctx.shutdown(wait=True)
print '++', 1000 * ((time.time() - t0) / (1.0+x)) print '++', 1000 * ((mitogen.core.now() - t0) / (1.0+x))

@ -4,7 +4,9 @@
import subprocess import subprocess
import time import time
import socket import socket
import mitogen import mitogen
import mitogen.core
@mitogen.main() @mitogen.main()
@ -15,12 +17,12 @@ def main(router):
s = ' ' * n s = ' ' * n
print('bytes in %.2fMiB string...' % (n/1048576.0),) print('bytes in %.2fMiB string...' % (n/1048576.0),)
t0 = time.time() t0 = mitogen.core.now()
for x in range(10): for x in range(10):
tt0 = time.time() tt0 = mitogen.core.now()
assert n == c.call(len, s) assert n == c.call(len, s)
print('took %dms' % (1000 * (time.time() - tt0),)) print('took %dms' % (1000 * (mitogen.core.now() - tt0),))
t1 = time.time() t1 = mitogen.core.now()
print('total %dms / %dms avg / %.2fMiB/sec' % ( print('total %dms / %dms avg / %.2fMiB/sec' % (
1000 * (t1 - t0), 1000 * (t1 - t0),
(1000 * (t1 - t0)) / (x + 1), (1000 * (t1 - t0)) / (x + 1),

@ -6,6 +6,7 @@ import threading
import time import time
import mitogen import mitogen
import mitogen.core
import mitogen.utils import mitogen.utils
import ansible_mitogen.affinity import ansible_mitogen.affinity
@ -33,8 +34,8 @@ t2.start()
ready.get() ready.get()
ready.get() ready.get()
t0 = time.time() t0 = mitogen.core.now()
l1.put(None) l1.put(None)
t1.join() t1.join()
t2.join() t2.join()
print('++', int(1e6 * ((time.time() - t0) / (1.0+X))), 'usec') print('++', int(1e6 * ((mitogen.core.now() - t0) / (1.0+X))), 'usec')

@ -5,6 +5,7 @@ Measure latency of .local() setup.
import time import time
import mitogen import mitogen
import mitogen.core
import mitogen.utils import mitogen.utils
import ansible_mitogen.affinity import ansible_mitogen.affinity
@ -15,10 +16,10 @@ mitogen.utils.setup_gil()
@mitogen.main() @mitogen.main()
def main(router): def main(router):
t0=time.time() t0 = mitogen.core.now()
for x in range(100): for x in range(100):
t = time.time() t = mitogen.core.now()
f = router.local()# debug=True) f = router.local()# debug=True)
tt = time.time() tt = mitogen.core.now()
print(x, 1000 * (tt - t)) print(x, 1000 * (tt - t))
print('%.03f ms' % (1000 * (time.time() - t0) / (1.0 + x))) print('%.03f ms' % (1000 * (mitogen.core.now() - t0) / (1.0 + x)))

@ -4,12 +4,14 @@ import sys
import os import os
import time import time
import mitogen.core
times = [] times = []
for x in range(5): for x in range(5):
t0 = time.time() t0 = mitogen.core.now()
os.spawnvp(os.P_WAIT, sys.argv[1], sys.argv[1:]) os.spawnvp(os.P_WAIT, sys.argv[1], sys.argv[1:])
t = time.time() - t0 t = mitogen.core.now() - t0
times.append(t) times.append(t)
print('+++', t) print('+++', t)

@ -5,6 +5,7 @@ Measure latency of local RPC.
import time import time
import mitogen import mitogen
import mitogen.core
import mitogen.utils import mitogen.utils
import ansible_mitogen.affinity import ansible_mitogen.affinity
@ -23,7 +24,7 @@ def do_nothing():
def main(router): def main(router):
f = router.fork() f = router.fork()
f.call(do_nothing) f.call(do_nothing)
t0 = time.time() t0 = mitogen.core.now()
for x in xrange(20000): for x in xrange(20000):
f.call(do_nothing) f.call(do_nothing)
print('++', int(1e6 * ((time.time() - t0) / (1.0+x))), 'usec') print('++', int(1e6 * ((mitogen.core.now() - t0) / (1.0+x))), 'usec')

@ -4,8 +4,9 @@ Measure latency of local service RPC.
import time import time
import mitogen.service
import mitogen import mitogen
import mitogen.core
import mitogen.service
class MyService(mitogen.service.Service): class MyService(mitogen.service.Service):
@ -17,7 +18,7 @@ class MyService(mitogen.service.Service):
@mitogen.main() @mitogen.main()
def main(router): def main(router):
f = router.fork() f = router.fork()
t0 = time.time() t0 = mitogen.core.now()
for x in range(1000): for x in range(1000):
f.call_service(service_name=MyService, method_name='ping') f.call_service(service_name=MyService, method_name='ping')
print('++', int(1e6 * ((time.time() - t0) / (1.0+x))), 'usec') print('++', int(1e6 * ((mitogen.core.now() - t0) / (1.0+x))), 'usec')

@ -6,6 +6,7 @@ import sys
import time import time
import mitogen import mitogen
import mitogen.core
import mitogen.utils import mitogen.utils
import ansible_mitogen.affinity import ansible_mitogen.affinity
@ -24,12 +25,12 @@ def do_nothing():
def main(router): def main(router):
f = router.ssh(hostname=sys.argv[1]) f = router.ssh(hostname=sys.argv[1])
f.call(do_nothing) f.call(do_nothing)
t0 = time.time() t0 = mitogen.core.now()
end = time.time() + 5.0 end = mitogen.core.now() + 5.0
i = 0 i = 0
while time.time() < end: while mitogen.core.now() < end:
f.call(do_nothing) f.call(do_nothing)
i += 1 i += 1
t1 = time.time() t1 = mitogen.core.now()
print('++', float(1e3 * (t1 - t0) / (1.0+i)), 'ms') print('++', float(1e3 * (t1 - t0) / (1.0+i)), 'ms')

@ -8,6 +8,7 @@ import tempfile
import time import time
import mitogen import mitogen
import mitogen.core
import mitogen.service import mitogen.service
import ansible_mitogen.affinity import ansible_mitogen.affinity
@ -35,9 +36,9 @@ def run_test(router, fp, s, context):
size = fp.tell() size = fp.tell()
print('Testing %s...' % (s,)) print('Testing %s...' % (s,))
context.call(prepare) context.call(prepare)
t0 = time.time() t0 = mitogen.core.now()
context.call(transfer, router.myself(), fp.name) context.call(transfer, router.myself(), fp.name)
t1 = time.time() t1 = mitogen.core.now()
print('%s took %.2f ms to transfer %.2f MiB, %.2f MiB/s' % ( print('%s took %.2f ms to transfer %.2f MiB, %.2f MiB/s' % (
s, 1000 * (t1 - t0), size / 1048576.0, s, 1000 * (t1 - t0), size / 1048576.0,
(size / (t1 - t0) / 1048576.0), (size / (t1 - t0) / 1048576.0),

@ -9,6 +9,7 @@ import tempfile
import mock import mock
import unittest2 import unittest2
import mitogen.core
import mitogen.parent import mitogen.parent
from mitogen.core import b from mitogen.core import b
@ -188,7 +189,6 @@ class TtyCreateChildTest(testlib.TestCase):
proc = self.func([ proc = self.func([
'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,) 'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
]) ])
deadline = time.time() + 5.0
mitogen.core.set_block(proc.stdin.fileno()) mitogen.core.set_block(proc.stdin.fileno())
# read(3) below due to https://bugs.python.org/issue37696 # read(3) below due to https://bugs.python.org/issue37696
self.assertEquals(mitogen.core.b('hi\n'), proc.stdin.read(3)) self.assertEquals(mitogen.core.b('hi\n'), proc.stdin.read(3))
@ -271,7 +271,6 @@ class TtyCreateChildTest(testlib.TestCase):
proc = self.func([ proc = self.func([
'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,) 'bash', '-c', 'exec 2>%s; echo hi > /dev/tty' % (tf.name,)
]) ])
deadline = time.time() + 5.0
self.assertEquals(mitogen.core.b('hi\n'), wait_read(proc.stdout, 3)) self.assertEquals(mitogen.core.b('hi\n'), wait_read(proc.stdout, 3))
waited_pid, status = os.waitpid(proc.pid, 0) waited_pid, status = os.waitpid(proc.pid, 0)
self.assertEquals(proc.pid, waited_pid) self.assertEquals(proc.pid, waited_pid)

@ -12,6 +12,7 @@ import unittest2
import testlib import testlib
from testlib import Popen__terminate from testlib import Popen__terminate
import mitogen.core
import mitogen.parent import mitogen.parent
try: try:
@ -21,8 +22,8 @@ except NameError:
def wait_for_child(pid, timeout=1.0): def wait_for_child(pid, timeout=1.0):
deadline = time.time() + timeout deadline = mitogen.core.now() + timeout
while timeout < time.time(): while timeout < mitogen.core.now():
try: try:
target_pid, status = os.waitpid(pid, os.WNOHANG) target_pid, status = os.waitpid(pid, os.WNOHANG)
if target_pid == pid: if target_pid == pid:

@ -164,14 +164,14 @@ class CloseMixin(PollerMixin):
class PollMixin(PollerMixin): class PollMixin(PollerMixin):
def test_empty_zero_timeout(self): def test_empty_zero_timeout(self):
t0 = time.time() t0 = mitogen.core.now()
self.assertEquals([], list(self.p.poll(0))) self.assertEquals([], list(self.p.poll(0)))
self.assertTrue((time.time() - t0) < .1) # vaguely reasonable self.assertTrue((mitogen.core.now() - t0) < .1) # vaguely reasonable
def test_empty_small_timeout(self): def test_empty_small_timeout(self):
t0 = time.time() t0 = mitogen.core.now()
self.assertEquals([], list(self.p.poll(.2))) self.assertEquals([], list(self.p.poll(.2)))
self.assertTrue((time.time() - t0) >= .2) self.assertTrue((mitogen.core.now() - t0) >= .2)
class ReadableMixin(PollerMixin, SockMixin): class ReadableMixin(PollerMixin, SockMixin):

@ -107,11 +107,11 @@ def wait_for_port(
If a regex pattern is supplied try to find it in the initial data. If a regex pattern is supplied try to find it in the initial data.
Return None on success, or raise on error. Return None on success, or raise on error.
""" """
start = time.time() start = mitogen.core.now()
end = start + overall_timeout end = start + overall_timeout
addr = (host, port) addr = (host, port)
while time.time() < end: while mitogen.core.now() < end:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(connect_timeout) sock.settimeout(connect_timeout)
try: try:
@ -130,7 +130,7 @@ def wait_for_port(
sock.settimeout(receive_timeout) sock.settimeout(receive_timeout)
data = mitogen.core.b('') data = mitogen.core.b('')
found = False found = False
while time.time() < end: while mitogen.core.now() < end:
try: try:
resp = sock.recv(1024) resp = sock.recv(1024)
except socket.timeout: except socket.timeout:

@ -162,7 +162,7 @@ def do_timer_test_econtext(econtext):
def do_timer_test(broker): def do_timer_test(broker):
now = time.time() now = mitogen.core.now()
latch = mitogen.core.Latch() latch = mitogen.core.Latch()
broker.defer(lambda: broker.defer(lambda:
broker.timers.schedule( broker.timers.schedule(
@ -172,7 +172,7 @@ def do_timer_test(broker):
) )
assert 'hi' == latch.get() assert 'hi' == latch.get()
assert time.time() > (now + 0.250) assert mitogen.core.now() > (now + 0.250)
class BrokerTimerTest(testlib.TestCase): class BrokerTimerTest(testlib.TestCase):

@ -86,12 +86,12 @@ class ClientTest(testlib.TestCase):
def _try_connect(self, path): def _try_connect(self, path):
# give server a chance to setup listener # give server a chance to setup listener
timeout = time.time() + 30.0 timeout = mitogen.core.now() + 30.0
while True: while True:
try: try:
return mitogen.unix.connect(path) return mitogen.unix.connect(path)
except mitogen.unix.ConnectError: except mitogen.unix.ConnectError:
if time.time() > timeout: if mitogen.core.now() > timeout:
raise raise
time.sleep(0.1) time.sleep(0.1)

Loading…
Cancel
Save