issue #333: closure & data distinctness tests.

issue260
David Wilson 6 years ago
parent 22b4b186d7
commit d5a8293c91

@ -16,7 +16,7 @@ import testlib
class SockMixin(object): class SockMixin(object):
def tearDown(self): def tearDown(self):
self._teardown_socks() self.close_socks()
super(SockMixin, self).tearDown() super(SockMixin, self).tearDown()
def setUp(self): def setUp(self):
@ -62,7 +62,7 @@ class SockMixin(object):
return return
raise raise
def _teardown_socks(self): def close_socks(self):
for sock in self.l1_sock, self.r1_sock, self.l2_sock, self.r2_sock: for sock in self.l1_sock, self.r1_sock, self.l2_sock, self.r2_sock:
sock.close() sock.close()
@ -298,11 +298,75 @@ class MutateDuringYieldMixin(PollerMixin, SockMixin):
self.assertEquals([], list(p)) self.assertEquals([], list(p))
class FileClosedMixin(PollerMixin, SockMixin):
# Verify behaviour when a registered file object is closed in various
# scenarios, without first calling stop_receive()/stop_transmit().
def test_writeable_then_closed(self):
self.p.start_transmit(self.r1)
self.assertEquals([self.r1], list(self.p.poll(0)))
self.close_socks()
try:
self.assertEquals([], list(self.p.poll(0)))
except select.error:
# a crash is also reasonable here.
pass
def test_writeable_closed_before_yield(self):
self.p.start_transmit(self.r1)
p = self.p.poll(0)
self.close_socks()
try:
self.assertEquals([], list(p))
except select.error:
# a crash is also reasonable here.
pass
def test_readable_then_closed(self):
self.fill(self.l1)
self.p.start_receive(self.r1)
self.assertEquals([self.r1], list(self.p.poll(0)))
self.close_socks()
try:
self.assertEquals([], list(self.p.poll(0)))
except select.error:
# a crash is also reasonable here.
pass
def test_readable_closed_before_yield(self):
self.fill(self.l1)
self.p.start_receive(self.r1)
p = self.p.poll(0)
self.close_socks()
try:
self.assertEquals([], list(p))
except select.error:
# a crash is also reasonable here.
pass
class DistinctDataMixin(PollerMixin, SockMixin):
# Verify different data is yielded for the same FD according to the event
# being raised.
def test_one_distinct(self):
rdata = object()
wdata = object()
self.p.start_receive(self.r1, data=rdata)
self.p.start_transmit(self.r1, data=wdata)
self.assertEquals([wdata], list(self.p.poll(0)))
self.fill(self.l1) # r1 is now readable and writeable.
self.assertEquals(set([rdata, wdata]), set(self.p.poll(0)))
class AllMixin(ReceiveStateMixin, class AllMixin(ReceiveStateMixin,
TransmitStateMixin, TransmitStateMixin,
ReadableMixin, ReadableMixin,
WriteableMixin, WriteableMixin,
MutateDuringYieldMixin, MutateDuringYieldMixin,
FileClosedMixin,
DistinctDataMixin,
PollMixin, PollMixin,
CloseMixin): CloseMixin):
""" """

Loading…
Cancel
Save