|
|
|
@ -13,6 +13,12 @@ import mitogen.parent
|
|
|
|
|
|
|
|
|
|
import testlib
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
next
|
|
|
|
|
except NameError:
|
|
|
|
|
# Python 2.4
|
|
|
|
|
from mitogen.core import next
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SockMixin(object):
|
|
|
|
|
def tearDown(self):
|
|
|
|
@ -345,6 +351,22 @@ class FileClosedMixin(PollerMixin, SockMixin):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TtyHangupMixin(PollerMixin):
|
|
|
|
|
def test_tty_hangup_detected(self):
|
|
|
|
|
# bug in initial select.poll() implementation failed to detect POLLHUP.
|
|
|
|
|
master_fd, slave_fd = mitogen.parent.openpty()
|
|
|
|
|
try:
|
|
|
|
|
self.p.start_receive(master_fd)
|
|
|
|
|
self.assertEquals([], list(self.p.poll(0)))
|
|
|
|
|
os.close(slave_fd)
|
|
|
|
|
slave_fd = None
|
|
|
|
|
self.assertEquals([master_fd], list(self.p.poll(0)))
|
|
|
|
|
finally:
|
|
|
|
|
if slave_fd is not None:
|
|
|
|
|
os.close(slave_fd)
|
|
|
|
|
os.close(master_fd)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DistinctDataMixin(PollerMixin, SockMixin):
|
|
|
|
|
# Verify different data is yielded for the same FD according to the event
|
|
|
|
|
# being raised.
|
|
|
|
@ -368,29 +390,39 @@ class AllMixin(ReceiveStateMixin,
|
|
|
|
|
FileClosedMixin,
|
|
|
|
|
DistinctDataMixin,
|
|
|
|
|
PollMixin,
|
|
|
|
|
TtyHangupMixin,
|
|
|
|
|
CloseMixin):
|
|
|
|
|
"""
|
|
|
|
|
Helper to avoid cutpasting mixin names below.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@unittest2.skipIf(condition=not hasattr(select, 'select'),
|
|
|
|
|
reason='select.select() not supported')
|
|
|
|
|
class SelectTest(AllMixin, testlib.TestCase):
|
|
|
|
|
klass = mitogen.core.Poller
|
|
|
|
|
|
|
|
|
|
SelectTest = unittest2.skipIf(
|
|
|
|
|
condition=not hasattr(select, 'select'),
|
|
|
|
|
reason='select.select() not supported'
|
|
|
|
|
)(SelectTest)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@unittest2.skipIf(condition=not hasattr(select, 'kqueue'),
|
|
|
|
|
reason='select.kqueue() not supported')
|
|
|
|
|
class KqueueTest(AllMixin, testlib.TestCase):
|
|
|
|
|
klass = mitogen.parent.KqueuePoller
|
|
|
|
|
|
|
|
|
|
KqueueTest = unittest2.skipIf(
|
|
|
|
|
condition=not hasattr(select, 'kqueue'),
|
|
|
|
|
reason='select.kqueue() not supported'
|
|
|
|
|
)(KqueueTest)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@unittest2.skipIf(condition=not hasattr(select, 'epoll'),
|
|
|
|
|
reason='select.epoll() not supported')
|
|
|
|
|
class EpollTest(AllMixin, testlib.TestCase):
|
|
|
|
|
klass = mitogen.parent.EpollPoller
|
|
|
|
|
|
|
|
|
|
EpollTest = unittest2.skipIf(
|
|
|
|
|
condition=not hasattr(select, 'epoll'),
|
|
|
|
|
reason='select.epoll() not supported'
|
|
|
|
|
)(EpollTest)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
unittest2.main()
|
|
|
|
|