core: document/tidy up poller.

Remove duplicate attribute creates in subclasses too.
issue510
David Wilson 6 years ago
parent 7dd46bf44e
commit 3f5774cfd5

@ -15,6 +15,20 @@ Constants
.. autodata:: CHUNK_SIZE .. autodata:: CHUNK_SIZE
Poller Classes
==============
.. currentmodule:: mitogen.core
.. autoclass:: Poller
:members:
.. currentmodule:: mitogen.parent
.. autoclass:: EpollPoller
.. currentmodule:: mitogen.parent
.. autoclass:: KqueuePoller
Latch Class Latch Class
=========== ===========

@ -1670,6 +1670,33 @@ def _unpickle_context(context_id, name, router=None):
class Poller(object): class Poller(object):
"""
A poller manages OS file descriptors the user is waiting to become
available for IO. The :meth:`poll` method blocks the calling thread
until one or more become ready.
Each descriptor has an associated `data` element, which is unique for each
readiness state, and defaults to being the same as the file descriptor. The
:meth:`poll` method yields the data associated with a a descriptor, rather
than the descriptor itself, allowing for concise loops of the form::
p = Poller()
p.start_receive(first_fd, data=handle_first_fd_read)
p.start_transmit(first_fd, data=handle_first_fd_write)
for callback in p.poll():
callback()
Pollers may be modified while :meth:`poll` is yielding results. Removals
are processed immediately, causing pendings event for the descriptor to be
discarded.
The :meth:`close` method must be called when a poller is disacrded to avoid
a resource leak.
Pollers may only be used by one thread at a time.
"""
#: Increments on every poll(). Used to version _rfds and _wfds. #: Increments on every poll(). Used to version _rfds and _wfds.
_generation = 1 _generation = 1
@ -1677,30 +1704,52 @@ class Poller(object):
self._rfds = {} self._rfds = {}
self._wfds = {} self._wfds = {}
def __repr__(self):
return '%s(%#x)' % (type(self).__name__, id(self))
@property @property
def readers(self): def readers(self):
"""
Return a list of tuples of the form `(fd, data)` for every FD
registered for receive readiness.
"""
return list((fd, data) for fd, (data, gen) in self._rfds.items()) return list((fd, data) for fd, (data, gen) in self._rfds.items())
@property @property
def writers(self): def writers(self):
"""
Return a list of tuples of the form `(fd, data)` for every FD
registered for transmit readiness.
"""
return list((fd, data) for fd, (data, gen) in self._wfds.items()) return list((fd, data) for fd, (data, gen) in self._wfds.items())
def __repr__(self):
return '%s(%#x)' % (type(self).__name__, id(self))
def close(self): def close(self):
pass """
Close any underlying OS resource used by the poller.
"""
def start_receive(self, fd, data=None): def start_receive(self, fd, data=None):
"""
Cause :meth:`poll` to emit `data` when `fd` is readable.
"""
self._rfds[fd] = (data or fd, self._generation) self._rfds[fd] = (data or fd, self._generation)
def stop_receive(self, fd): def stop_receive(self, fd):
"""
Stop emitting readability events for `fd`.
"""
self._rfds.pop(fd, None) self._rfds.pop(fd, None)
def start_transmit(self, fd, data=None): def start_transmit(self, fd, data=None):
"""
Cause :meth:`poll` to emit `data` when `fd` is writeable.
"""
self._wfds[fd] = (data or fd, self._generation) self._wfds[fd] = (data or fd, self._generation)
def stop_transmit(self, fd): def stop_transmit(self, fd):
"""
Stop emitting writability events for `fd`.
"""
self._wfds.pop(fd, None) self._wfds.pop(fd, None)
def _poll(self, timeout): def _poll(self, timeout):
@ -1723,6 +1772,15 @@ class Poller(object):
yield data yield data
def poll(self, timeout=None): def poll(self, timeout=None):
"""
Block the calling thread until one or more FDs are ready for IO.
:param float timeout:
If not :data:`None`, seconds to wait without an event before
returning an empty iterable.
:returns:
Iterable of `data` elements associated with ready FDs.
"""
_vv and IOLOG.debug('%r.poll(%r)', self, timeout) _vv and IOLOG.debug('%r.poll(%r)', self, timeout)
self._generation += 1 self._generation += 1
return self._poll(timeout) return self._poll(timeout)

@ -715,15 +715,18 @@ class CallSpec(object):
class KqueuePoller(mitogen.core.Poller): class KqueuePoller(mitogen.core.Poller):
"""
Poller based on the FreeBSD/Darwin kqueue(2) interface.
"""
_repr = 'KqueuePoller()' _repr = 'KqueuePoller()'
def __init__(self): def __init__(self):
super(KqueuePoller, self).__init__()
self._kqueue = select.kqueue() self._kqueue = select.kqueue()
self._rfds = {}
self._wfds = {}
self._changelist = [] self._changelist = []
def close(self): def close(self):
super(KqueuePoller, self).close()
self._kqueue.close() self._kqueue.close()
def _control(self, fd, filters, flags): def _control(self, fd, filters, flags):
@ -789,15 +792,18 @@ class KqueuePoller(mitogen.core.Poller):
class EpollPoller(mitogen.core.Poller): class EpollPoller(mitogen.core.Poller):
"""
Poller based on the Linux epoll(2) interface.
"""
_repr = 'EpollPoller()' _repr = 'EpollPoller()'
def __init__(self): def __init__(self):
super(EpollPoller, self).__init__()
self._epoll = select.epoll(32) self._epoll = select.epoll(32)
self._registered_fds = set() self._registered_fds = set()
self._rfds = {}
self._wfds = {}
def close(self): def close(self):
super(EpollPoller, self).close()
self._epoll.close() self._epoll.close()
def _control(self, fd): def _control(self, fd):

Loading…
Cancel
Save