issue #249: make write_all() and iter_read() use PREFERRED_POLLER.

pull/255/head
David Wilson 8 years ago
parent b6124f8396
commit 9905f6d8b4

@ -356,6 +356,8 @@ def hybrid_tty_create_child(args):
def write_all(fd, s, deadline=None): def write_all(fd, s, deadline=None):
poller = PREFERRED_POLLER()
poller.start_transmit(fd)
timeout = None timeout = None
written = 0 written = 0
@ -365,33 +367,28 @@ def write_all(fd, s, deadline=None):
if timeout == 0: if timeout == 0:
raise mitogen.core.TimeoutError('write timed out') raise mitogen.core.TimeoutError('write timed out')
_, wfds, _ = select.select([], [fd], [], timeout) for fd in poller.poll(timeout):
if not wfds: n, disconnected = mitogen.core.io_op(os.write, fd, buffer(s, written))
continue if disconnected:
raise mitogen.core.StreamError('EOF on stream during write')
n, disconnected = mitogen.core.io_op(os.write, fd, buffer(s, written))
if disconnected:
raise mitogen.core.StreamError('EOF on stream during write')
written += n written += n
def iter_read(fds, deadline=None): def iter_read(fds, deadline=None):
fds = list(fds) poller = PREFERRED_POLLER()
for fd in fds:
poller.start_receive(fd)
bits = [] bits = []
timeout = None timeout = None
while poller.readers:
while fds:
if deadline is not None: if deadline is not None:
timeout = max(0, deadline - time.time()) timeout = max(0, deadline - time.time())
if timeout == 0: if timeout == 0:
break break
rfds, _, _ = select.select(fds, [], [], timeout) for fd in poller.poll(timeout):
if not rfds:
continue
for fd in rfds:
s, disconnected = mitogen.core.io_op(os.read, fd, 4096) s, disconnected = mitogen.core.io_op(os.read, fd, 4096)
if disconnected or not s: if disconnected or not s:
IOLOG.debug('iter_read(%r) -> disconnected', fd) IOLOG.debug('iter_read(%r) -> disconnected', fd)
@ -530,7 +527,7 @@ class KqueuePoller(Poller):
self._changelist.append(select.kevent(fd, filters, flags)) self._changelist.append(select.kevent(fd, filters, flags))
def start_receive(self, fd, data=None): def start_receive(self, fd, data=None):
mitogen.core._vv and IOLOG.debug('%r.start_receive(%r, %d)', mitogen.core._vv and IOLOG.debug('%r.start_receive(%r, %r)',
self, fd, data) self, fd, data)
if fd not in self._rfds: if fd not in self._rfds:
self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD) self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD)
@ -543,7 +540,8 @@ class KqueuePoller(Poller):
del self._rfds[fd] del self._rfds[fd]
def start_transmit(self, fd, data=None): def start_transmit(self, fd, data=None):
mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r)', self, fd, data) mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r, %r)',
self, fd, data)
if fd not in self._wfds: if fd not in self._wfds:
self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD) self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
self._wfds[fd] = data or fd self._wfds[fd] = data or fd
@ -641,7 +639,10 @@ POLLER_BY_SYSNAME = {
'FreeBSD': KqueuePoller, 'FreeBSD': KqueuePoller,
'Linux': EpollPoller, 'Linux': EpollPoller,
} }
PREFERRED_POLLER = POLLER_BY_SYSNAME.get(os.uname()[0], mitogen.core.Poller) PREFERRED_POLLER = POLLER_BY_SYSNAME.get(
os.uname()[0],
mitogen.core.Poller,
)
class TtyLogStream(mitogen.core.BasicStream): class TtyLogStream(mitogen.core.BasicStream):

Loading…
Cancel
Save