issue #260: avoid start_transmit()/on_transmit()/stop_transmit()

Previous transmit sequence was:
        Router._async_route -> Stream._send -> Broker._start_transmit ->
        Broker.loop -> Stream.on_transmit -> socket.write ->
        Broker.stop_transmit

New sequence, when socket buffer can hold message is:
        Router._async_route -> Stream._send -> socket.write

bench/roundtrip.py
        Before: 240 usec
        after: 178 usec

Stat before:
       5088.276050      task-clock (msec)         #    0.997 CPUs utilized
           185,568      context-switches          #    0.036 M/sec
                 0      cpu-migrations            #    0.000 K/sec
            18,923      page-faults               #    0.004 M/sec
    13,063,871,501      cycles                    #    2.567 GHz
    12,834,579,684      instructions              #    0.98  insn per cycle
     2,669,820,684      branches                  #  524.700 M/sec
       107,296,033      branch-misses             #    4.02% of all branches

       5.105018296 seconds time elapsed

       2.350970000 seconds user
       0.345497000 seconds sys

Stat after:
       4019.208047      task-clock (msec)         #    0.998 CPUs utilized
           249,471      context-switches          #    0.062 M/sec
                 0      cpu-migrations            #    0.000 K/sec
            20,990      page-faults               #    0.005 M/sec
    10,312,535,979      cycles                    #    2.566 GHz
    11,586,365,995      instructions              #    1.12  insn per cycle
     2,392,933,370      branches                  #  595.374 M/sec
        75,432,205      branch-misses             #    3.15% of all branches

       4.028763347 seconds time elapsed

       3.367051000 seconds user
       0.652962000 seconds sys
issue510
David Wilson 6 years ago
parent d4a0b70e15
commit a18a083c94

@ -1682,6 +1682,22 @@ class Stream(BasicStream):
pkt = struct.pack(self.HEADER_FMT, self.HEADER_MAGIC, msg.dst_id,
msg.src_id, msg.auth_id, msg.handle,
msg.reply_to or 0, len(msg.data)) + msg.data
if not self._output_buf_len:
# Modifying epoll/Kqueue state is expensive, as is needless broker
# loop iterations. Rather than wait for writeability, simply
# attempt to write immediately, and only fall back to
# start_transmit()/on_transmit() if an error occurred or the socket
# buffer was full.
try:
n = self.transmit_side.write(pkt)
if n:
if n == len(pkt):
return
pkt = pkt[n:]
except OSError:
pass
if not self._output_buf_len:
self._router.broker._start_transmit(self)
self._output_buf.append(pkt)

Loading…
Cancel
Save