|
|
|
import sys
|
|
|
|
import threading
|
|
|
|
import unittest
|
|
|
|
|
|
|
|
import mitogen.core
|
|
|
|
import testlib
|
|
|
|
|
|
|
|
|
|
|
|
def yield_stuff_then_die(sender):
    """
    Send the integers 0 through 4 on `sender`, close it, and return 10.

    Used by IterationTest.test_dead_stops_iteration, which passes it to
    call_async() together with a sender obtained from a Receiver.
    """
    value = 0
    while value < 5:
        sender.send(value)
        value += 1
    # Closing after the final send is what lets the receiving loop finish.
    sender.close()
    return 10
|
|
|
|
|
|
|
|
|
|
|
|
class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
    # A newly constructed Receiver should expose an integer handle and be
    # able to receive a message routed to that handle.

    def test_handle(self):
        receiver = mitogen.core.Receiver(self.router)
        handle = receiver.handle
        self.assertIsInstance(handle, int)
        self.assertGreater(handle, 100)

        # Build a pickled message addressed to dst_id=0 at the receiver's
        # handle, route it, then read it back through the receiver.
        greeting = mitogen.core.Message.pickled(
            'hi',
            dst_id=0,
            handle=handle,
        )
        self.router.route(greeting)
        self.assertEqual('hi', receiver.get().unpickle())
|
|
|
|
|
|
|
|
|
|
|
|
class IterationTest(testlib.RouterMixin, testlib.TestCase):
    # Iterating over a Receiver should end when the sending side closes, and
    # when the receiver itself is closed from another thread.

    def test_dead_stops_iteration(self):
        # yield_stuff_then_die() sends 0..4 and then closes the sender, so
        # iteration must produce exactly those five values and terminate.
        receiver = mitogen.core.Receiver(self.router)
        context = self.router.local()
        pending = context.call_async(
            yield_stuff_then_die,
            receiver.to_sender(),
        )

        received = [msg.unpickle() for msg in receiver]
        self.assertEqual(list(range(5)), received)
        self.assertEqual(10, pending.get().unpickle())

    def iter_and_put(self, recv, latch):
        # Thread body: copy every message iterated from `recv` onto `latch`.
        # If anything raises, the exception object is put on `latch` instead.
        try:
            for message in recv:
                latch.put(message)
        except Exception:
            caught = sys.exc_info()[1]
            latch.put(caught)

    def test_close_stops_iteration(self):
        receiver = mitogen.core.Receiver(self.router)
        results = mitogen.core.Latch()
        worker = threading.Thread(
            target=self.iter_and_put,
            args=(receiver, results),
        )
        worker.start()
        # Wait at most 0.1s for the worker; presumably this lets it begin
        # iterating before the receiver is closed underneath it.
        worker.join(0.1)
        receiver.close()
        worker.join()
        # Neither a message nor an exception should have been recorded.
        self.assertTrue(results.empty())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CloseTest(testlib.RouterMixin, testlib.TestCase):
    # Closing a Receiver should cause every thread calling get() on it to
    # fail with ChannelError carrying Receiver.closed_msg.

    def wait(self, latch, wait_recv):
        # Thread body: put the result of wait_recv.get() on `latch`, or the
        # exception raised while doing so.
        try:
            result = wait_recv.get()
            latch.put(result)
        except Exception:
            caught = sys.exc_info()[1]
            latch.put(caught)

    def test_closes_one(self):
        outcomes = mitogen.core.Latch()
        receiver = mitogen.core.Receiver(self.router)

        def reraise():
            # Re-raise whatever the waiter thread recorded.
            raise outcomes.get()

        waiter = threading.Thread(
            target=self.wait,
            args=(outcomes, receiver),
        )
        waiter.start()
        receiver.close()
        waiter.join()

        # NOTE(review): uses the return value of assertRaises(); the stdlib
        # version returns None when given a callable, so this presumably
        # relies on an override in testlib.TestCase -- confirm.
        e = self.assertRaises(mitogen.core.ChannelError, reraise)
        self.assertEqual(e.args[0], mitogen.core.Receiver.closed_msg)

    def test_closes_all(self):
        outcomes = mitogen.core.Latch()
        receiver = mitogen.core.Receiver(self.router)

        def reraise():
            # Re-raise whatever one of the waiter threads recorded.
            raise outcomes.get()

        waiters = []
        for _ in range(5):
            waiters.append(
                threading.Thread(
                    target=self.wait,
                    args=(outcomes, receiver),
                )
            )
        for waiter in waiters:
            waiter.start()

        receiver.close()

        # One recorded ChannelError is expected per waiter thread.
        for _ in range(5):
            e = self.assertRaises(mitogen.core.ChannelError, reraise)
            self.assertEqual(e.args[0], mitogen.core.Receiver.closed_msg)

        for waiter in waiters:
            waiter.join()
