derp: only flush writes to clients when we're out of things to write

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
reviewable/pr209/r2
Brad Fitzpatrick 5 years ago
parent 0a25abcdaa
commit 521ad7b0fc

@ -552,39 +552,57 @@ func (c *sclient) sendLoop() error {
keepAliveTick := time.NewTicker(keepAlive + jitter) keepAliveTick := time.NewTicker(keepAlive + jitter)
defer keepAliveTick.Stop() defer keepAliveTick.Stop()
var werr error // last write error
for { for {
if werr != nil {
return werr
}
// First, a non-blocking select (with a default) that
// does as many non-flushing writes as possible.
select { select {
case <-c.done: case <-c.done:
return nil return nil
case msg := <-c.sendQueue:
case msg, ok := <-c.sendQueue: werr = c.sendPacket(msg.src, msg.bs)
if !ok { continue
return nil case <-keepAliveTick.C:
} werr = c.sendKeepAlive()
if err := c.sendPacket(msg.src, msg.bs); err != nil { continue
return err default:
// Flush any writes from the 3 sends above, or from
// the blocking loop below.
if werr = c.bw.Flush(); werr != nil {
return werr
} }
}
// Then a blocking select with same:
select {
case <-c.done:
return nil
case msg := <-c.sendQueue:
werr = c.sendPacket(msg.src, msg.bs)
case <-keepAliveTick.C: case <-keepAliveTick.C:
if err := c.sendKeepalive(); err != nil { werr = c.sendKeepAlive()
return err
}
} }
} }
} }
func (c *sclient) sendKeepalive() error { func (c *sclient) setWriteDeadline() {
c.nc.SetWriteDeadline(time.Now().Add(writeTimeout)) c.nc.SetWriteDeadline(time.Now().Add(writeTimeout))
if err := writeFrame(c.bw, frameKeepAlive, nil); err != nil { }
return err
} // sendKeepAlive sends a keep-alive frame, without flushing.
return c.bw.Flush() func (c *sclient) sendKeepAlive() error {
c.setWriteDeadline()
return writeFrameHeader(c.bw, frameKeepAlive, 0)
} }
// sendPacket writes contents to the client in a RecvPacket frame. If // sendPacket writes contents to the client in a RecvPacket frame. If
// srcKey.IsZero, uses the old DERPv1 framing format, otherwise uses // srcKey.IsZero, uses the old DERPv1 framing format, otherwise uses
// DERPv2. The bytes of contents are only valid until this function // DERPv2. The bytes of contents are only valid until this function
// returns, do not retain slices. // returns, do not retain slices.
// It does not flush its bufio.Writer.
func (c *sclient) sendPacket(srcKey key.Public, contents []byte) (err error) { func (c *sclient) sendPacket(srcKey key.Public, contents []byte) (err error) {
defer func() { defer func() {
// Stats update. // Stats update.
@ -600,7 +618,7 @@ func (c *sclient) sendPacket(srcKey key.Public, contents []byte) (err error) {
} }
}() }()
c.nc.SetWriteDeadline(time.Now().Add(writeTimeout)) c.setWriteDeadline()
withKey := !srcKey.IsZero() withKey := !srcKey.IsZero()
pktLen := len(contents) pktLen := len(contents)
@ -615,10 +633,8 @@ func (c *sclient) sendPacket(srcKey key.Public, contents []byte) (err error) {
return err return err
} }
} }
if _, err = c.bw.Write(contents); err != nil { _, err = c.bw.Write(contents)
return err return err
}
return c.bw.Flush()
} }
func (s *Server) expVarFunc(f func() interface{}) expvar.Func { func (s *Server) expVarFunc(f func() interface{}) expvar.Func {

Loading…
Cancel
Save