From ddf28987a079587366fe2b11ebea7a2971032cce Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 24 May 2018 23:32:07 +0100 Subject: [PATCH] master: split Select() into new module to reduce wire size. service.py currently imports master.py(+parent.py) just to get Select(). --- ansible_mitogen/mixins.py | 6 +- mitogen/core.py | 5 +- mitogen/master.py | 104 ----------------------------- mitogen/parent.py | 4 +- mitogen/select.py | 133 ++++++++++++++++++++++++++++++++++++++ mitogen/service.py | 4 +- tests/select_test.py | 32 ++++----- 7 files changed, 161 insertions(+), 127 deletions(-) create mode 100644 mitogen/select.py diff --git a/ansible_mitogen/mixins.py b/ansible_mitogen/mixins.py index b9cbd3e7..fdd104b2 100644 --- a/ansible_mitogen/mixins.py +++ b/ansible_mitogen/mixins.py @@ -49,7 +49,7 @@ except ImportError: # Ansible<2.4 from ansible.plugins import module_loader import mitogen.core -import mitogen.master +import mitogen.select import mitogen.utils import ansible_mitogen.connection @@ -253,7 +253,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase): """ LOG.debug('_remote_chmod(%r, mode=%r, sudoable=%r)', paths, mode, sudoable) - return self.fake_shell(lambda: mitogen.master.Select.all( + return self.fake_shell(lambda: mitogen.select.Select.all( self._connection.call_async( ansible_mitogen.target.set_file_mode, path, mode ) @@ -268,7 +268,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase): LOG.debug('_remote_chown(%r, user=%r, sudoable=%r)', paths, user, sudoable) ent = self.call(pwd.getpwnam, user) - return self.fake_shell(lambda: mitogen.master.Select.all( + return self.fake_shell(lambda: mitogen.select.Select.all( self._connection.call_async( os.chown, path, ent.pw_uid, ent.pw_gid ) diff --git a/mitogen/core.py b/mitogen/core.py index 80fb589a..6e8856b8 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -33,7 +33,6 @@ import imp import itertools import logging import os -import select import signal import socket import struct @@ -45,6 +44,9 @@ import warnings import weakref import zlib +# Absolute imports for <2.5. +select = __import__('select') + try: import cPickle except ImportError: @@ -531,6 +533,7 @@ class Importer(object): 'lxc', 'master', 'parent', + 'select', 'service', 'setns', 'ssh', diff --git a/mitogen/master.py b/mitogen/master.py index 0805206b..e604fc66 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -181,110 +181,6 @@ class ThreadWatcher(object): return watcher -class SelectError(mitogen.core.Error): - pass - - -class Select(object): - notify = None - - @classmethod - def all(cls, receivers): - return list(msg.unpickle() for msg in cls(receivers)) - - def __init__(self, receivers=(), oneshot=True): - self._receivers = [] - self._oneshot = oneshot - self._latch = mitogen.core.Latch() - for recv in receivers: - self.add(recv) - - def _put(self, value): - self._latch.put(value) - if self.notify: - self.notify(self) - - def __bool__(self): - return bool(self._receivers) - - def __enter__(self): - return self - - def __exit__(self, e_type, e_val, e_tb): - self.close() - - def __iter__(self): - while self._receivers: - yield self.get() - - loop_msg = 'Adding this Select instance would create a Select cycle' - - def _check_no_loop(self, recv): - if recv is self: - raise SelectError(self.loop_msg) - - for recv_ in self._receivers: - if recv_ == recv: - raise SelectError(self.loop_msg) - if isinstance(recv_, Select): - recv_._check_no_loop(recv) - - owned_msg = 'Cannot add: Receiver is already owned by another Select' - - def add(self, recv): - if isinstance(recv, Select): - recv._check_no_loop(self) - - self._receivers.append(recv) - if recv.notify is not None: - raise SelectError(self.owned_msg) - - recv.notify = self._put - # Avoid race by polling once after installation. - if not recv.empty(): - self._put(recv) - - not_present_msg = 'Instance is not a member of this Select' - - def remove(self, recv): - try: - if recv.notify != self._put: - raise ValueError - self._receivers.remove(recv) - recv.notify = None - except (IndexError, ValueError): - raise SelectError(self.not_present_msg) - - def close(self): - for recv in self._receivers[:]: - self.remove(recv) - self._latch.close() - - def empty(self): - return self._latch.empty() - - empty_msg = 'Cannot get(), Select instance is empty' - - def get(self, timeout=None): - if not self._receivers: - raise SelectError(self.empty_msg) - - while True: - recv = self._latch.get(timeout=timeout) - try: - msg = recv.get(block=False) - if self._oneshot: - self.remove(recv) - msg.receiver = recv - return msg - except mitogen.core.TimeoutError: - # A receiver may have been queued with no result if another - # thread drained it before we woke up, or because another - # thread drained it between add() calling recv.empty() and - # self._put(). In this case just sleep again. - continue - - class LogForwarder(object): def __init__(self, router): self._router = router diff --git a/mitogen/parent.py b/mitogen/parent.py index 9d66c736..76a91b24 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -32,7 +32,6 @@ import getpass import inspect import logging import os -import select import signal import socket import subprocess @@ -44,6 +43,9 @@ import time import types import zlib +# Absolute imports for <2.5. +select = __import__('select') + try: from cStringIO import StringIO as BytesIO except ImportError: diff --git a/mitogen/select.py b/mitogen/select.py new file mode 100644 index 00000000..d5c1b907 --- /dev/null +++ b/mitogen/select.py @@ -0,0 +1,133 @@ +# Copyright 2017, David Wilson +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors +# may be used to endorse or promote products derived from this software without +# specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import mitogen.core + + +class Error(mitogen.core.Error): + pass + + +class Select(object): + notify = None + + @classmethod + def all(cls, receivers): + return list(msg.unpickle() for msg in cls(receivers)) + + def __init__(self, receivers=(), oneshot=True): + self._receivers = [] + self._oneshot = oneshot + self._latch = mitogen.core.Latch() + for recv in receivers: + self.add(recv) + + def _put(self, value): + self._latch.put(value) + if self.notify: + self.notify(self) + + def __bool__(self): + return bool(self._receivers) + + def __enter__(self): + return self + + def __exit__(self, e_type, e_val, e_tb): + self.close() + + def __iter__(self): + while self._receivers: + yield self.get() + + loop_msg = 'Adding this Select instance would create a Select cycle' + + def _check_no_loop(self, recv): + if recv is self: + raise Error(self.loop_msg) + + for recv_ in self._receivers: + if recv_ == recv: + raise Error(self.loop_msg) + if isinstance(recv_, Select): + recv_._check_no_loop(recv) + + owned_msg = 'Cannot add: Receiver is already owned by another Select' + + def add(self, recv): + if isinstance(recv, Select): + recv._check_no_loop(self) + + self._receivers.append(recv) + if recv.notify is not None: + raise Error(self.owned_msg) + + recv.notify = self._put + # Avoid race by polling once after installation. + if not recv.empty(): + self._put(recv) + + not_present_msg = 'Instance is not a member of this Select' + + def remove(self, recv): + try: + if recv.notify != self._put: + raise ValueError + self._receivers.remove(recv) + recv.notify = None + except (IndexError, ValueError): + raise Error(self.not_present_msg) + + def close(self): + for recv in self._receivers[:]: + self.remove(recv) + self._latch.close() + + def empty(self): + return self._latch.empty() + + empty_msg = 'Cannot get(), Select instance is empty' + + def get(self, timeout=None): + if not self._receivers: + raise Error(self.empty_msg) + + while True: + recv = self._latch.get(timeout=timeout) + try: + msg = recv.get(block=False) + if self._oneshot: + self.remove(recv) + msg.receiver = recv + return msg + except mitogen.core.TimeoutError: + # A receiver may have been queued with no result if another + # thread drained it before we woke up, or because another + # thread drained it between add() calling recv.empty() and + # self._put(). In this case just sleep again. + continue diff --git a/mitogen/service.py b/mitogen/service.py index 0d4bd304..01a6454c 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -31,7 +31,7 @@ import sys import threading import mitogen.core -import mitogen.master +import mitogen.select from mitogen.core import LOG @@ -314,7 +314,7 @@ class Pool(object): self.router = router self.services = list(services) self.size = size - self._select = mitogen.master.Select( + self._select = mitogen.select.Select( receivers=[ service.recv for service in self.services diff --git a/tests/select_test.py b/tests/select_test.py index b057d322..d9345954 100644 --- a/tests/select_test.py +++ b/tests/select_test.py @@ -1,13 +1,13 @@ import unittest2 -import mitogen.master +import mitogen.select import testlib class AddTest(testlib.RouterMixin, testlib.TestCase): - klass = mitogen.master.Select + klass = mitogen.select.Select def test_receiver(self): recv = mitogen.core.Receiver(self.router) @@ -47,7 +47,7 @@ class AddTest(testlib.RouterMixin, testlib.TestCase): def test_subselect_loop_direct(self): select = self.klass() - exc = self.assertRaises(mitogen.master.SelectError, + exc = self.assertRaises(mitogen.select.Error, lambda: select.add(select)) self.assertEquals(str(exc), self.klass.loop_msg) @@ -58,7 +58,7 @@ class AddTest(testlib.RouterMixin, testlib.TestCase): s0.add(s1) s1.add(s2) - exc = self.assertRaises(mitogen.master.SelectError, + exc = self.assertRaises(mitogen.select.Error, lambda: s2.add(s0)) self.assertEquals(str(exc), self.klass.loop_msg) @@ -66,7 +66,7 @@ class AddTest(testlib.RouterMixin, testlib.TestCase): select = self.klass() recv = mitogen.core.Receiver(self.router) select.add(recv) - exc = self.assertRaises(mitogen.master.SelectError, + exc = self.assertRaises(mitogen.select.Error, lambda: select.add(recv)) self.assertEquals(str(exc), self.klass.owned_msg) @@ -74,18 +74,18 @@ class AddTest(testlib.RouterMixin, testlib.TestCase): select = self.klass() select2 = self.klass() select.add(select2) - exc = self.assertRaises(mitogen.master.SelectError, + exc = self.assertRaises(mitogen.select.Error, lambda: select.add(select2)) self.assertEquals(str(exc), self.klass.owned_msg) class RemoveTest(testlib.RouterMixin, testlib.TestCase): - klass = mitogen.master.Select + klass = mitogen.select.Select def test_empty(self): select = self.klass() recv = mitogen.core.Receiver(self.router) - exc = self.assertRaises(mitogen.master.SelectError, + exc = self.assertRaises(mitogen.select.Error, lambda: select.remove(recv)) self.assertEquals(str(exc), self.klass.not_present_msg) @@ -94,7 +94,7 @@ class RemoveTest(testlib.RouterMixin, testlib.TestCase): recv = mitogen.core.Receiver(self.router) recv2 = mitogen.core.Receiver(self.router) select.add(recv2) - exc = self.assertRaises(mitogen.master.SelectError, + exc = self.assertRaises(mitogen.select.Error, lambda: select.remove(recv)) self.assertEquals(str(exc), self.klass.not_present_msg) @@ -108,7 +108,7 @@ class RemoveTest(testlib.RouterMixin, testlib.TestCase): class CloseTest(testlib.RouterMixin, testlib.TestCase): - klass = mitogen.master.Select + klass = mitogen.select.Select def test_empty(self): select = self.klass() @@ -147,7 +147,7 @@ class CloseTest(testlib.RouterMixin, testlib.TestCase): class EmptyTest(testlib.RouterMixin, testlib.TestCase): - klass = mitogen.master.Select + klass = mitogen.select.Select def test_no_receivers(self): select = self.klass() @@ -172,7 +172,7 @@ class EmptyTest(testlib.RouterMixin, testlib.TestCase): class IterTest(testlib.RouterMixin, testlib.TestCase): - klass = mitogen.master.Select + klass = mitogen.select.Select def test_empty(self): select = self.klass() @@ -187,7 +187,7 @@ class IterTest(testlib.RouterMixin, testlib.TestCase): class OneShotTest(testlib.RouterMixin, testlib.TestCase): - klass = mitogen.master.Select + klass = mitogen.select.Select def test_true_removed_after_get(self): recv = mitogen.core.Receiver(self.router) @@ -212,17 +212,17 @@ class OneShotTest(testlib.RouterMixin, testlib.TestCase): class GetTest(testlib.RouterMixin, testlib.TestCase): - klass = mitogen.master.Select + klass = mitogen.select.Select def test_no_receivers(self): select = self.klass() - exc = self.assertRaises(mitogen.master.SelectError, + exc = self.assertRaises(mitogen.select.Error, lambda: select.get()) self.assertEquals(str(exc), self.klass.empty_msg) def test_timeout_no_receivers(self): select = self.klass() - exc = self.assertRaises(mitogen.master.SelectError, + exc = self.assertRaises(mitogen.select.Error, lambda: select.get(timeout=1.0)) self.assertEquals(str(exc), self.klass.empty_msg)