Merge remote-tracking branch 'origin/dmw'

* origin/dmw:
  issue #535: update Changelog.
  issue #535: wire mitogen.os_fork into Broker and Pool.
  issue #535: parent: add create_socketpair(size=..) parameter.
  issue #535: introduce mitogen.os_fork module and Corker class.
  issue #535: docs: update Changelog
  issue #535: service: support Pool.defer() like Broker.defer()
  issue #535: core: unicode.encode() may take importer lock on 2.x
  issue #535: docs: fix up Select doc
  issue #535: docs: update Changelog.
  issue #535: core/select: support selecting from Latches.
pull/564/head
David Wilson 6 years ago
commit 509590530b

@ -579,6 +579,10 @@ Select Class
.. module:: mitogen.select
.. currentmodule:: mitogen.select
.. autoclass:: Event
:members:
.. autoclass:: Select
:members:
@ -605,6 +609,14 @@ Broker Class
:members:
Corker Class
============
.. currentmodule:: mitogen.os_fork
.. autoclass:: Corker
:members:
Utility Functions
=================

@ -148,10 +148,26 @@ Fixes
Core Library
~~~~~~~~~~~~
* `#535 <https://github.com/dw/mitogen/issues/535>`_: to support function calls
on a service pool from another thread, :class:`mitogen.select.Select`
additionally permits waiting on :class:`mitogen.core.Latch`.
* `#535 <https://github.com/dw/mitogen/issues/535>`_:
:class:`mitogen.service.Pool.defer` allows any function to be enqueued for
the thread pool from another thread.
* `#535 <https://github.com/dw/mitogen/issues/535>`_: a new
:mod:`mitogen.os_fork` module provides a :func:`os.fork` wrapper that pauses
all thread activity during a fork. :class:`mitogen.core.Broker` and
:class:`mitogen.service.Pool` automatically record their existence so that an
:func:`os.fork` monkey-patch activated for Python 2.4 and 2.5 can
automatically pause them for any attempt to start a subprocess.
* `ca63c26e <https://github.com/dw/mitogen/commit/ca63c26e>`_:
:meth:`mitogen.core.Latch.put`'s `obj` argument was made optional.
Thanks!
~~~~~~~

