derp: add TCP RTT metric on Linux (#5949)

Periodically poll the TCP RTT metric from all open TCP connections and
update a (bucketed) histogram metric.

Signed-off-by: Andrew Dunham <andrew@du.nham.ca>
Change-Id: I6214902196b05bf7829c9d0ea501ce0e13d984cf
pull/5952/head
Andrew Dunham 2 years ago committed by GitHub
parent a04f1ff9e6
commit 64ea60aaa3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -136,7 +136,8 @@ type Server struct {
multiForwarderCreated expvar.Int multiForwarderCreated expvar.Int
multiForwarderDeleted expvar.Int multiForwarderDeleted expvar.Int
removePktForwardOther expvar.Int removePktForwardOther expvar.Int
avgQueueDuration *uint64 // In milliseconds; accessed atomically avgQueueDuration *uint64 // In milliseconds; accessed atomically
tcpRtt metrics.LabelMap // histogram
// verifyClients only accepts client connections to the DERP server if the clientKey is a // verifyClients only accepts client connections to the DERP server if the clientKey is a
// known peer in the network, as specified by a running tailscaled's client's local api. // known peer in the network, as specified by a running tailscaled's client's local api.
@ -312,6 +313,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
watchers: map[*sclient]bool{}, watchers: map[*sclient]bool{},
sentTo: map[key.NodePublic]map[key.NodePublic]int64{}, sentTo: map[key.NodePublic]map[key.NodePublic]int64{},
avgQueueDuration: new(uint64), avgQueueDuration: new(uint64),
tcpRtt: metrics.LabelMap{Label: "le"},
keyOfAddr: map[netip.AddrPort]key.NodePublic{}, keyOfAddr: map[netip.AddrPort]key.NodePublic{},
} }
s.initMetacert() s.initMetacert()
@ -713,6 +715,7 @@ func (c *sclient) run(ctx context.Context) error {
var grp errgroup.Group var grp errgroup.Group
sendCtx, cancelSender := context.WithCancel(ctx) sendCtx, cancelSender := context.WithCancel(ctx)
grp.Go(func() error { return c.sendLoop(sendCtx) }) grp.Go(func() error { return c.sendLoop(sendCtx) })
grp.Go(func() error { return c.statsLoop(sendCtx) })
defer func() { defer func() {
cancelSender() cancelSender()
if err := grp.Wait(); err != nil && !c.s.isClosed() { if err := grp.Wait(); err != nil && !c.s.isClosed() {
@ -1699,6 +1702,7 @@ func (s *Server) ExpVar() expvar.Var {
m.Set("average_queue_duration_ms", expvar.Func(func() any { m.Set("average_queue_duration_ms", expvar.Func(func() any {
return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration)) return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration))
})) }))
m.Set("counter_tcp_rtt", &s.tcpRtt)
var expvarVersion expvar.String var expvarVersion expvar.String
expvarVersion.Set(version.Long) expvarVersion.Set(version.Long)
m.Set("version", &expvarVersion) m.Set("version", &expvarVersion)

@ -0,0 +1,14 @@
// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !linux
// +build !linux
package derp
import "context"
func (c *sclient) statsLoop(ctx context.Context) error {
return nil
}

@ -0,0 +1,95 @@
// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package derp
import (
"context"
"crypto/tls"
"net"
"time"
"golang.org/x/sys/unix"
)
func (c *sclient) statsLoop(ctx context.Context) error {
// If we can't get a TCP socket, then we can't send stats.
tcpConn := c.tcpConn()
if tcpConn == nil {
c.s.tcpRtt.Add("non-tcp", 1)
return nil
}
rawConn, err := tcpConn.SyscallConn()
if err != nil {
c.logf("error getting SyscallConn: %v", err)
c.s.tcpRtt.Add("error", 1)
return nil
}
const statsInterval = 10 * time.Second
ticker := time.NewTicker(statsInterval)
defer ticker.Stop()
var (
tcpInfo *unix.TCPInfo
sysErr error
)
statsLoop:
for {
select {
case <-ticker.C:
err = rawConn.Control(func(fd uintptr) {
tcpInfo, sysErr = unix.GetsockoptTCPInfo(int(fd), unix.IPPROTO_TCP, unix.TCP_INFO)
})
if err != nil || sysErr != nil {
continue statsLoop
}
// TODO(andrew): more metrics?
rtt := time.Duration(tcpInfo.Rtt) * time.Microsecond
c.s.tcpRtt.Add(durationToLabel(rtt), 1)
case <-ctx.Done():
return ctx.Err()
}
}
}
// tcpConn attempts to get the underlying *net.TCPConn from this client's
// Conn; if it cannot, then it will return nil.
func (c *sclient) tcpConn() *net.TCPConn {
nc := c.nc
for {
switch v := nc.(type) {
case *net.TCPConn:
return v
case *tls.Conn:
nc = v.NetConn()
default:
return nil
}
}
}
func durationToLabel(dur time.Duration) string {
switch {
case dur <= 10*time.Millisecond:
return "10ms"
case dur <= 20*time.Millisecond:
return "20ms"
case dur <= 50*time.Millisecond:
return "50ms"
case dur <= 100*time.Millisecond:
return "100ms"
case dur <= 150*time.Millisecond:
return "150ms"
case dur <= 250*time.Millisecond:
return "250ms"
case dur <= 500*time.Millisecond:
return "500ms"
default:
return "inf"
}
}
Loading…
Cancel
Save