From 521ad7b0fcccf9e35f088f3cce4d69c7a1167ae8 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sat, 21 Mar 2020 18:24:28 -0700 Subject: [PATCH] derp: only flush writes to clients when we're out of things to write Signed-off-by: Brad Fitzpatrick --- derp/derp_server.go | 56 +++++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/derp/derp_server.go b/derp/derp_server.go index d3d1251ff..35d452773 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -552,39 +552,57 @@ func (c *sclient) sendLoop() error { keepAliveTick := time.NewTicker(keepAlive + jitter) defer keepAliveTick.Stop() + var werr error // last write error for { + if werr != nil { + return werr + } + // First, a non-blocking select (with a default) that + // does as many non-flushing writes as possible. select { case <-c.done: return nil - - case msg, ok := <-c.sendQueue: - if !ok { - return nil - } - if err := c.sendPacket(msg.src, msg.bs); err != nil { - return err + case msg := <-c.sendQueue: + werr = c.sendPacket(msg.src, msg.bs) + continue + case <-keepAliveTick.C: + werr = c.sendKeepAlive() + continue + default: + // Flush any writes from the 3 sends above, or from + // the blocking loop below. + if werr = c.bw.Flush(); werr != nil { + return werr } + } + // Then a blocking select with same: + select { + case <-c.done: + return nil + case msg := <-c.sendQueue: + werr = c.sendPacket(msg.src, msg.bs) case <-keepAliveTick.C: - if err := c.sendKeepalive(); err != nil { - return err - } + werr = c.sendKeepAlive() } } } -func (c *sclient) sendKeepalive() error { +func (c *sclient) setWriteDeadline() { c.nc.SetWriteDeadline(time.Now().Add(writeTimeout)) - if err := writeFrame(c.bw, frameKeepAlive, nil); err != nil { - return err - } - return c.bw.Flush() +} + +// sendKeepAlive sends a keep-alive frame, without flushing. +func (c *sclient) sendKeepAlive() error { + c.setWriteDeadline() + return writeFrameHeader(c.bw, frameKeepAlive, 0) } // sendPacket writes contents to the client in a RecvPacket frame. If // srcKey.IsZero, uses the old DERPv1 framing format, otherwise uses // DERPv2. The bytes of contents are only valid until this function // returns, do not retain slices. +// It does not flush its bufio.Writer. func (c *sclient) sendPacket(srcKey key.Public, contents []byte) (err error) { defer func() { // Stats update. @@ -600,7 +618,7 @@ func (c *sclient) sendPacket(srcKey key.Public, contents []byte) (err error) { } }() - c.nc.SetWriteDeadline(time.Now().Add(writeTimeout)) + c.setWriteDeadline() withKey := !srcKey.IsZero() pktLen := len(contents) @@ -615,10 +633,8 @@ func (c *sclient) sendPacket(srcKey key.Public, contents []byte) (err error) { return err } } - if _, err = c.bw.Write(contents); err != nil { - return err - } - return c.bw.Flush() + _, err = c.bw.Write(contents) + return err } func (s *Server) expVarFunc(f func() interface{}) expvar.Func {