@ -103,6 +103,9 @@ IOLOG = logging.getLogger('mitogen.io')
IOLOG.setLevel(logging.INFO)
LATIN1_CODEC = encodings.latin_1.Codec()
# str.encode() may take import lock. Deadlock possible if broker calls
# .encode() on behalf of thread currently waiting for module.
UTF8_CODEC = encodings.latin_1.Codec()
_v = False
_vv = False
@ -271,9 +274,8 @@ class Kwargs(dict):
def __init__(self, dct):
for k, v in dct.iteritems():
if type(k) is unicode:
self[k.encode()] = v
else:
self[k] = v
k, _ = UTF8_CODEC.encode(k)
self[k] = v
def __repr__(self):
return 'Kwargs(%s)' % (dict.__repr__(self),)
@ -735,7 +737,7 @@ class Message(object):
"""
Syntax helper to construct a dead message.
"""
kwargs['data'] = (reason or u'').encode()
kwargs['data'], _ = UTF8_CODEC.encode(reason or u'')
return cls(reply_to=IS_DEAD, **kwargs)
@classmethod
@ -1092,6 +1094,7 @@ class Importer(object):
'lxd',
'master',
'minify',
'os_fork',
'parent',
'select',
'service',
@ -1332,7 +1335,7 @@ class Importer(object):
if mod.__package__ and not PY3:
# 2.x requires __package__ to be exactly a string.
mod.__package__ = mod.__package__.encode()
mod.__package__, _ = UTF8_CODEC.encode(mod.__package__)
source = self.get_source(fullname)
try:
@ -2051,6 +2054,8 @@ class Latch(object):
"""
poller_class = Poller
notify = None
# The _cls_ prefixes here are to make it crystal clear in the code which
# state mutation isn't covered by :attr:`_lock`.
@ -2264,6 +2269,8 @@ class Latch(object):
_vv and IOLOG.debug('%r.put() -> waking wfd=%r',
self, wsock.fileno())
self._wake(wsock, cookie)
elif self.notify:
self.notify(self)
finally:
self._lock.release()
@ -3341,6 +3348,17 @@ class ExternalContext(object):
# Reopen with line buffering.
sys.stdout = os.fdopen(1, 'w', 1)
def _py24_25_compat(self):
"""
Python 2.4/2.5 have grave difficulties with threads/fork. We
mandatorily quiesce all running threads during fork using a
monkey-patch there.
"""
if sys.version_info < (2, 6):
# import_module() is used to avoid dep scanner.
os_fork = import_module('mitogen.os_fork')
mitogen.os_fork._notice_broker_or_pool(self.broker)
def main(self):
self._setup_master()
try:
@ -3368,6 +3386,7 @@ class ExternalContext(object):
socket.gethostname())
_v and LOG.debug('Recovered sys.executable: %r', sys.executable)
self._py24_25_compat()
self.dispatcher.run()
_v and LOG.debug('ExternalContext.main() normal exit')
except KeyboardInterrupt:

@ -0,0 +1,153 @@
# Copyright 2019, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# !mitogen: minify_safe
"""
When operating in a mixed threading/forking environment, it is critical no
threads are active at the moment of fork, as they could be within critical
sections whose mutexes are snapshotted in the locked state in the fork child.
To permit unbridled Mitogen use in a forking program, a mechanism must exist to
temporarily halt any threads in operation -- namely the broker and any pool
threads.
"""
import os
import socket
import sys
import weakref
import mitogen.core
# List of weakrefs. On Python 2.4, mitogen.core registers its Broker on this
# list and mitogen.service registers its Pool too.
_brokers = weakref.WeakValueDictionary()
_pools = weakref.WeakValueDictionary()
def _notice_broker_or_pool(obj):
if isinstance(obj, mitogen.core.Broker):
_brokers[id(obj)] = obj
else:
_pools[id(obj)] = obj
def wrap_os__fork():
corker = Corker(
brokers=list(_brokers.values()),
pools=list(_pools.values()),
)
try:
corker.cork()
return os__fork()
finally:
corker.uncork()
# If Python 2.4/2.5 where threading state is not fixed up, subprocess.Popen()
# may still deadlock due to the broker thread. In this case, pause os.fork() so
# that all active threads are paused during fork.
if sys.version_info < (2, 6):
os__fork = os.fork
os.fork = wrap_os__fork
class Corker(object):
"""
Arrange for :class:`mitogen.core.Broker` and optionally
:class:`mitogen.service.Pool` to be temporarily "corked" while fork
operations may occur.
Since this necessarily involves posting a message to every existent thread
and verifying acknowledgement, it will never be a fast operation.
"""
def __init__(self, brokers=(), pools=()):
self.brokers = brokers
self.pools = pools
def _do_cork(self, s, wsock):
try:
try:
while True:
# at least EINTR is possible. Do our best to keep handling
# outside the GIL in this case using sendall().
wsock.sendall(s)
except socket.error:
pass
finally:
wsock.close()
def _cork_one(self, s, obj):
"""
To ensure the target thread has all locks dropped, we ask it to write a
large string to a socket with a small buffer that has O_NONBLOCK
disabled. CPython will drop the GIL and enter the write() system call,
where it will block until the socket buffer is drained, or the write
side is closed. We can detect the thread has blocked outside of Python
code by checking if the socket buffer has started to fill using a
poller.
"""
rsock, wsock = mitogen.parent.create_socketpair(size=4096)
mitogen.core.set_cloexec(rsock.fileno())
mitogen.core.set_cloexec(wsock.fileno())
mitogen.core.set_block(wsock) # gevent
self._rsocks.append(rsock)
obj.defer(self._do_cork, s, wsock)
poller = mitogen.core.Poller()
poller.start_receive(rsock.fileno())
try:
while True:
for fd in poller.poll():
return
finally:
poller.close()
def cork(self):
"""
Arrange for the broker and optional pool to be paused with no locks
held. This will not return until each thread acknowledges it has ceased
execution.
"""
s = 'CORK' * ((128 / 4) * 1024)
self._rsocks = []
for pool in self.pools:
if not pool.closed:
for x in range(pool.size):
self._cork_one(s, pool)
for broker in self.brokers:
if broker._alive:
self._cork_one(s, broker)
def uncork(self):
"""
Arrange for paused threads to resume operation.
"""
for rsock in self._rsocks:
rsock.close()

@ -253,7 +253,7 @@ def close_nonstandard_fds():
pass
def create_socketpair():
def create_socketpair(size=None):
"""
Create a :func:`socket.socketpair` to use for use as a child process's UNIX
stdio channels. As socket pairs are bidirectional, they are economical on
@ -265,10 +265,10 @@ def create_socketpair():
parentfp, childfp = socket.socketpair()
parentfp.setsockopt(socket.SOL_SOCKET,
socket.SO_SNDBUF,
mitogen.core.CHUNK_SIZE)
size or mitogen.core.CHUNK_SIZE)
childfp.setsockopt(socket.SOL_SOCKET,
socket.SO_RCVBUF,
mitogen.core.CHUNK_SIZE)
size or mitogen.core.CHUNK_SIZE)
return parentfp, childfp

