issue #260: core: don't defer stream writes.

issue260
David Wilson 6 years ago
parent d1b7c232bf
commit b656969b15

@ -370,6 +370,8 @@ def set_block(fd):
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
_IO_ERRORS = (select.error, OSError, IOError)
def io_op(func, *args): def io_op(func, *args):
"""Wrap `func(*args)` that may raise :class:`select.error`, """Wrap `func(*args)` that may raise :class:`select.error`,
:class:`IOError`, or :class:`OSError`, trapping UNIX error codes relating :class:`IOError`, or :class:`OSError`, trapping UNIX error codes relating
@ -393,7 +395,7 @@ def io_op(func, *args):
while True: while True:
try: try:
return func(*args), False return func(*args), False
except (select.error, OSError, IOError): except _IO_ERRORS:
e = sys.exc_info()[1] e = sys.exc_info()[1]
_vv and IOLOG.debug('io_op(%r) -> OSError: %s', func, e) _vv and IOLOG.debug('io_op(%r) -> OSError: %s', func, e)
if e.args[0] == errno.EINTR: if e.args[0] == errno.EINTR:
@ -1352,6 +1354,7 @@ class Stream(BasicStream):
self.name = u'default' self.name = u'default'
self.sent_modules = set(['mitogen', 'mitogen.core']) self.sent_modules = set(['mitogen', 'mitogen.core'])
self.construct(**kwargs) self.construct(**kwargs)
self._lock = threading.Lock()
self._input_buf = collections.deque() self._input_buf = collections.deque()
self._output_buf = collections.deque() self._output_buf = collections.deque()
self._input_buf_len = 0 self._input_buf_len = 0
@ -1462,20 +1465,40 @@ class Stream(BasicStream):
if not self._output_buf: if not self._output_buf:
broker._stop_transmit(self) broker._stop_transmit(self)
def _send(self, msg): def send(self, msg):
_vv and IOLOG.debug('%r._send(%r)', self, msg) """Send `data` to `handle`, and tell the broker we have output. May
be called from any thread."""
_vv and IOLOG.debug('%r.send(%r)', self, msg)
pkt = struct.pack(self.HEADER_FMT, msg.dst_id, msg.src_id, pkt = struct.pack(self.HEADER_FMT, msg.dst_id, msg.src_id,
msg.auth_id, msg.handle, msg.reply_to or 0, msg.auth_id, msg.handle, msg.reply_to or 0,
len(msg.data)) + msg.data len(msg.data)) + msg.data
if not self._output_buf_len: pktlen = len(pkt)
self._router.broker._start_transmit(self)
self._lock.acquire()
try:
if self._output_buf_len:
self._output_buf.append(pkt) self._output_buf.append(pkt)
self._output_buf_len += len(pkt) self._output_buf_len += pktlen
return
def send(self, msg): written = None
"""Send `data` to `handle`, and tell the broker we have output. May try:
be called from any thread.""" written = self.transmit_side.write(pkt)
self._router.broker.defer(self._send, msg) except _IO_ERRORS:
e = sys.exc_info()[1]
if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
if not written:
self._output_buf.append(pkt)
self._output_buf_len += pktlen
elif written != pktlen:
self._output_buf.append(buffer(pkt, written))
self._output_buf_len += pktlen - written
broker = self._router.broker
broker.defer(broker._start_transmit, self)
finally:
self._lock.release()
def on_shutdown(self, broker): def on_shutdown(self, broker):
"""Override BasicStream behaviour of immediately disconnecting.""" """Override BasicStream behaviour of immediately disconnecting."""
@ -1778,8 +1801,6 @@ class Latch(object):
self._cls_all_sockets.extend((rsock, wsock)) self._cls_all_sockets.extend((rsock, wsock))
return rsock, wsock return rsock, wsock
COOKIE_SIZE = 33
def _make_cookie(self): def _make_cookie(self):
""" """
Return a 33-byte string encoding the ID of the instance and the current Return a 33-byte string encoding the ID of the instance and the current
@ -1787,7 +1808,12 @@ class Latch(object):
the FD, and buggy internal FD sharing. the FD, and buggy internal FD sharing.
""" """
ident = threading.currentThread().ident ident = threading.currentThread().ident
return b(u'%016x-%016x' % (int(id(self)), ident)) return b(
(u'%-8d-%-16x-%-16x' % (os.getpid(), int(id(self)), ident))
.replace(' ', '-')
)
COOKIE_SIZE = len(_make_cookie(None))
def get(self, timeout=None, block=True): def get(self, timeout=None, block=True):
""" """
@ -2360,7 +2386,7 @@ class Router(object):
in_stream.remote_id, out_stream.remote_id) in_stream.remote_id, out_stream.remote_id)
return return
out_stream._send(msg) out_stream.send(msg)
def route(self, msg): def route(self, msg):
""" """
@ -2372,7 +2398,7 @@ class Router(object):
This may be called from any thread. This may be called from any thread.
""" """
self.broker.defer(self._async_route, msg) self._async_route(msg)
class Broker(object): class Broker(object):
@ -2498,6 +2524,7 @@ class Broker(object):
def _broker_exit(self): def _broker_exit(self):
for _, (side, _) in self.poller.readers + self.poller.writers: for _, (side, _) in self.poller.readers + self.poller.writers:
if side.keep_alive:
LOG.error('_broker_main() force disconnecting %r', side) LOG.error('_broker_main() force disconnecting %r', side)
side.stream.on_disconnect(self) side.stream.on_disconnect(self)

@ -995,7 +995,7 @@ class Stream(mitogen.core.Stream):
def on_shutdown(self, broker): def on_shutdown(self, broker):
"""Request the slave gracefully shut itself down.""" """Request the slave gracefully shut itself down."""
LOG.debug('%r closing CALL_FUNCTION channel', self) LOG.debug('%r closing CALL_FUNCTION channel', self)
self._send( self.send(
mitogen.core.Message( mitogen.core.Message(
src_id=mitogen.context_id, src_id=mitogen.context_id,
dst_id=self.remote_id, dst_id=self.remote_id,
@ -1962,7 +1962,7 @@ class ModuleForwarder(object):
self, fullname, context_id, stream.remote_id) self, fullname, context_id, stream.remote_id)
self._send_module_and_related(stream, fullname) self._send_module_and_related(stream, fullname)
if stream.remote_id != context_id: if stream.remote_id != context_id:
stream._send( stream.send(
mitogen.core.Message( mitogen.core.Message(
data=msg.data, data=msg.data,
handle=mitogen.core.FORWARD_MODULE, handle=mitogen.core.FORWARD_MODULE,

Loading…
Cancel
Save