From 2fbc77a1559e1f9c2b6efcac0289e1a90bf39264 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 9 Mar 2019 00:21:25 +0000 Subject: [PATCH] issue #170: implement timers. --- mitogen/core.py | 18 +++++ mitogen/master.py | 1 + mitogen/parent.py | 70 ++++++++++++++--- tests/broker_test.py | 1 + tests/timer_test.py | 174 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 254 insertions(+), 10 deletions(-) create mode 100644 tests/timer_test.py diff --git a/mitogen/core.py b/mitogen/core.py index ea83f961..36b4c74c 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -2836,6 +2836,11 @@ class Router(object): self.broker.defer(self._async_route, msg) +class NullTimerList(object): + def get_timeout(self): + return None + + class Broker(object): """ Responsible for handling I/O multiplexing in a private thread. @@ -2847,6 +2852,10 @@ class Broker(object): _waker = None _thread = None + # :func:`mitogen.parent._upgrade_broker` replaces this with + # :class:`mitogen.parent.TimerList` during upgrade. + timers = NullTimerList() + #: Seconds grace to allow :class:`streams ` to shutdown gracefully #: before force-disconnecting them during :meth:`shutdown`. shutdown_timeout = 3.0 @@ -2975,10 +2984,19 @@ class Broker(object): """ _vv and IOLOG.debug('%r._loop_once(%r, %r)', self, timeout, self.poller) + + timer_to = self.timers.get_timeout() + if timeout is None: + timeout = timer_to + elif timer_to is not None and timer_to < timeout: + timeout = timer_to + #IOLOG.debug('readers =\n%s', pformat(self.poller.readers)) #IOLOG.debug('writers =\n%s', pformat(self.poller.writers)) for side, func in self.poller.poll(timeout): self._call(side.stream, func) + if timer_to is not None: + self.timers.expire() def _broker_exit(self): """ diff --git a/mitogen/master.py b/mitogen/master.py index fb4f505b..2db78ba0 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -1061,6 +1061,7 @@ class Broker(mitogen.core.Broker): on_join=self.shutdown, ) super(Broker, self).__init__() + self.timers = mitogen.parent.TimerList() def shutdown(self): super(Broker, self).shutdown() diff --git a/mitogen/parent.py b/mitogen/parent.py index 113fdc2e..2c8eab8a 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -38,6 +38,7 @@ import codecs import errno import fcntl import getpass +import heapq import inspect import logging import os @@ -577,6 +578,52 @@ def write_all(fd, s, deadline=None): poller.close() +class Timer(object): + """ + Represents an unexpired timed callback. + """ + def __init__(self, timer_list, when, func): + self.timer_list = timer_list + self.when = when + self.func = func + self.cancelled = False + + def __eq__(self, other): + return self.when == other.when + + def __lt__(self, other): + return self.when < other.when + + def cancel(self): + self.cancelled = True + + +class TimerList(object): + """ + Represent a series of future events. + """ + _now = time.time + + def __init__(self): + self._lst = [] + + def get_timeout(self): + if self._lst: + return max(0, self._lst[0].when - self._now()) + + def schedule(self, when, func): + timer = Timer(self, when, func) + heapq.heappush(self._lst, timer) + return timer + + def expire(self): + now = self._now() + while self._lst and self._lst[0].when <= now: + timer = heapq.heappop(self._lst) + if not timer.cancelled: + timer.func() + + class PartialZlib(object): """ Because the mitogen.core source has a line appended to it during bootstrap, @@ -726,17 +773,20 @@ def _upgrade_broker(broker): root = logging.getLogger() old_level = root.level root.setLevel(logging.CRITICAL) + try: + old = broker.poller + new = PREFERRED_POLLER() + for fd, data in old.readers: + new.start_receive(fd, data) + for fd, data in old.writers: + new.start_transmit(fd, data) + + old.close() + broker.poller = new + finally: + root.setLevel(old_level) - old = broker.poller - new = PREFERRED_POLLER() - for fd, data in old.readers: - new.start_receive(fd, data) - for fd, data in old.writers: - new.start_transmit(fd, data) - - old.close() - broker.poller = new - root.setLevel(old_level) + broker.timer_list = TimerList() LOG.debug('replaced %r with %r (new: %d readers, %d writers; ' 'old: %d readers, %d writers)', old, new, len(new.readers), len(new.writers), diff --git a/tests/broker_test.py b/tests/broker_test.py index 23839a54..2212d8aa 100644 --- a/tests/broker_test.py +++ b/tests/broker_test.py @@ -1,4 +1,5 @@ +import time import threading import mock diff --git a/tests/timer_test.py b/tests/timer_test.py new file mode 100644 index 00000000..e4637ed5 --- /dev/null +++ b/tests/timer_test.py @@ -0,0 +1,174 @@ + +import time + +import mock +import unittest2 + +import mitogen.core +import mitogen.parent + +import testlib + + +class TimerListMixin(object): + klass = mitogen.parent.TimerList + + def setUp(self): + self.list = self.klass() + + +class GetTimeoutTest(TimerListMixin, testlib.TestCase): + def test_empty(self): + self.assertEquals(None, self.list.get_timeout()) + + def test_one_event(self): + self.list.schedule(2, lambda: None) + self.list._now = lambda: 1 + self.assertEquals(1, self.list.get_timeout()) + + def test_two_events_same_moment(self): + self.list.schedule(2, lambda: None) + self.list.schedule(2, lambda: None) + self.list._now = lambda: 1 + self.assertEquals(1, self.list.get_timeout()) + + def test_two_events(self): + self.list.schedule(2, lambda: None) + self.list.schedule(3, lambda: None) + self.list._now = lambda: 1 + self.assertEquals(1, self.list.get_timeout()) + + def test_two_events_expired(self): + self.list.schedule(2, lambda: None) + self.list.schedule(3, lambda: None) + self.list._now = lambda: 3 + self.assertEquals(0, self.list.get_timeout()) + + def test_two_events_in_past(self): + self.list.schedule(2, lambda: None) + self.list.schedule(3, lambda: None) + self.list._now = lambda: 30 + self.assertEquals(0, self.list.get_timeout()) + + def test_two_events_in_past(self): + self.list.schedule(2, lambda: None) + self.list.schedule(3, lambda: None) + self.list._now = lambda: 30 + self.assertEquals(0, self.list.get_timeout()) + + +class ScheduleTest(TimerListMixin, testlib.TestCase): + def test_in_past(self): + self.list._now = lambda: 30 + timer = self.list.schedule(29, lambda: None) + self.assertEquals(29, timer.when) + self.assertEquals(0, self.list.get_timeout()) + + def test_in_future(self): + self.list._now = lambda: 30 + timer = self.list.schedule(31, lambda: None) + self.assertEquals(31, timer.when) + self.assertEquals(1, self.list.get_timeout()) + + def test_same_moment(self): + self.list._now = lambda: 30 + timer = self.list.schedule(31, lambda: None) + timer2 = self.list.schedule(31, lambda: None) + self.assertEquals(31, timer.when) + self.assertEquals(31, timer2.when) + self.assertTrue(timer is not timer2) + self.assertEquals(1, self.list.get_timeout()) + + +class ExpireTest(TimerListMixin, testlib.TestCase): + def test_in_past(self): + timer = self.list.schedule(29, mock.Mock()) + self.list._now = lambda: 30 + self.list.expire() + self.assertEquals(1, len(timer.func.mock_calls)) + + def test_in_future(self): + timer = self.list.schedule(29, mock.Mock()) + self.list._now = lambda: 28 + self.list.expire() + self.assertEquals(0, len(timer.func.mock_calls)) + + def test_same_moment(self): + timer = self.list.schedule(29, mock.Mock()) + timer2 = self.list.schedule(29, mock.Mock()) + self.list._now = lambda: 29 + self.list.expire() + self.assertEquals(1, len(timer.func.mock_calls)) + self.assertEquals(1, len(timer2.func.mock_calls)) + + def test_cancelled(self): + self.list._now = lambda: 29 + timer = self.list.schedule(29, mock.Mock()) + timer.cancel() + self.assertEquals(0, self.list.get_timeout()) + self.list._now = lambda: 29 + self.list.expire() + self.assertEquals(0, len(timer.func.mock_calls)) + self.assertEquals(None, self.list.get_timeout()) + + +class CancelTest(TimerListMixin, testlib.TestCase): + def test_single_cancel(self): + self.list._now = lambda: 29 + timer = self.list.schedule(29, mock.Mock()) + timer.cancel() + self.list.expire() + self.assertEquals(0, len(timer.func.mock_calls)) + + def test_double_cancel(self): + self.list._now = lambda: 29 + timer = self.list.schedule(29, mock.Mock()) + timer.cancel() + timer.cancel() + self.list.expire() + self.assertEquals(0, len(timer.func.mock_calls)) + + +@mitogen.core.takes_econtext +def do_timer_test_econtext(econtext): + do_timer_test(econtext.broker) + + +def do_timer_test(broker): + now = time.time() + latch = mitogen.core.Latch() + broker.defer(lambda: + broker.timers.schedule( + now + 0.250, + lambda: latch.put('hi'), + ) + ) + + assert 'hi' == latch.get() + assert time.time() > (now + 0.250) + + +class BrokerTimerTest(testlib.TestCase): + klass = mitogen.master.Broker + + def test_call_later(self): + broker = self.klass() + try: + do_timer_test(broker) + finally: + broker.shutdown() + broker.join() + + def test_child_upgrade(self): + router = mitogen.master.Router() + try: + c = router.local() + c.call(mitogen.parent.upgrade_router) + c.call(do_timer_test_econtext) + finally: + router.broker.shutdown() + router.broker.join() + + +if __name__ == '__main__': + unittest2.main()