@ -35,12 +35,25 @@ class Error(mitogen.core.Error):
pass
class Event(object):
"""
Represents one selected event.
"""
#: The first Receiver or Latch the event traversed.
source = None
#: The :class:`mitogen.core.Message` delivered to a receiver, or the object
#: posted to a latch.
data = None
class Select(object):
"""
Support scatter/gather asynchronous calls and waiting on multiple
receivers, channels, and sub-Selects. Accepts a sequence of
:class:`mitogen.core.Receiver` or :class:`mitogen.select.Select` instances
and returns the first value posted to any receiver or select.
:class:`receivers <mitogen.core.Receiver>`,
:class:`channels <mitogen.core.Channel>`,
:class:`latches <mitogen.core.Latch>`, and
:class:`sub-selects <Select>`.
If `oneshot` is :data:`True`, then remove each receiver as it yields a
result; since :meth:`__iter__` terminates once the final receiver is
@ -84,6 +97,19 @@ class Select(object):
for msg in mitogen.select.Select(selects):
print(msg.unpickle())
:class:`Select` may be used to mix inter-thread and inter-process IO:
latch = mitogen.core.Latch()
start_thread(latch)
recv = remote_host.call_async(os.getuid)
sel = Select([latch, recv])
event = sel.get_event()
if event.source is latch:
# woken by a local thread
else:
# woken by function call result
"""
notify = None
@ -145,14 +171,29 @@ class Select(object):
def __exit__(self, e_type, e_val, e_tb):
self.close()
def __iter__(self):
def iter_data(self):
"""
Yield the result of :meth:`get` until no receivers remain in the
select, either because `oneshot` is :data:`True`, or each receiver was
Yield :attr:`Event.data` until no receivers remain in the select,
either because `oneshot` is :data:`True`, or each receiver was
explicitly removed via :meth:`remove`.
:meth:`__iter__` is an alias for :meth:`iter_data`, allowing loops
like::
for msg in Select([recv1, recv2]):
print msg.unpickle()
"""
while self._receivers:
yield self.get()
yield self.get_event().data
__iter__ = iter_data
def iter_events(self):
"""
Yield :class:`Event` instances until no receivers remain in the select.
"""
while self._receivers:
yield self.get_event()
loop_msg = 'Adding this Select instance would create a Select cycle'
@ -170,8 +211,8 @@ class Select(object):
def add(self, recv):
"""
Add the :class:`mitogen.core.Receiver` or :class:`Select` `recv` to the
select.
Add a :class:`mitogen.core.Receiver`, :class:`Select` or
:class:`mitogen.core.Latch` to the select.
:raises mitogen.select.Error:
An attempt was made to add a :class:`Select` to which this select
@ -193,10 +234,9 @@ class Select(object):
def remove(self, recv):
"""
Remove the :class:`mitogen.core.Receiver` or :class:`Select` `recv`
from the select. Note that if the receiver has notified prior to
:meth:`remove`, it will still be returned by a subsequent :meth:`get`.
This may change in a future version.
Remove an object from from the select. Note that if the receiver has
notified prior to :meth:`remove`, it will still be returned by a
subsequent :meth:`get`. This may change in a future version.
"""
try:
if recv.notify != self._put:
@ -240,7 +280,14 @@ class Select(object):
def get(self, timeout=None, block=True):
"""
Fetch the next available value from any receiver, or raise
Call `get_event(timeout, block)` returning :attr:`Event.data` of the
first available event.
"""
return self.get_event(timeout, block).data
def get_event(self, timeout=None, block=True):
"""
Fetch the next available :class:`Event` from any source, or raise
:class:`mitogen.core.TimeoutError` if no value is available within
`timeout` seconds.
@ -253,7 +300,7 @@ class Select(object):
If :data:`False`, immediately raise
:class:`mitogen.core.TimeoutError` if the select is empty.
:return:
:class:`mitogen.core.Message`
:class:`Event`.
:raises mitogen.core.TimeoutError:
Timeout was reached.
:raises mitogen.core.LatchError:
@ -263,14 +310,21 @@ class Select(object):
if not self._receivers:
raise Error(self.empty_msg)
event = Event()
while True:
recv = self._latch.get(timeout=timeout, block=block)
try:
msg = recv.get(block=False)
if isinstance(recv, Select):
event = recv.get_event(block=False)
else:
event.source = recv
event.data = recv.get(block=False)
if self._oneshot:
self.remove(recv)
msg.receiver = recv
return msg
if isinstance(recv, mitogen.core.Receiver):
# Remove in 0.3.x.
event.data.receiver = recv
return event
except mitogen.core.TimeoutError:
# A receiver may have been queued with no result if another
# thread drained it before we woke up, or because another