|
|
|
|
|
|
|
|
|
|
|
|
class OnReceiveTest(testlib.RouterMixin, testlib.TestCase):
    # Verify behaviour of _on_receive dead message handling. A dead message
    # should unregister the receiver and wake all threads.

    def wait(self, latch, wait_recv):
        # Thread body: put the result of wait_recv.get() on `latch`, or the
        # exception raised while doing so.
        try:
            latch.put(wait_recv.get())
        except Exception:
            latch.put(sys.exc_info()[1])

    def test_sender_closes_one_thread(self):
        # A single thread calling get() should fail with ChannelError
        # carrying the sender's explicit_close_msg once the sender is closed.
        latch = mitogen.core.Latch()
        wait_recv = mitogen.core.Receiver(self.router)
        t = threading.Thread(target=self.wait, args=(latch, wait_recv))
        t.start()

        sender = wait_recv.to_sender()
        sender.close()

        def throw():
            # Re-raise whatever the waiter thread recorded.
            raise latch.get()

        t.join()
        # NOTE(review): uses the return value of assertRaises(); the stdlib
        # version returns None when given a callable, so this presumably
        # relies on an override in testlib.TestCase -- confirm.
        e = self.assertRaises(mitogen.core.ChannelError, throw)
        self.assertEqual(e.args[0], sender.explicit_close_msg)

    @unittest.skip(reason=(
        'Unclear if a single dead message received from remote should '
        'cause all threads to wake up.'
    ))
    def test_sender_closes_all_threads(self):
        # Same scenario as the single-thread test, but with five threads
        # calling get() on the same receiver.
        latch = mitogen.core.Latch()
        wait_recv = mitogen.core.Receiver(self.router)
        ts = [
            threading.Thread(target=self.wait, args=(latch, wait_recv))
            for _ in range(5)
        ]
        for t in ts:
            t.start()

        sender = wait_recv.to_sender()
        sender.close()

        def throw():
            # Re-raise whatever one of the waiter threads recorded.
            raise latch.get()

        # NOTE(review): the single-thread test expects
        # sender.explicit_close_msg, whereas this one expects
        # Receiver.closed_msg -- confirm which is intended before removing
        # the skip.
        for _ in range(5):
            e = self.assertRaises(mitogen.core.ChannelError, throw)
            self.assertEqual(e.args[0], mitogen.core.Receiver.closed_msg)

        for t in ts:
            t.join()

    # TODO: what happens to a Select subscribed to the receiver in this case?
|
|
|
|
|
|
|
|
|
|
|
|
class ToSenderTest(testlib.RouterMixin, testlib.TestCase):
    # Class under test; held as an attribute so the test body does not name
    # the concrete class directly.
    klass = mitogen.core.Receiver

    def test_returned_context(self):
        # The sender produced by to_sender() should carry the same context
        # that router.myself() returns.
        expected = self.router.myself()
        receiver = self.klass(self.router)
        sender = receiver.to_sender()
        self.assertEqual(expected, sender.context)
|