From a18a083c944e3f7963ae81113161f0cd053d348d Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 29 Jan 2019 17:11:51 +0000 Subject: [PATCH] issue #260: avoid start_transmit()/on_transmit()/stop_transmit() Previous transmit sequence was: Router._async_route -> Stream._send -> Broker._start_transmit -> Broker.loop -> Stream.on_transmit -> socket.write -> Broker.stop_transmit New sequence, when socket buffer can hold message is: Router._async_route -> Stream._send -> socket.write bench/roundtrip.py Before: 240 usec after: 178 usec Stat before: 5088.276050 task-clock (msec) # 0.997 CPUs utilized 185,568 context-switches # 0.036 M/sec 0 cpu-migrations # 0.000 K/sec 18,923 page-faults # 0.004 M/sec 13,063,871,501 cycles # 2.567 GHz 12,834,579,684 instructions # 0.98 insn per cycle 2,669,820,684 branches # 524.700 M/sec 107,296,033 branch-misses # 4.02% of all branches 5.105018296 seconds time elapsed 2.350970000 seconds user 0.345497000 seconds sys Stat after: 4019.208047 task-clock (msec) # 0.998 CPUs utilized 249,471 context-switches # 0.062 M/sec 0 cpu-migrations # 0.000 K/sec 20,990 page-faults # 0.005 M/sec 10,312,535,979 cycles # 2.566 GHz 11,586,365,995 instructions # 1.12 insn per cycle 2,392,933,370 branches # 595.374 M/sec 75,432,205 branch-misses # 3.15% of all branches 4.028763347 seconds time elapsed 3.367051000 seconds user 0.652962000 seconds sys --- mitogen/core.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/mitogen/core.py b/mitogen/core.py index aaaed1ba..7ceba02c 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1682,6 +1682,22 @@ class Stream(BasicStream): pkt = struct.pack(self.HEADER_FMT, self.HEADER_MAGIC, msg.dst_id, msg.src_id, msg.auth_id, msg.handle, msg.reply_to or 0, len(msg.data)) + msg.data + + if not self._output_buf_len: + # Modifying epoll/Kqueue state is expensive, as is needless broker + # loop iterations. Rather than wait for writeability, simply + # attempt to write immediately, and only fall back to + # start_transmit()/on_transmit() if an error occurred or the socket + # buffer was full. + try: + n = self.transmit_side.write(pkt) + if n: + if n == len(pkt): + return + pkt = pkt[n:] + except OSError: + pass + if not self._output_buf_len: self._router.broker._start_transmit(self) self._output_buf.append(pkt)