@ -411,10 +411,12 @@ class Service(object):
def __repr__(self):
return '%s()' % (self.__class__.__name__,)
def on_message(self, recv, msg):
def on_message(self, event):
"""
Called when a message arrives on any of :attr:`select`'s registered
receivers.
:param mitogen.select.Event event:
"""
pass
@ -452,6 +454,7 @@ class Pool(object):
def __init__(self, router, services, size=1, overwrite=False):
self.router = router
self._activator = self.activator_class()
self._ipc_latch = mitogen.core.Latch()
self._receiver = mitogen.core.Receiver(
router=router,
handle=mitogen.core.CALL_SERVICE,
@ -460,13 +463,18 @@ class Pool(object):
self._select = mitogen.select.Select(oneshot=False)
self._select.add(self._receiver)
self._select.add(self._ipc_latch)
#: Serialize service construction.
self._lock = threading.Lock()
self._func_by_recv = {self._receiver: self._on_service_call}
self._func_by_source = {
self._receiver: self._on_service_call,
self._ipc_latch: self._on_ipc_latch,
}
self._invoker_by_name = {}
for service in services:
self.add(service)
self._py_24_25_compat()
self._threads = []
for x in range(size):
name = 'mitogen.service.Pool.%x.worker-%d' % (id(self), x,)
@ -480,6 +488,13 @@ class Pool(object):
LOG.debug('%r: initialized', self)
def _py_24_25_compat(self):
if sys.version_info < (2, 6):
# import_module() is used to avoid dep scanner sending mitogen.fork
# to all mitogen.service importers.
os_fork = mitogen.core.import_module('mitogen.os_fork')
os_fork._notice_broker_or_pool(self)
@property
def size(self):
return len(self._threads)
@ -488,10 +503,10 @@ class Pool(object):
name = service.name()
if name in self._invoker_by_name:
raise Error('service named %r already registered' % (name,))
assert service.select not in self._func_by_recv
assert service.select not in self._func_by_source
invoker = service.invoker_class(service=service)
self._invoker_by_name[name] = invoker
self._func_by_recv[service.select] = service.on_message
self._func_by_source[service.select] = service.on_message
closed = False
@ -534,7 +549,18 @@ class Pool(object):
isinstance(tup[2], dict)):
raise mitogen.core.CallError('Invalid message format.')
def _on_service_call(self, recv, msg):
def defer(self, func, *args, **kwargs):
"""
Arrange for `func(*args, **kwargs)` to be invoked in the context of a
service pool thread.
"""
self._ipc_latch.put(lambda: func(*args, **kwargs))
def _on_ipc_latch(self, event):
event.data()
def _on_service_call(self, event):
msg = event.data
service_name = None
method_name = None
try:
@ -555,17 +581,17 @@ class Pool(object):
def _worker_run(self):
while not self.closed:
try:
msg = self._select.get()
event = self._select.get_event()
except (mitogen.core.ChannelError, mitogen.core.LatchError):
e = sys.exc_info()[1]
LOG.debug('%r: channel or latch closed, exitting: %s', self, e)
return
func = self._func_by_recv[msg.receiver]
func = self._func_by_source[event.source]
try:
func(msg.receiver, msg)
func(event)
except Exception:
LOG.exception('While handling %r using %r', msg, func)
LOG.exception('While handling %r using %r', event.data, func)
def _worker_main(self):
try:

