master: split Select() into new module to reduce wire size.

service.py currently imports master.py(+parent.py) just to get Select().
pull/255/head
David Wilson 6 years ago
parent 7a592d1c34
commit ddf28987a0

@ -49,7 +49,7 @@ except ImportError: # Ansible<2.4
from ansible.plugins import module_loader
import mitogen.core
import mitogen.master
import mitogen.select
import mitogen.utils
import ansible_mitogen.connection
@ -253,7 +253,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
"""
LOG.debug('_remote_chmod(%r, mode=%r, sudoable=%r)',
paths, mode, sudoable)
return self.fake_shell(lambda: mitogen.master.Select.all(
return self.fake_shell(lambda: mitogen.select.Select.all(
self._connection.call_async(
ansible_mitogen.target.set_file_mode, path, mode
)
@ -268,7 +268,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
LOG.debug('_remote_chown(%r, user=%r, sudoable=%r)',
paths, user, sudoable)
ent = self.call(pwd.getpwnam, user)
return self.fake_shell(lambda: mitogen.master.Select.all(
return self.fake_shell(lambda: mitogen.select.Select.all(
self._connection.call_async(
os.chown, path, ent.pw_uid, ent.pw_gid
)

@ -33,7 +33,6 @@ import imp
import itertools
import logging
import os
import select
import signal
import socket
import struct
@ -45,6 +44,9 @@ import warnings
import weakref
import zlib
# Absolute imports for <2.5.
select = __import__('select')
try:
import cPickle
except ImportError:
@ -531,6 +533,7 @@ class Importer(object):
'lxc',
'master',
'parent',
'select',
'service',
'setns',
'ssh',

@ -181,110 +181,6 @@ class ThreadWatcher(object):
return watcher
class SelectError(mitogen.core.Error):
pass
class Select(object):
notify = None
@classmethod
def all(cls, receivers):
return list(msg.unpickle() for msg in cls(receivers))
def __init__(self, receivers=(), oneshot=True):
self._receivers = []
self._oneshot = oneshot
self._latch = mitogen.core.Latch()
for recv in receivers:
self.add(recv)
def _put(self, value):
self._latch.put(value)
if self.notify:
self.notify(self)
def __bool__(self):
return bool(self._receivers)
def __enter__(self):
return self
def __exit__(self, e_type, e_val, e_tb):
self.close()
def __iter__(self):
while self._receivers:
yield self.get()
loop_msg = 'Adding this Select instance would create a Select cycle'
def _check_no_loop(self, recv):
if recv is self:
raise SelectError(self.loop_msg)
for recv_ in self._receivers:
if recv_ == recv:
raise SelectError(self.loop_msg)
if isinstance(recv_, Select):
recv_._check_no_loop(recv)
owned_msg = 'Cannot add: Receiver is already owned by another Select'
def add(self, recv):
if isinstance(recv, Select):
recv._check_no_loop(self)
self._receivers.append(recv)
if recv.notify is not None:
raise SelectError(self.owned_msg)
recv.notify = self._put
# Avoid race by polling once after installation.
if not recv.empty():
self._put(recv)
not_present_msg = 'Instance is not a member of this Select'
def remove(self, recv):
try:
if recv.notify != self._put:
raise ValueError
self._receivers.remove(recv)
recv.notify = None
except (IndexError, ValueError):
raise SelectError(self.not_present_msg)
def close(self):
for recv in self._receivers[:]:
self.remove(recv)
self._latch.close()
def empty(self):
return self._latch.empty()
empty_msg = 'Cannot get(), Select instance is empty'
def get(self, timeout=None):
if not self._receivers:
raise SelectError(self.empty_msg)
while True:
recv = self._latch.get(timeout=timeout)
try:
msg = recv.get(block=False)
if self._oneshot:
self.remove(recv)
msg.receiver = recv
return msg
except mitogen.core.TimeoutError:
# A receiver may have been queued with no result if another
# thread drained it before we woke up, or because another
# thread drained it between add() calling recv.empty() and
# self._put(). In this case just sleep again.
continue
class LogForwarder(object):
def __init__(self, router):
self._router = router

@ -32,7 +32,6 @@ import getpass
import inspect
import logging
import os
import select
import signal
import socket
import subprocess
@ -44,6 +43,9 @@ import time
import types
import zlib
# Absolute imports for <2.5.
select = __import__('select')
try:
from cStringIO import StringIO as BytesIO
except ImportError:

@ -0,0 +1,133 @@
# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import mitogen.core
class Error(mitogen.core.Error):
pass
class Select(object):
notify = None
@classmethod
def all(cls, receivers):
return list(msg.unpickle() for msg in cls(receivers))
def __init__(self, receivers=(), oneshot=True):
self._receivers = []
self._oneshot = oneshot
self._latch = mitogen.core.Latch()
for recv in receivers:
self.add(recv)
def _put(self, value):
self._latch.put(value)
if self.notify:
self.notify(self)
def __bool__(self):
return bool(self._receivers)
def __enter__(self):
return self
def __exit__(self, e_type, e_val, e_tb):
self.close()
def __iter__(self):
while self._receivers:
yield self.get()
loop_msg = 'Adding this Select instance would create a Select cycle'
def _check_no_loop(self, recv):
if recv is self:
raise Error(self.loop_msg)
for recv_ in self._receivers:
if recv_ == recv:
raise Error(self.loop_msg)
if isinstance(recv_, Select):
recv_._check_no_loop(recv)
owned_msg = 'Cannot add: Receiver is already owned by another Select'
def add(self, recv):
if isinstance(recv, Select):
recv._check_no_loop(self)
self._receivers.append(recv)
if recv.notify is not None:
raise Error(self.owned_msg)
recv.notify = self._put
# Avoid race by polling once after installation.
if not recv.empty():
self._put(recv)
not_present_msg = 'Instance is not a member of this Select'
def remove(self, recv):
try:
if recv.notify != self._put:
raise ValueError
self._receivers.remove(recv)
recv.notify = None
except (IndexError, ValueError):
raise Error(self.not_present_msg)
def close(self):
for recv in self._receivers[:]:
self.remove(recv)
self._latch.close()
def empty(self):
return self._latch.empty()
empty_msg = 'Cannot get(), Select instance is empty'
def get(self, timeout=None):
if not self._receivers:
raise Error(self.empty_msg)
while True:
recv = self._latch.get(timeout=timeout)
try:
msg = recv.get(block=False)
if self._oneshot:
self.remove(recv)
msg.receiver = recv
return msg
except mitogen.core.TimeoutError:
# A receiver may have been queued with no result if another
# thread drained it before we woke up, or because another
# thread drained it between add() calling recv.empty() and
# self._put(). In this case just sleep again.
continue

@ -31,7 +31,7 @@ import sys
import threading
import mitogen.core
import mitogen.master
import mitogen.select
from mitogen.core import LOG
@ -314,7 +314,7 @@ class Pool(object):
self.router = router
self.services = list(services)
self.size = size
self._select = mitogen.master.Select(
self._select = mitogen.select.Select(
receivers=[
service.recv
for service in self.services

@ -1,13 +1,13 @@
import unittest2
import mitogen.master
import mitogen.select
import testlib
class AddTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select
klass = mitogen.select.Select
def test_receiver(self):
recv = mitogen.core.Receiver(self.router)
@ -47,7 +47,7 @@ class AddTest(testlib.RouterMixin, testlib.TestCase):
def test_subselect_loop_direct(self):
select = self.klass()
exc = self.assertRaises(mitogen.master.SelectError,
exc = self.assertRaises(mitogen.select.Error,
lambda: select.add(select))
self.assertEquals(str(exc), self.klass.loop_msg)
@ -58,7 +58,7 @@ class AddTest(testlib.RouterMixin, testlib.TestCase):
s0.add(s1)
s1.add(s2)
exc = self.assertRaises(mitogen.master.SelectError,
exc = self.assertRaises(mitogen.select.Error,
lambda: s2.add(s0))
self.assertEquals(str(exc), self.klass.loop_msg)
@ -66,7 +66,7 @@ class AddTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
select.add(recv)
exc = self.assertRaises(mitogen.master.SelectError,
exc = self.assertRaises(mitogen.select.Error,
lambda: select.add(recv))
self.assertEquals(str(exc), self.klass.owned_msg)
@ -74,18 +74,18 @@ class AddTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass()
select2 = self.klass()
select.add(select2)
exc = self.assertRaises(mitogen.master.SelectError,
exc = self.assertRaises(mitogen.select.Error,
lambda: select.add(select2))
self.assertEquals(str(exc), self.klass.owned_msg)
class RemoveTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select
klass = mitogen.select.Select
def test_empty(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
exc = self.assertRaises(mitogen.master.SelectError,
exc = self.assertRaises(mitogen.select.Error,
lambda: select.remove(recv))
self.assertEquals(str(exc), self.klass.not_present_msg)
@ -94,7 +94,7 @@ class RemoveTest(testlib.RouterMixin, testlib.TestCase):
recv = mitogen.core.Receiver(self.router)
recv2 = mitogen.core.Receiver(self.router)
select.add(recv2)
exc = self.assertRaises(mitogen.master.SelectError,
exc = self.assertRaises(mitogen.select.Error,
lambda: select.remove(recv))
self.assertEquals(str(exc), self.klass.not_present_msg)
@ -108,7 +108,7 @@ class RemoveTest(testlib.RouterMixin, testlib.TestCase):
class CloseTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select
klass = mitogen.select.Select
def test_empty(self):
select = self.klass()
@ -147,7 +147,7 @@ class CloseTest(testlib.RouterMixin, testlib.TestCase):
class EmptyTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select
klass = mitogen.select.Select
def test_no_receivers(self):
select = self.klass()
@ -172,7 +172,7 @@ class EmptyTest(testlib.RouterMixin, testlib.TestCase):
class IterTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select
klass = mitogen.select.Select
def test_empty(self):
select = self.klass()
@ -187,7 +187,7 @@ class IterTest(testlib.RouterMixin, testlib.TestCase):
class OneShotTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select
klass = mitogen.select.Select
def test_true_removed_after_get(self):
recv = mitogen.core.Receiver(self.router)
@ -212,17 +212,17 @@ class OneShotTest(testlib.RouterMixin, testlib.TestCase):
class GetTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.master.Select
klass = mitogen.select.Select
def test_no_receivers(self):
select = self.klass()
exc = self.assertRaises(mitogen.master.SelectError,
exc = self.assertRaises(mitogen.select.Error,
lambda: select.get())
self.assertEquals(str(exc), self.klass.empty_msg)
def test_timeout_no_receivers(self):
select = self.klass()
exc = self.assertRaises(mitogen.master.SelectError,
exc = self.assertRaises(mitogen.select.Error,
lambda: select.get(timeout=1.0))
self.assertEquals(str(exc), self.klass.empty_msg)

Loading…
Cancel
Save