cmd/stunstamp: support probing multiple ports (#12356)

Updates tailscale/corp#20344

Signed-off-by: Jordan Whited <jordan@tailscale.com>
pull/12386/head
Jordan Whited 6 months ago committed by GitHub
parent 1ca323ac65
commit 6e106712f6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -39,6 +39,7 @@ type apiResult struct {
Addr string `json:"addr"` Addr string `json:"addr"`
Source int `json:"source"` // timestampSourceUserspace (0) or timestampSourceKernel (1) Source int `json:"source"` // timestampSourceUserspace (0) or timestampSourceKernel (1)
StableConn bool `json:"stableConn"` StableConn bool `json:"stableConn"`
DstPort int `json:"dstPort"`
RttNS *int `json:"rttNS"` RttNS *int `json:"rttNS"`
} }
@ -94,7 +95,7 @@ func (a *api) query(w http.ResponseWriter, r *http.Request) {
return return
} }
sb := sq.Select("at_unix", "region_id", "hostname", "af", "address", "timestamp_source", "stable_conn", "rtt_ns").From("rtt") sb := sq.Select("at_unix", "region_id", "hostname", "af", "address", "timestamp_source", "stable_conn", "dst_port", "rtt_ns").From("rtt")
sb = sb.Where(sq.And{ sb = sb.Where(sq.And{
sq.GtOrEq{"at_unix": from.Unix()}, sq.GtOrEq{"at_unix": from.Unix()},
sq.LtOrEq{"at_unix": to.Unix()}, sq.LtOrEq{"at_unix": to.Unix()},
@ -115,7 +116,7 @@ func (a *api) query(w http.ResponseWriter, r *http.Request) {
result := apiResult{ result := apiResult{
RttNS: &rtt, RttNS: &rtt,
} }
err = rows.Scan(&result.At, &result.RegionID, &result.Hostname, &result.Af, &result.Addr, &result.Source, &result.StableConn, &result.RttNS) err = rows.Scan(&result.At, &result.RegionID, &result.Hostname, &result.Af, &result.Addr, &result.Source, &result.StableConn, &result.DstPort, &result.RttNS)
if err != nil { if err != nil {
http.Error(w, err.Error(), 500) http.Error(w, err.Error(), 500)
return return

@ -23,6 +23,8 @@ import (
"os" "os"
"os/signal" "os/signal"
"slices" "slices"
"strconv"
"strings"
"sync" "sync"
"syscall" "syscall"
"time" "time"
@ -43,6 +45,7 @@ var (
flagRetention = flag.Duration("retention", time.Hour*24*7, "sqlite retention period in time.ParseDuration() format") flagRetention = flag.Duration("retention", time.Hour*24*7, "sqlite retention period in time.ParseDuration() format")
flagRemoteWriteURL = flag.String("rw-url", "", "prometheus remote write URL") flagRemoteWriteURL = flag.String("rw-url", "", "prometheus remote write URL")
flagInstance = flag.String("instance", "", "instance label value; defaults to hostname if unspecified") flagInstance = flag.String("instance", "", "instance label value; defaults to hostname if unspecified")
flagDstPorts = flag.String("dst-ports", "", "comma-separated list of destination ports to monitor")
) )
const ( const (
@ -91,6 +94,7 @@ type result struct {
meta nodeMeta meta nodeMeta
timestampSource timestampSource timestampSource timestampSource
connStability connStability connStability connStability
dstPort int
rtt *time.Duration // nil signifies failure, e.g. timeout rtt *time.Duration // nil signifies failure, e.g. timeout
} }
@ -145,17 +149,17 @@ type nodeMeta struct {
type measureFn func(conn io.ReadWriteCloser, dst *net.UDPAddr) (rtt time.Duration, err error) type measureFn func(conn io.ReadWriteCloser, dst *net.UDPAddr) (rtt time.Duration, err error)
func probe(meta nodeMeta, conn io.ReadWriteCloser, fn measureFn) (*time.Duration, error) { func probe(meta nodeMeta, conn io.ReadWriteCloser, fn measureFn, dstPort int) (*time.Duration, error) {
ua := &net.UDPAddr{ ua := &net.UDPAddr{
IP: net.IP(meta.addr.AsSlice()), IP: net.IP(meta.addr.AsSlice()),
Port: 3478, Port: dstPort,
} }
time.Sleep(rand.N(200 * time.Millisecond)) // jitter across tx time.Sleep(rand.N(200 * time.Millisecond)) // jitter across tx
rtt, err := fn(conn, ua) rtt, err := fn(conn, ua)
if err != nil { if err != nil {
if isTemporaryOrTimeoutErr(err) { if isTemporaryOrTimeoutErr(err) {
log.Printf("temp error measuring RTT to %s(%s): %v", meta.hostname, meta.addr, err) log.Printf("temp error measuring RTT to %s(%s): %v", meta.hostname, ua.String(), err)
return nil, nil return nil, nil
} }
} }
@ -218,10 +222,14 @@ func nodeMetaFromDERPMap(dm *tailcfg.DERPMap, nodeMetaByAddr map[netip.Addr]node
return stale, nil return stale, nil
} }
func getStableConns(stableConns map[netip.Addr][2]io.ReadWriteCloser, addr netip.Addr) ([2]io.ReadWriteCloser, error) { func getStableConns(stableConns map[netip.Addr]map[int][2]io.ReadWriteCloser, addr netip.Addr, dstPort int) ([2]io.ReadWriteCloser, error) {
conns, ok := stableConns[addr] conns := [2]io.ReadWriteCloser{}
byDstPort, ok := stableConns[addr]
if ok { if ok {
return conns, nil conns, ok = byDstPort[dstPort]
if ok {
return conns, nil
}
} }
if supportsKernelTS() { if supportsKernelTS() {
kconn, err := getConnKernelTimestamp() kconn, err := getConnKernelTimestamp()
@ -232,10 +240,17 @@ func getStableConns(stableConns map[netip.Addr][2]io.ReadWriteCloser, addr netip
} }
uconn, err := net.ListenUDP("udp", &net.UDPAddr{}) uconn, err := net.ListenUDP("udp", &net.UDPAddr{})
if err != nil { if err != nil {
if supportsKernelTS() {
conns[timestampSourceKernel].Close()
}
return conns, err return conns, err
} }
conns[timestampSourceUserspace] = uconn conns[timestampSourceUserspace] = uconn
stableConns[addr] = conns if byDstPort == nil {
byDstPort = make(map[int][2]io.ReadWriteCloser)
}
byDstPort[dstPort] = conns
stableConns[addr] = byDstPort
return conns, nil return conns, nil
} }
@ -243,7 +258,7 @@ func getStableConns(stableConns map[netip.Addr][2]io.ReadWriteCloser, addr netip
// DERP nodes described by nodeMetaByAddr while using/updating stableConns for // DERP nodes described by nodeMetaByAddr while using/updating stableConns for
// UDP sockets that should be recycled across runs. It returns the results or // UDP sockets that should be recycled across runs. It returns the results or
// an error if one occurs. // an error if one occurs.
func probeNodes(nodeMetaByAddr map[netip.Addr]nodeMeta, stableConns map[netip.Addr][2]io.ReadWriteCloser) ([]result, error) { func probeNodes(nodeMetaByAddr map[netip.Addr]nodeMeta, stableConns map[netip.Addr]map[int][2]io.ReadWriteCloser, dstPorts []int) ([]result, error) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
results := make([]result, 0) results := make([]result, 0)
resultsCh := make(chan result) resultsCh := make(chan result)
@ -253,9 +268,14 @@ func probeNodes(nodeMetaByAddr map[netip.Addr]nodeMeta, stableConns map[netip.Ad
at := time.Now() at := time.Now()
addrsToProbe := make(map[netip.Addr]bool) addrsToProbe := make(map[netip.Addr]bool)
doProbe := func(conn io.ReadWriteCloser, meta nodeMeta, source timestampSource) { doProbe := func(conn io.ReadWriteCloser, meta nodeMeta, source timestampSource, dstPort int) {
defer wg.Done() defer wg.Done()
r := result{} r := result{
at: at,
meta: meta,
timestampSource: source,
dstPort: dstPort,
}
if conn == nil { if conn == nil {
var err error var err error
if source == timestampSourceKernel { if source == timestampSourceKernel {
@ -279,7 +299,7 @@ func probeNodes(nodeMetaByAddr map[netip.Addr]nodeMeta, stableConns map[netip.Ad
if source == timestampSourceKernel { if source == timestampSourceKernel {
fn = measureRTTKernel fn = measureRTTKernel
} }
rtt, err := probe(meta, conn, fn) rtt, err := probe(meta, conn, fn, dstPort)
if err != nil { if err != nil {
select { select {
case <-doneCh: case <-doneCh:
@ -288,9 +308,6 @@ func probeNodes(nodeMetaByAddr map[netip.Addr]nodeMeta, stableConns map[netip.Ad
return return
} }
} }
r.at = at
r.meta = meta
r.timestampSource = source
r.rtt = rtt r.rtt = rtt
select { select {
case <-doneCh: case <-doneCh:
@ -300,33 +317,37 @@ func probeNodes(nodeMetaByAddr map[netip.Addr]nodeMeta, stableConns map[netip.Ad
for _, meta := range nodeMetaByAddr { for _, meta := range nodeMetaByAddr {
addrsToProbe[meta.addr] = true addrsToProbe[meta.addr] = true
stable, err := getStableConns(stableConns, meta.addr) for _, port := range dstPorts {
if err != nil { stable, err := getStableConns(stableConns, meta.addr, port)
close(doneCh) if err != nil {
wg.Wait() close(doneCh)
return nil, err wg.Wait()
} return nil, err
}
wg.Add(2)
numProbes += 2
go doProbe(stable[timestampSourceUserspace], meta, timestampSourceUserspace)
go doProbe(nil, meta, timestampSourceUserspace)
if supportsKernelTS() {
wg.Add(2) wg.Add(2)
numProbes += 2 numProbes += 2
go doProbe(stable[timestampSourceKernel], meta, timestampSourceKernel) go doProbe(stable[timestampSourceUserspace], meta, timestampSourceUserspace, port)
go doProbe(nil, meta, timestampSourceKernel) go doProbe(nil, meta, timestampSourceUserspace, port)
if supportsKernelTS() {
wg.Add(2)
numProbes += 2
go doProbe(stable[timestampSourceKernel], meta, timestampSourceKernel, port)
go doProbe(nil, meta, timestampSourceKernel, port)
}
} }
} }
// cleanup conns we no longer need // cleanup conns we no longer need
for k, conns := range stableConns { for k, byDstPort := range stableConns {
if !addrsToProbe[k] { if !addrsToProbe[k] {
if conns[timestampSourceKernel] != nil { for _, conns := range byDstPort {
conns[timestampSourceKernel].Close() if conns[timestampSourceKernel] != nil {
conns[timestampSourceKernel].Close()
}
conns[timestampSourceUserspace].Close()
delete(stableConns, k)
} }
conns[timestampSourceUserspace].Close()
delete(stableConns, k)
} }
} }
@ -352,7 +373,7 @@ const (
stableConn connStability = true stableConn connStability = true
) )
func timeSeriesLabels(meta nodeMeta, instance string, source timestampSource, stability connStability) []prompb.Label { func timeSeriesLabels(meta nodeMeta, instance string, source timestampSource, stability connStability, dstPort int) []prompb.Label {
addressFamily := "ipv4" addressFamily := "ipv4"
if meta.addr.Is6() { if meta.addr.Is6() {
addressFamily = "ipv6" addressFamily = "ipv6"
@ -382,6 +403,10 @@ func timeSeriesLabels(meta nodeMeta, instance string, source timestampSource, st
Name: "hostname", Name: "hostname",
Value: meta.hostname, Value: meta.hostname,
}) })
labels = append(labels, prompb.Label{
Name: "dst_port",
Value: strconv.Itoa(dstPort),
})
labels = append(labels, prompb.Label{ labels = append(labels, prompb.Label{
Name: "__name__", Name: "__name__",
Value: "stunstamp_derp_stun_rtt_ns", Value: "stunstamp_derp_stun_rtt_ns",
@ -406,40 +431,42 @@ const (
staleNaN uint64 = 0x7ff0000000000002 staleNaN uint64 = 0x7ff0000000000002
) )
func staleMarkersFromNodeMeta(stale []nodeMeta, instance string) []prompb.TimeSeries { func staleMarkersFromNodeMeta(stale []nodeMeta, instance string, dstPorts []int) []prompb.TimeSeries {
staleMarkers := make([]prompb.TimeSeries, 0) staleMarkers := make([]prompb.TimeSeries, 0)
now := time.Now() now := time.Now()
for _, s := range stale { for _, s := range stale {
samples := []prompb.Sample{ for _, dstPort := range dstPorts {
{ samples := []prompb.Sample{
Timestamp: now.UnixMilli(), {
Value: math.Float64frombits(staleNaN), Timestamp: now.UnixMilli(),
}, Value: math.Float64frombits(staleNaN),
} },
staleMarkers = append(staleMarkers, prompb.TimeSeries{ }
Labels: timeSeriesLabels(s, instance, timestampSourceUserspace, unstableConn),
Samples: samples,
})
staleMarkers = append(staleMarkers, prompb.TimeSeries{
Labels: timeSeriesLabels(s, instance, timestampSourceUserspace, stableConn),
Samples: samples,
})
if supportsKernelTS() {
staleMarkers = append(staleMarkers, prompb.TimeSeries{ staleMarkers = append(staleMarkers, prompb.TimeSeries{
Labels: timeSeriesLabels(s, instance, timestampSourceKernel, unstableConn), Labels: timeSeriesLabels(s, instance, timestampSourceUserspace, unstableConn, dstPort),
Samples: samples, Samples: samples,
}) })
staleMarkers = append(staleMarkers, prompb.TimeSeries{ staleMarkers = append(staleMarkers, prompb.TimeSeries{
Labels: timeSeriesLabels(s, instance, timestampSourceKernel, stableConn), Labels: timeSeriesLabels(s, instance, timestampSourceUserspace, stableConn, dstPort),
Samples: samples, Samples: samples,
}) })
if supportsKernelTS() {
staleMarkers = append(staleMarkers, prompb.TimeSeries{
Labels: timeSeriesLabels(s, instance, timestampSourceKernel, unstableConn, dstPort),
Samples: samples,
})
staleMarkers = append(staleMarkers, prompb.TimeSeries{
Labels: timeSeriesLabels(s, instance, timestampSourceKernel, stableConn, dstPort),
Samples: samples,
})
}
} }
} }
return staleMarkers return staleMarkers
} }
func resultToPromTimeSeries(r result, instance string) prompb.TimeSeries { func resultToPromTimeSeries(r result, instance string) prompb.TimeSeries {
labels := timeSeriesLabels(r.meta, instance, r.timestampSource, r.connStability) labels := timeSeriesLabels(r.meta, instance, r.timestampSource, r.connStability, r.dstPort)
samples := make([]prompb.Sample, 1) samples := make([]prompb.Sample, 1)
samples[0].Timestamp = r.at.UnixMilli() samples[0].Timestamp = r.at.UnixMilli()
if r.rtt != nil { if r.rtt != nil {
@ -535,6 +562,20 @@ func remoteWriteTimeSeries(client *remoteWriteClient, tsCh chan []prompb.TimeSer
func main() { func main() {
flag.Parse() flag.Parse()
if len(*flagDstPorts) == 0 {
log.Fatal("dst-ports flag is unset")
}
dstPortsSplit := strings.Split(*flagDstPorts, ",")
slices.Sort(dstPortsSplit)
dstPortsSplit = slices.Compact(dstPortsSplit)
dstPorts := make([]int, 0, len(dstPortsSplit))
for _, d := range dstPortsSplit {
i, err := strconv.ParseUint(d, 10, 16)
if err != nil {
log.Fatal("invalid dst-ports")
}
dstPorts = append(dstPorts, int(i))
}
if len(*flagDERPMap) < 1 { if len(*flagDERPMap) < 1 {
log.Fatal("derp-map flag is unset") log.Fatal("derp-map flag is unset")
} }
@ -545,10 +586,10 @@ func main() {
log.Fatalf("interval must be >= %s and <= %s", minInterval, maxBufferDuration) log.Fatalf("interval must be >= %s and <= %s", minInterval, maxBufferDuration)
} }
if *flagRetention < *flagInterval { if *flagRetention < *flagInterval {
log.Fatalf("retention must be >= interval") log.Fatal("retention must be >= interval")
} }
if len(*flagRemoteWriteURL) < 1 { if len(*flagRemoteWriteURL) < 1 {
log.Fatalf("rw-url flag is unset") log.Fatal("rw-url flag is unset")
} }
_, err := url.Parse(*flagRemoteWriteURL) _, err := url.Parse(*flagRemoteWriteURL)
if err != nil { if err != nil {
@ -610,7 +651,7 @@ func main() {
// ~300 data points per-interval w/o ipv6 w/kernel timestamping resulting // ~300 data points per-interval w/o ipv6 w/kernel timestamping resulting
// in ~2.6m rows in 24h w/a 10s probe interval. // in ~2.6m rows in 24h w/a 10s probe interval.
_, err = db.Exec(` _, err = db.Exec(`
CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT, address TEXT, timestamp_source INT, stable_conn INT, rtt_ns INT) CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT, address TEXT, timestamp_source INT, stable_conn INT, dst_port INT, rtt_ns INT)
`) `)
if err != nil { if err != nil {
log.Fatalf("error initializing db: %v", err) log.Fatalf("error initializing db: %v", err)
@ -658,7 +699,7 @@ CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT
for _, v := range nodeMetaByAddr { for _, v := range nodeMetaByAddr {
staleMeta = append(staleMeta, v) staleMeta = append(staleMeta, v)
} }
staleMarkers := staleMarkersFromNodeMeta(staleMeta, *flagInstance) staleMarkers := staleMarkersFromNodeMeta(staleMeta, *flagInstance, dstPorts)
if len(staleMarkers) > 0 { if len(staleMarkers) > 0 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
rwc.write(ctx, staleMarkers) rwc.write(ctx, staleMarkers)
@ -676,7 +717,7 @@ CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT
// Comparison of stable and unstable 5-tuple results can shed light on // Comparison of stable and unstable 5-tuple results can shed light on
// differences between paths where hashing (multipathing/load balancing) // differences between paths where hashing (multipathing/load balancing)
// comes into play. // comes into play.
stableConns := make(map[netip.Addr][2]io.ReadWriteCloser) stableConns := make(map[netip.Addr]map[int][2]io.ReadWriteCloser)
derpMapTicker := time.NewTicker(time.Minute * 5) derpMapTicker := time.NewTicker(time.Minute * 5)
defer derpMapTicker.Stop() defer derpMapTicker.Stop()
@ -697,7 +738,7 @@ CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT
return return
} }
case <-probeTicker.C: case <-probeTicker.C:
results, err := probeNodes(nodeMetaByAddr, stableConns) results, err := probeNodes(nodeMetaByAddr, stableConns, dstPorts)
if err != nil { if err != nil {
log.Printf("unrecoverable error while probing: %v", err) log.Printf("unrecoverable error while probing: %v", err)
shutdown() shutdown()
@ -728,8 +769,8 @@ CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT
if result.meta.addr.Is6() { if result.meta.addr.Is6() {
af = 6 af = 6
} }
_, err = tx.Exec("INSERT INTO rtt(at_unix, region_id, hostname, af, address, timestamp_source, stable_conn, rtt_ns) VALUES(?, ?, ?, ?, ?, ?, ?, ?)", _, err = tx.Exec("INSERT INTO rtt(at_unix, region_id, hostname, af, address, timestamp_source, stable_conn, dst_port, rtt_ns) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)",
result.at.Unix(), result.meta.regionID, result.meta.hostname, af, result.meta.addr.String(), result.timestampSource, result.connStability, result.rtt) result.at.Unix(), result.meta.regionID, result.meta.hostname, af, result.meta.addr.String(), result.timestampSource, result.connStability, result.dstPort, result.rtt)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
log.Printf("error adding result to tx: %v", err) log.Printf("error adding result to tx: %v", err)
@ -749,7 +790,7 @@ CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT
log.Printf("error parsing DERP map, continuing with stale map: %v", err) log.Printf("error parsing DERP map, continuing with stale map: %v", err)
continue continue
} }
staleMarkers := staleMarkersFromNodeMeta(staleMeta, *flagInstance) staleMarkers := staleMarkersFromNodeMeta(staleMeta, *flagInstance, dstPorts)
if len(staleMarkers) < 1 { if len(staleMarkers) < 1 {
continue continue
} }

@ -66,12 +66,12 @@ func measureRTTKernel(conn io.ReadWriteCloser, dst *net.UDPAddr) (rtt time.Durat
to4 := dst.IP.To4() to4 := dst.IP.To4()
if to4 != nil { if to4 != nil {
to = &unix.SockaddrInet4{ to = &unix.SockaddrInet4{
Port: 3478, Port: dst.Port,
} }
copy(to.(*unix.SockaddrInet4).Addr[:], to4) copy(to.(*unix.SockaddrInet4).Addr[:], to4)
} else { } else {
to = &unix.SockaddrInet6{ to = &unix.SockaddrInet6{
Port: 3478, Port: dst.Port,
} }
copy(to.(*unix.SockaddrInet6).Addr[:], dst.IP) copy(to.(*unix.SockaddrInet6).Addr[:], dst.IP)
} }

Loading…
Cancel
Save