@ -74,8 +74,9 @@ class GoodModulesTest(testlib.RouterMixin, testlib.TestCase):
context = self.router.local()
self.assertEquals(256, context.call(plain_old_module.pow, 2, 8))
self.assertEquals(1, self.router.responder.get_module_count)
self.assertEquals(1, self.router.responder.good_load_module_count)
os_fork = int(sys.version_info < (2, 6)) # mitogen.os_fork
self.assertEquals(1+os_fork, self.router.responder.get_module_count)
self.assertEquals(1+os_fork, self.router.responder.good_load_module_count)
self.assertLess(300, self.router.responder.good_load_module_size)
def test_simple_pkg(self):
@ -84,8 +85,9 @@ class GoodModulesTest(testlib.RouterMixin, testlib.TestCase):
context = self.router.local()
self.assertEquals(3,
context.call(simple_pkg.a.subtract_one_add_two, 2))
self.assertEquals(2, self.router.responder.get_module_count)
self.assertEquals(3, self.router.responder.good_load_module_count)
os_fork = int(sys.version_info < (2, 6)) # mitogen.os_fork
self.assertEquals(2+os_fork, self.router.responder.get_module_count)
self.assertEquals(3+os_fork, self.router.responder.good_load_module_count)
self.assertEquals(0, self.router.responder.bad_load_module_count)
self.assertLess(450, self.router.responder.good_load_module_size)
@ -185,9 +187,10 @@ class ForwardTest(testlib.RouterMixin, testlib.TestCase):
c1 = self.router.local()
c2 = self.router.local(via=c1)
os_fork = int(sys.version_info < (2, 6))
self.assertEquals(256, c2.call(plain_old_module.pow, 2, 8))
self.assertEquals(2, self.router.responder.get_module_count)
self.assertEquals(2, self.router.responder.good_load_module_count)
self.assertEquals(2+os_fork, self.router.responder.get_module_count)
self.assertEquals(2+os_fork, self.router.responder.good_load_module_count)
self.assertLess(10000, self.router.responder.good_load_module_size)
self.assertGreater(40000, self.router.responder.good_load_module_size)

