|
|
|
@ -580,7 +580,7 @@ def write_all(fd, s, deadline=None):
|
|
|
|
|
|
|
|
|
|
class Timer(object):
|
|
|
|
|
"""
|
|
|
|
|
Represents an unexpired timed callback.
|
|
|
|
|
Represents a future event.
|
|
|
|
|
"""
|
|
|
|
|
def __init__(self, timer_list, when, func):
|
|
|
|
|
self.timer_list = timer_list
|
|
|
|
@ -595,12 +595,32 @@ class Timer(object):
|
|
|
|
|
return self.when < other.when
|
|
|
|
|
|
|
|
|
|
def cancel(self):
|
|
|
|
|
"""
|
|
|
|
|
Cancel this event. If it has not yet executed, it will not execute
|
|
|
|
|
during any subsequent :meth:`TimerList.expire` call.
|
|
|
|
|
"""
|
|
|
|
|
self.cancelled = True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TimerList(object):
|
|
|
|
|
"""
|
|
|
|
|
Represent a series of future events.
|
|
|
|
|
Efficiently manage a list of cancellable future events relative to wall
|
|
|
|
|
clock time. An instance of this class is installed as
|
|
|
|
|
:attr:`mitogen.master.Broker.timers` by default, and installed as
|
|
|
|
|
:attr:`mitogen.core.Broker.timers` in a child context after a call to
|
|
|
|
|
:func:`mitogen.parent.upgrade_router`.
|
|
|
|
|
|
|
|
|
|
You can use :class:`TimerList` to cause the broker to wake at arbitrary
|
|
|
|
|
future moments, useful for implementing timeouts and polling in an
|
|
|
|
|
asynchronous context.
|
|
|
|
|
|
|
|
|
|
:class:`TimerList` methods can only be called from asynchronous context,
|
|
|
|
|
for example via :meth:`mitogen.core.Broker.defer`.
|
|
|
|
|
|
|
|
|
|
The broker automatically adjusts its sleep delay according to the installed
|
|
|
|
|
timer list, and arranges for timers to expire via automatic calls to
|
|
|
|
|
:meth:`expire`. The main user interface to :class:`TimerList` is
|
|
|
|
|
:meth:`schedule`.
|
|
|
|
|
"""
|
|
|
|
|
_now = time.time
|
|
|
|
|
|
|
|
|
@ -608,15 +628,36 @@ class TimerList(object):
|
|
|
|
|
self._lst = []
|
|
|
|
|
|
|
|
|
|
def get_timeout(self):
|
|
|
|
|
"""
|
|
|
|
|
Return the floating point seconds until the next event is due.
|
|
|
|
|
|
|
|
|
|
:returns:
|
|
|
|
|
Floating point delay, or 0.0, or :data:`None` if no events are
|
|
|
|
|
scheduled.
|
|
|
|
|
"""
|
|
|
|
|
if self._lst:
|
|
|
|
|
return max(0, self._lst[0].when - self._now())
|
|
|
|
|
|
|
|
|
|
def schedule(self, when, func):
|
|
|
|
|
"""
|
|
|
|
|
Schedule a new future event.
|
|
|
|
|
|
|
|
|
|
:param float when:
|
|
|
|
|
UNIX time in seconds when event should occur.
|
|
|
|
|
:param callable func:
|
|
|
|
|
Callable to invoke on expiry.
|
|
|
|
|
:returns:
|
|
|
|
|
A :class:`Timer` instance, exposing :meth:`Timer.cancel`, which may
|
|
|
|
|
be used to cancel the future invocation.
|
|
|
|
|
"""
|
|
|
|
|
timer = Timer(self, when, func)
|
|
|
|
|
heapq.heappush(self._lst, timer)
|
|
|
|
|
return timer
|
|
|
|
|
|
|
|
|
|
def expire(self):
|
|
|
|
|
"""
|
|
|
|
|
Invoke callbacks for any events in the past.
|
|
|
|
|
"""
|
|
|
|
|
now = self._now()
|
|
|
|
|
while self._lst and self._lst[0].when <= now:
|
|
|
|
|
timer = heapq.heappop(self._lst)
|
|
|
|
@ -786,7 +827,7 @@ def _upgrade_broker(broker):
|
|
|
|
|
finally:
|
|
|
|
|
root.setLevel(old_level)
|
|
|
|
|
|
|
|
|
|
broker.timer_list = TimerList()
|
|
|
|
|
broker.timers = TimerList()
|
|
|
|
|
LOG.debug('replaced %r with %r (new: %d readers, %d writers; '
|
|
|
|
|
'old: %d readers, %d writers)', old, new,
|
|
|
|
|
len(new.readers), len(new.writers),
|
|
|
|
|