issue #170: implement timers.

pull/607/head
David Wilson 6 years ago
parent 4fedf88d38
commit 2fbc77a155

@ -2836,6 +2836,11 @@ class Router(object):
self.broker.defer(self._async_route, msg)
class NullTimerList(object):
def get_timeout(self):
return None
class Broker(object):
"""
Responsible for handling I/O multiplexing in a private thread.
@ -2847,6 +2852,10 @@ class Broker(object):
_waker = None
_thread = None
# :func:`mitogen.parent._upgrade_broker` replaces this with
# :class:`mitogen.parent.TimerList` during upgrade.
timers = NullTimerList()
#: Seconds grace to allow :class:`streams <Stream>` to shutdown gracefully
#: before force-disconnecting them during :meth:`shutdown`.
shutdown_timeout = 3.0
@ -2975,10 +2984,19 @@ class Broker(object):
"""
_vv and IOLOG.debug('%r._loop_once(%r, %r)',
self, timeout, self.poller)
timer_to = self.timers.get_timeout()
if timeout is None:
timeout = timer_to
elif timer_to is not None and timer_to < timeout:
timeout = timer_to
#IOLOG.debug('readers =\n%s', pformat(self.poller.readers))
#IOLOG.debug('writers =\n%s', pformat(self.poller.writers))
for side, func in self.poller.poll(timeout):
self._call(side.stream, func)
if timer_to is not None:
self.timers.expire()
def _broker_exit(self):
"""

@ -1061,6 +1061,7 @@ class Broker(mitogen.core.Broker):
on_join=self.shutdown,
)
super(Broker, self).__init__()
self.timers = mitogen.parent.TimerList()
def shutdown(self):
super(Broker, self).shutdown()

@ -38,6 +38,7 @@ import codecs
import errno
import fcntl
import getpass
import heapq
import inspect
import logging
import os
@ -577,6 +578,52 @@ def write_all(fd, s, deadline=None):
poller.close()
class Timer(object):
"""
Represents an unexpired timed callback.
"""
def __init__(self, timer_list, when, func):
self.timer_list = timer_list
self.when = when
self.func = func
self.cancelled = False
def __eq__(self, other):
return self.when == other.when
def __lt__(self, other):
return self.when < other.when
def cancel(self):
self.cancelled = True
class TimerList(object):
"""
Represent a series of future events.
"""
_now = time.time
def __init__(self):
self._lst = []
def get_timeout(self):
if self._lst:
return max(0, self._lst[0].when - self._now())
def schedule(self, when, func):
timer = Timer(self, when, func)
heapq.heappush(self._lst, timer)
return timer
def expire(self):
now = self._now()
while self._lst and self._lst[0].when <= now:
timer = heapq.heappop(self._lst)
if not timer.cancelled:
timer.func()
class PartialZlib(object):
"""
Because the mitogen.core source has a line appended to it during bootstrap,
@ -726,7 +773,7 @@ def _upgrade_broker(broker):
root = logging.getLogger()
old_level = root.level
root.setLevel(logging.CRITICAL)
try:
old = broker.poller
new = PREFERRED_POLLER()
for fd, data in old.readers:
@ -736,7 +783,10 @@ def _upgrade_broker(broker):
old.close()
broker.poller = new
finally:
root.setLevel(old_level)
broker.timer_list = TimerList()
LOG.debug('replaced %r with %r (new: %d readers, %d writers; '
'old: %d readers, %d writers)', old, new,
len(new.readers), len(new.writers),

@ -1,4 +1,5 @@
import time
import threading
import mock

@ -0,0 +1,174 @@
import time
import mock
import unittest2
import mitogen.core
import mitogen.parent
import testlib
class TimerListMixin(object):
klass = mitogen.parent.TimerList
def setUp(self):
self.list = self.klass()
class GetTimeoutTest(TimerListMixin, testlib.TestCase):
def test_empty(self):
self.assertEquals(None, self.list.get_timeout())
def test_one_event(self):
self.list.schedule(2, lambda: None)
self.list._now = lambda: 1
self.assertEquals(1, self.list.get_timeout())
def test_two_events_same_moment(self):
self.list.schedule(2, lambda: None)
self.list.schedule(2, lambda: None)
self.list._now = lambda: 1
self.assertEquals(1, self.list.get_timeout())
def test_two_events(self):
self.list.schedule(2, lambda: None)
self.list.schedule(3, lambda: None)
self.list._now = lambda: 1
self.assertEquals(1, self.list.get_timeout())
def test_two_events_expired(self):
self.list.schedule(2, lambda: None)
self.list.schedule(3, lambda: None)
self.list._now = lambda: 3
self.assertEquals(0, self.list.get_timeout())
def test_two_events_in_past(self):
self.list.schedule(2, lambda: None)
self.list.schedule(3, lambda: None)
self.list._now = lambda: 30
self.assertEquals(0, self.list.get_timeout())
def test_two_events_in_past(self):
self.list.schedule(2, lambda: None)
self.list.schedule(3, lambda: None)
self.list._now = lambda: 30
self.assertEquals(0, self.list.get_timeout())
class ScheduleTest(TimerListMixin, testlib.TestCase):
def test_in_past(self):
self.list._now = lambda: 30
timer = self.list.schedule(29, lambda: None)
self.assertEquals(29, timer.when)
self.assertEquals(0, self.list.get_timeout())
def test_in_future(self):
self.list._now = lambda: 30
timer = self.list.schedule(31, lambda: None)
self.assertEquals(31, timer.when)
self.assertEquals(1, self.list.get_timeout())
def test_same_moment(self):
self.list._now = lambda: 30
timer = self.list.schedule(31, lambda: None)
timer2 = self.list.schedule(31, lambda: None)
self.assertEquals(31, timer.when)
self.assertEquals(31, timer2.when)
self.assertTrue(timer is not timer2)
self.assertEquals(1, self.list.get_timeout())
class ExpireTest(TimerListMixin, testlib.TestCase):
def test_in_past(self):
timer = self.list.schedule(29, mock.Mock())
self.list._now = lambda: 30
self.list.expire()
self.assertEquals(1, len(timer.func.mock_calls))
def test_in_future(self):
timer = self.list.schedule(29, mock.Mock())
self.list._now = lambda: 28
self.list.expire()
self.assertEquals(0, len(timer.func.mock_calls))
def test_same_moment(self):
timer = self.list.schedule(29, mock.Mock())
timer2 = self.list.schedule(29, mock.Mock())
self.list._now = lambda: 29
self.list.expire()
self.assertEquals(1, len(timer.func.mock_calls))
self.assertEquals(1, len(timer2.func.mock_calls))
def test_cancelled(self):
self.list._now = lambda: 29
timer = self.list.schedule(29, mock.Mock())
timer.cancel()
self.assertEquals(0, self.list.get_timeout())
self.list._now = lambda: 29
self.list.expire()
self.assertEquals(0, len(timer.func.mock_calls))
self.assertEquals(None, self.list.get_timeout())
class CancelTest(TimerListMixin, testlib.TestCase):
def test_single_cancel(self):
self.list._now = lambda: 29
timer = self.list.schedule(29, mock.Mock())
timer.cancel()
self.list.expire()
self.assertEquals(0, len(timer.func.mock_calls))
def test_double_cancel(self):
self.list._now = lambda: 29
timer = self.list.schedule(29, mock.Mock())
timer.cancel()
timer.cancel()
self.list.expire()
self.assertEquals(0, len(timer.func.mock_calls))
@mitogen.core.takes_econtext
def do_timer_test_econtext(econtext):
do_timer_test(econtext.broker)
def do_timer_test(broker):
now = time.time()
latch = mitogen.core.Latch()
broker.defer(lambda:
broker.timers.schedule(
now + 0.250,
lambda: latch.put('hi'),
)
)
assert 'hi' == latch.get()
assert time.time() > (now + 0.250)
class BrokerTimerTest(testlib.TestCase):
klass = mitogen.master.Broker
def test_call_later(self):
broker = self.klass()
try:
do_timer_test(broker)
finally:
broker.shutdown()
broker.join()
def test_child_upgrade(self):
router = mitogen.master.Router()
try:
c = router.local()
c.call(mitogen.parent.upgrade_router)
c.call(do_timer_test_econtext)
finally:
router.broker.shutdown()
router.broker.join()
if __name__ == '__main__':
unittest2.main()
Loading…
Cancel
Save