@ -9,6 +9,18 @@ import testlib
class BoolTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_latch(self):
latch = mitogen.core.Latch() # oneshot
select = self.klass()
self.assertFalse(select)
select.add(latch)
self.assertTrue(select)
latch.put(123)
self.assertTrue(select)
self.assertEquals(123, select.get())
self.assertFalse(select)
def test_receiver(self):
recv = mitogen.core.Receiver(self.router) # oneshot
select = self.klass()
@ -25,6 +37,14 @@ class BoolTest(testlib.RouterMixin, testlib.TestCase):
class AddTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_latch(self):
latch = mitogen.core.Latch()
select = self.klass()
select.add(latch)
self.assertEquals(1, len(select._receivers))
self.assertEquals(latch, select._receivers[0])
self.assertEquals(select._put, latch.notify)
def test_receiver(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass()
@ -98,14 +118,14 @@ class AddTest(testlib.RouterMixin, testlib.TestCase):
class RemoveTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_empty(self):
def test_receiver_empty(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
exc = self.assertRaises(mitogen.select.Error,
lambda: select.remove(recv))
self.assertEquals(str(exc), self.klass.not_present_msg)
def test_absent(self):
def test_receiver_absent(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
recv2 = mitogen.core.Receiver(self.router)
@ -114,7 +134,7 @@ class RemoveTest(testlib.RouterMixin, testlib.TestCase):
lambda: select.remove(recv))
self.assertEquals(str(exc), self.klass.not_present_msg)
def test_present(self):
def test_receiver_present(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
select.add(recv)
@ -122,6 +142,30 @@ class RemoveTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(0, len(select._receivers))
self.assertEquals(None, recv.notify)
def test_latch_empty(self):
select = self.klass()
latch = mitogen.core.Latch()
exc = self.assertRaises(mitogen.select.Error,
lambda: select.remove(latch))
self.assertEquals(str(exc), self.klass.not_present_msg)
def test_latch_absent(self):
select = self.klass()
latch = mitogen.core.Latch()
latch2 = mitogen.core.Latch()
select.add(latch2)
exc = self.assertRaises(mitogen.select.Error,
lambda: select.remove(latch))
self.assertEquals(str(exc), self.klass.not_present_msg)
def test_latch_present(self):
select = self.klass()
latch = mitogen.core.Latch()
select.add(latch)
select.remove(latch)
self.assertEquals(0, len(select._receivers))
self.assertEquals(None, latch.notify)
class CloseTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
@ -130,6 +174,18 @@ class CloseTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass()
select.close() # No effect.
def test_one_latch(self):
select = self.klass()
latch = mitogen.core.Latch()
select.add(latch)
self.assertEquals(1, len(select._receivers))
self.assertEquals(select._put, latch.notify)
select.close()
self.assertEquals(0, len(select._receivers))
self.assertEquals(None, latch.notify)
def test_one_receiver(self):
select = self.klass()
recv = mitogen.core.Receiver(self.router)
@ -174,18 +230,35 @@ class EmptyTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass([recv])
self.assertTrue(select.empty())
def test_nonempty_before_add(self):
def test_nonempty_receiver_before_add(self):
recv = mitogen.core.Receiver(self.router)
recv._on_receive(mitogen.core.Message.pickled('123'))
select = self.klass([recv])
self.assertFalse(select.empty())
def test_nonempty_after_add(self):
def test_nonempty__receiver_after_add(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
recv._on_receive(mitogen.core.Message.pickled('123'))
self.assertFalse(select.empty())
def test_empty_latch(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
self.assertTrue(select.empty())
def test_nonempty_latch_before_add(self):
latch = mitogen.core.Latch()
latch.put(123)
select = self.klass([latch])
self.assertFalse(select.empty())
def test_nonempty__latch_after_add(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
latch.put(123)
self.assertFalse(select.empty())
class IterTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
@ -194,18 +267,24 @@ class IterTest(testlib.RouterMixin, testlib.TestCase):
select = self.klass()
self.assertEquals([], list(select))
def test_nonempty(self):
def test_nonempty_receiver(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
msg = mitogen.core.Message.pickled('123')
recv._on_receive(msg)
self.assertEquals([msg], list(select))
def test_nonempty_latch(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
latch.put(123)
self.assertEquals([123], list(select))
class OneShotTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_true_removed_after_get(self):
def test_true_receiver_removed_after_get(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
msg = mitogen.core.Message.pickled('123')
@ -215,7 +294,7 @@ class OneShotTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(0, len(select._receivers))
self.assertEquals(None, recv.notify)
def test_false_persists_after_get(self):
def test_false_receiver_persists_after_get(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv], oneshot=False)
msg = mitogen.core.Message.pickled('123')
@ -226,8 +305,26 @@ class OneShotTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(recv, select._receivers[0])
self.assertEquals(select._put, recv.notify)
def test_true_latch_removed_after_get(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
latch.put(123)
self.assertEquals(123, select.get())
self.assertEquals(0, len(select._receivers))
self.assertEquals(None, latch.notify)
class GetTest(testlib.RouterMixin, testlib.TestCase):
def test_false_latch_persists_after_get(self):
latch = mitogen.core.Latch()
select = self.klass([latch], oneshot=False)
latch.put(123)
self.assertEquals(123, select.get())
self.assertEquals(1, len(select._receivers))
self.assertEquals(latch, select._receivers[0])
self.assertEquals(select._put, latch.notify)
class GetReceiverTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_no_receivers(self):
@ -285,5 +382,79 @@ class GetTest(testlib.RouterMixin, testlib.TestCase):
lambda: select.get(timeout=0.0))
class GetLatchTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_no_latches(self):
select = self.klass()
exc = self.assertRaises(mitogen.select.Error,
lambda: select.get())
self.assertEquals(str(exc), self.klass.empty_msg)
def test_timeout_no_receivers(self):
select = self.klass()
exc = self.assertRaises(mitogen.select.Error,
lambda: select.get(timeout=1.0))
self.assertEquals(str(exc), self.klass.empty_msg)
def test_zero_timeout(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(timeout=0.0))
def test_timeout(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(timeout=0.1))
def test_nonempty_before_add(self):
latch = mitogen.core.Latch()
latch.put(123)
select = self.klass([latch])
self.assertEquals(123, select.get())
def test_nonempty_after_add(self):
latch = mitogen.core.Latch()
select = self.klass([latch])
latch.put(123)
self.assertEquals(123, latch.get())
def test_drained_by_other_thread(self):
latch = mitogen.core.Latch()
latch.put(123)
select = self.klass([latch])
self.assertEquals(123, latch.get())
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(timeout=0.0))
class GetEventTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.select.Select
def test_empty(self):
select = self.klass()
exc = self.assertRaises(mitogen.select.Error,
lambda: select.get())
self.assertEquals(str(exc), self.klass.empty_msg)
def test_latch(self):
latch = mitogen.core.Latch()
latch.put(123)
select = self.klass([latch])
event = select.get_event()
self.assertEquals(latch, event.source)
self.assertEquals(123, event.data)
def test_receiver(self):
recv = mitogen.core.Receiver(self.router)
recv._on_receive(mitogen.core.Message.pickled('123'))
select = self.klass([recv])
event = select.get_event()
self.assertEquals(recv, event.source)
self.assertEquals('123', event.data.unpickle())
if __name__ == '__main__':
unittest2.main()

Loading…
Cancel
Save