From 40c991f6b85b6a5ff1a4b440650750e95c755f61 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Wed, 25 Sep 2024 17:20:56 +0200 Subject: [PATCH] wgengine: instrument with usermetrics Updates tailscale/corp#22075 Signed-off-by: Kristoffer Dalby --- tsnet/tsnet_test.go | 146 ++++++++++++++++++++- util/clientmetric/clientmetric.go | 51 ++++++++ util/clientmetric/clientmetric_test.go | 49 ++++++++ wgengine/magicsock/derp.go | 6 +- wgengine/magicsock/endpoint.go | 27 ++-- wgengine/magicsock/magicsock.go | 167 +++++++++++++++++++++++-- wgengine/magicsock/magicsock_test.go | 86 +++++++++++++ 7 files changed, 509 insertions(+), 23 deletions(-) diff --git a/tsnet/tsnet_test.go b/tsnet/tsnet_test.go index 255baf618..98c1fd4ab 100644 --- a/tsnet/tsnet_test.go +++ b/tsnet/tsnet_test.go @@ -36,6 +36,7 @@ import ( dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "golang.org/x/net/proxy" + "tailscale.com/client/tailscale" "tailscale.com/cmd/testwrapper/flakytest" "tailscale.com/health" "tailscale.com/ipn" @@ -874,6 +875,78 @@ func promMetricLabelsStr(labels []*dto.LabelPair) string { return b.String() } +// sendData sends bytesCount bytes from s2 to s1: s1 listens, s2 dials and writes.
+func sendData(logf func(format string, args ...any), ctx context.Context, bytesCount int, s1, s2 *Server, s1ip, s2ip netip.Addr) error {
+	l := must.Get(s1.Listen("tcp", fmt.Sprintf("%s:8081", s1ip)))
+	defer l.Close()
+
+	// Dial to s1 from s2
+	w, err := s2.Dial(ctx, "tcp", fmt.Sprintf("%s:8081", s1ip))
+	if err != nil {
+		return err
+	}
+	defer w.Close()
+
+	// Closed on return; the receiver polls it after each read to stop early.
+	stopReceive := make(chan struct{})
+	defer close(stopReceive)
+	allReceived := make(chan error, 1) // buffered, never closed: the receiver's one report cannot block or panic
+
+	go func() {
+		conn, err := l.Accept()
+		if err != nil {
+			allReceived <- err
+			return
+		}
+		conn.SetReadDeadline(time.Now().Add(30 * time.Second))
+
+		total := 0
+		recvStart := time.Now()
+		got := make([]byte, bytesCount) // scratch buffer reused by every read; contents are discarded
+		for {
+			n, err := conn.Read(got)
+			if n != bytesCount {
+				logf("read %d bytes, want %d", n, bytesCount)
+			}
+
+			select {
+			case <-stopReceive:
+				return
+			default:
+			}
+
+			if err != nil {
+				allReceived <- fmt.Errorf("failed reading packet, %s", err)
+				return
+			}
+
+			total += n
+			logf("received %d/%d bytes, %.2f %%", total, bytesCount, (float64(total) / (float64(bytesCount)) * 100))
+			if total == bytesCount {
+				break
+			}
+		}
+
+		logf("all received, took: %s", time.Since(recvStart).String())
+		allReceived <- nil
+	}()
+
+	sendStart := time.Now()
+	w.SetWriteDeadline(time.Now().Add(30 * time.Second))
+	if _, err := w.Write(bytes.Repeat([]byte("A"), bytesCount)); err != nil {
+		// The deferred close(stopReceive) and w.Close() make the receiver stop.
+		return err
+	}
+
+	logf("all sent (%s), waiting for all packets (%d) to be received", time.Since(sendStart).String(), bytesCount)
+	err = <-allReceived
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func TestUserMetrics(t *testing.T) {
 	flakytest.Mark(t, "https://github.com/tailscale/tailscale/issues/13420")
 	tstest.ResourceCheck(t)
@@ -882,7 +955,7 @@ func TestUserMetrics(t *testing.T) {
 	controlURL, c := startControl(t)
 	s1, s1ip, s1PubKey := startServer(t, ctx, controlURL, "s1")
-	s2, _, _ := startServer(t, ctx, controlURL, "s2")
+	s2, s2ip, _ := 
startServer(t, ctx, controlURL, "s2") s1.lb.EditPrefs(&ipn.MaskedPrefs{ Prefs: ipn.Prefs{ @@ -951,6 +1024,20 @@ func TestUserMetrics(t *testing.T) { return status1.Self.PrimaryRoutes != nil && status1.Self.PrimaryRoutes.Len() == int(wantRoutes)+1 }) + mustDirect(t, t.Logf, lc1, lc2) + + // 10 megabytes + bytesToSend := 10 * 1024 * 1024 + + // This generates some traffic; sendData is factored out + // of TestUDPConn. + start := time.Now() + err = sendData(t.Logf, ctx, bytesToSend, s1, s2, s1ip, s2ip) + if err != nil { + t.Fatalf("Failed to send packets: %v", err) + } + t.Logf("Sent %d bytes from s1 to s2 in %s", bytesToSend, time.Since(start).String()) + ctxLc, cancelLc := context.WithTimeout(context.Background(), 5*time.Second) defer cancelLc() metrics1, err := lc1.UserMetrics(ctxLc) @@ -968,6 +1055,9 @@ func TestUserMetrics(t *testing.T) { t.Fatal(err) } + // Allow the byte metrics to exceed the bytes sent by up to 15%. + bytesSentTolerance := 1.15 + t.Logf("Metrics1:\n%s\n", metrics1) // The node is advertising 4 routes: @@ -997,6 +1087,18 @@ func TestUserMetrics(t *testing.T) { t.Errorf("metrics1, tailscaled_primary_routes: got %v, want %v", got, want) } + // Verify that the amount of data recorded in bytes is greater than or equal to the + // 10 megabytes sent. + inboundBytes1 := parsedMetrics1[`tailscaled_inbound_bytes_total{path="direct_ipv4"}`] + if inboundBytes1 < float64(bytesToSend) { + t.Errorf(`metrics1, tailscaled_inbound_bytes_total{path="direct_ipv4"}: expected higher (or equal) than %d, got: %f`, bytesToSend, inboundBytes1) + } + + // But ensure that it is not too much higher than the 10 megabytes sent. 
+ if inboundBytes1 > float64(bytesToSend)*bytesSentTolerance { + t.Errorf(`metrics1, tailscaled_inbound_bytes_total{path="direct_ipv4"}: expected lower than %f, got: %f`, float64(bytesToSend)*bytesSentTolerance, inboundBytes1) + } + metrics2, err := lc2.UserMetrics(ctx) if err != nil { t.Fatal(err) @@ -1033,6 +1135,18 @@ func TestUserMetrics(t *testing.T) { if got, want := parsedMetrics2["tailscaled_primary_routes"], 0.0; got != want { t.Errorf("metrics2, tailscaled_primary_routes: got %v, want %v", got, want) } + + // Verify that the amount of data recorded in bytes is higher or equal than the + // 10 megabytes sent. + outboundBytes2 := parsedMetrics2[`tailscaled_outbound_bytes_total{path="direct_ipv4"}`] + if outboundBytes2 < float64(bytesToSend) { + t.Errorf(`metrics2, tailscaled_outbound_bytes_total{path="direct_ipv4"}: expected higher (or equal) than %d, got: %f`, bytesToSend, outboundBytes2) + } + + // But ensure that it is not too much higher than the 10 megabytes sent. + if outboundBytes2 > float64(bytesToSend)*bytesSentTolerance { + t.Errorf(`metrics2, tailscaled_outbound_bytes_total{path="direct_ipv4"}: expected lower than %f, got: %f`, float64(bytesToSend)*bytesSentTolerance, outboundBytes2) + } } func waitForCondition(t *testing.T, msg string, waitTime time.Duration, f func() bool) { @@ -1044,3 +1158,33 @@ func waitForCondition(t *testing.T, msg string, waitTime time.Duration, f func() } t.Fatalf("waiting for condition: %s", msg) } + +// mustDirect ensures there is a direct connection between LocalClient 1 and 2 +func mustDirect(t *testing.T, logf logger.Logf, lc1, lc2 *tailscale.LocalClient) { + t.Helper() + lastLog := time.Now().Add(-time.Minute) + // See https://github.com/tailscale/tailscale/issues/654 + // and https://github.com/tailscale/tailscale/issues/3247 for discussions of this deadline. 
+ for deadline := time.Now().Add(30 * time.Second); time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + status1, err := lc1.Status(ctx) + if err != nil { + continue + } + status2, err := lc2.Status(ctx) + if err != nil { + continue + } + pst := status1.Peer[status2.Self.PublicKey] + if pst.CurAddr != "" { + logf("direct link %s->%s found with addr %s", status1.Self.HostName, status2.Self.HostName, pst.CurAddr) + return + } + if now := time.Now(); now.Sub(lastLog) > time.Second { + logf("no direct path %s->%s yet, addrs %v", status1.Self.HostName, status2.Self.HostName, pst.Addrs) + lastLog = now + } + } + t.Error("magicsock did not find a direct path from lc1 to lc2") +} diff --git a/util/clientmetric/clientmetric.go b/util/clientmetric/clientmetric.go index b2d356b60..584a24f73 100644 --- a/util/clientmetric/clientmetric.go +++ b/util/clientmetric/clientmetric.go @@ -9,6 +9,7 @@ import ( "bytes" "encoding/binary" "encoding/hex" + "expvar" "fmt" "io" "sort" @@ -16,6 +17,8 @@ import ( "sync" "sync/atomic" "time" + + "tailscale.com/util/set" ) var ( @@ -223,6 +226,54 @@ func NewGaugeFunc(name string, f func() int64) *Metric { return m } +// AggregateCounter returns a sum of expvar counters registered with it. +type AggregateCounter struct { + mu sync.RWMutex + counters set.Set[*expvar.Int] +} + +func (c *AggregateCounter) Value() int64 { + c.mu.RLock() + defer c.mu.RUnlock() + var sum int64 + for cnt := range c.counters { + sum += cnt.Value() + } + return sum +} + +// Register registers the provided expvar counter. +// When a counter is registered with the aggregate, it is reset +// to start counting from 0. This is to avoid incrementing the +// aggregate with an unexpectedly large value. +func (c *AggregateCounter) Register(counter *expvar.Int) { + c.mu.Lock() + defer c.mu.Unlock() + // No need to do anything if it's already registered. 
+ if c.counters.Contains(counter) { + return + } + counter.Set(0) + c.counters.Add(counter) +} + +// UnregisterAll unregisters all counters resulting in it +// starting back down at zero. This is to ensure monotonicity +// and respect the semantics of the counter. +func (c *AggregateCounter) UnregisterAll() { + c.mu.Lock() + defer c.mu.Unlock() + c.counters = set.Set[*expvar.Int]{} +} + +// NewAggregateCounter returns a new aggregate counter that returns +// a sum of expvar variables registered with it. +func NewAggregateCounter(name string) *AggregateCounter { + c := &AggregateCounter{counters: set.Set[*expvar.Int]{}} + NewGaugeFunc(name, c.Value) + return c +} + // WritePrometheusExpositionFormat writes all client metrics to w in // the Prometheus text-based exposition format. // diff --git a/util/clientmetric/clientmetric_test.go b/util/clientmetric/clientmetric_test.go index ab6c4335a..555d7a711 100644 --- a/util/clientmetric/clientmetric_test.go +++ b/util/clientmetric/clientmetric_test.go @@ -4,8 +4,11 @@ package clientmetric import ( + "expvar" "testing" "time" + + qt "github.com/frankban/quicktest" ) func TestDeltaEncBuf(t *testing.T) { @@ -107,3 +110,49 @@ func TestWithFunc(t *testing.T) { t.Errorf("second = %q; want %q", got, want) } } + +func TestAggregateCounter(t *testing.T) { + clearMetrics() + + c := qt.New(t) + + expv1 := &expvar.Int{} + expv2 := &expvar.Int{} + expv3 := &expvar.Int{} + + aggCounter := NewAggregateCounter("agg_counter") + + aggCounter.Register(expv1) + c.Assert(aggCounter.Value(), qt.Equals, int64(0)) + + expv1.Add(1) + c.Assert(aggCounter.Value(), qt.Equals, int64(1)) + + aggCounter.Register(expv2) + c.Assert(aggCounter.Value(), qt.Equals, int64(1)) + + expv1.Add(1) + expv2.Add(1) + c.Assert(aggCounter.Value(), qt.Equals, int64(3)) + + // Adding a new expvar should not change the value + // and any value the counter already had is reset + expv3.Set(5) + aggCounter.Register(expv3) + c.Assert(aggCounter.Value(), qt.Equals, int64(3)) 
+ + // Registering the same expvar multiple times should not change the value + aggCounter.Register(expv3) + c.Assert(aggCounter.Value(), qt.Equals, int64(3)) + + aggCounter.UnregisterAll() + c.Assert(aggCounter.Value(), qt.Equals, int64(0)) + + // Start over + expv3.Set(5) + aggCounter.Register(expv3) + c.Assert(aggCounter.Value(), qt.Equals, int64(0)) + + expv3.Set(5) + c.Assert(aggCounter.Value(), qt.Equals, int64(5)) +} diff --git a/wgengine/magicsock/derp.go b/wgengine/magicsock/derp.go index 69c5cbc90..281447ac2 100644 --- a/wgengine/magicsock/derp.go +++ b/wgengine/magicsock/derp.go @@ -669,7 +669,8 @@ func (c *Conn) runDerpWriter(ctx context.Context, dc *derphttp.Client, ch <-chan c.logf("magicsock: derp.Send(%v): %v", wr.addr, err) metricSendDERPError.Add(1) } else { - metricSendDERP.Add(1) + c.metrics.outboundPacketsDERPTotal.Add(1) + c.metrics.outboundBytesDERPTotal.Add(int64(len(wr.b))) } } } @@ -690,7 +691,8 @@ func (c *connBind) receiveDERP(buffs [][]byte, sizes []int, eps []conn.Endpoint) // No data read occurred. Wait for another packet. 
continue } - metricRecvDataDERP.Add(1) + c.metrics.inboundPacketsDERPTotal.Add(1) + c.metrics.inboundBytesDERPTotal.Add(int64(n)) sizes[0] = n eps[0] = ep return 1, nil diff --git a/wgengine/magicsock/endpoint.go b/wgengine/magicsock/endpoint.go index 53ecb84de..78b9ee92a 100644 --- a/wgengine/magicsock/endpoint.go +++ b/wgengine/magicsock/endpoint.go @@ -960,26 +960,39 @@ func (de *endpoint) send(buffs [][]byte) error { de.noteBadEndpoint(udpAddr) } + var txBytes int + for _, b := range buffs { + txBytes += len(b) + } + + switch { + case udpAddr.Addr().Is4(): + de.c.metrics.outboundPacketsIPv4Total.Add(int64(len(buffs))) + de.c.metrics.outboundBytesIPv4Total.Add(int64(txBytes)) + case udpAddr.Addr().Is6(): + de.c.metrics.outboundPacketsIPv6Total.Add(int64(len(buffs))) + de.c.metrics.outboundBytesIPv6Total.Add(int64(txBytes)) + } + // TODO(raggi): needs updating for accuracy, as in error conditions we may have partial sends. if stats := de.c.stats.Load(); err == nil && stats != nil { - var txBytes int - for _, b := range buffs { - txBytes += len(b) - } stats.UpdateTxPhysical(de.nodeAddr, udpAddr, txBytes) } } if derpAddr.IsValid() { allOk := true + var txBytes int for _, buff := range buffs { ok, _ := de.c.sendAddr(derpAddr, de.publicKey, buff) - if stats := de.c.stats.Load(); stats != nil { - stats.UpdateTxPhysical(de.nodeAddr, derpAddr, len(buff)) - } + txBytes += len(buff) if !ok { allOk = false } } + + if stats := de.c.stats.Load(); stats != nil { + stats.UpdateTxPhysical(de.nodeAddr, derpAddr, txBytes) + } if allOk { return nil } diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 08aff842d..2d4944baf 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -10,6 +10,7 @@ import ( "bytes" "context" "errors" + "expvar" "fmt" "io" "net" @@ -80,6 +81,54 @@ const ( socketBufferSize = 7 << 20 ) +// Path is a label indicating the type of path a packet took. 
+type Path string + +const ( + PathDirectIPv4 Path = "direct_ipv4" + PathDirectIPv6 Path = "direct_ipv6" + PathDERP Path = "derp" +) + +type pathLabel struct { + // Path indicates the path that the packet took: + // - direct_ipv4 + // - direct_ipv6 + // - derp + Path Path +} + +// metrics in wgengine contains the usermetrics counters for magicsock. It +// is however a bit special: all the metrics are labeled, but looking up +// the metric every time we need to record it has an overhead, and includes +// a lock in MultiLabelMap. The metrics are therefore instead created with +// wgengine and the underlying expvar.Int is stored to be used directly. +type metrics struct { + // inboundPacketsTotal is the total number of inbound packets received, + // labeled by the path the packet took. + inboundPacketsIPv4Total expvar.Int + inboundPacketsIPv6Total expvar.Int + inboundPacketsDERPTotal expvar.Int + + // inboundBytesTotal is the total number of inbound bytes received, + // labeled by the path the packet took. + inboundBytesIPv4Total expvar.Int + inboundBytesIPv6Total expvar.Int + inboundBytesDERPTotal expvar.Int + + // outboundPacketsTotal is the total number of outbound packets sent, + // labeled by the path the packet took. + outboundPacketsIPv4Total expvar.Int + outboundPacketsIPv6Total expvar.Int + outboundPacketsDERPTotal expvar.Int + + // outboundBytesTotal is the total number of outbound bytes sent, + // labeled by the path the packet took. + outboundBytesIPv4Total expvar.Int + outboundBytesIPv6Total expvar.Int + outboundBytesDERPTotal expvar.Int +} + // A Conn routes UDP packets and actively manages a list of its endpoints. type Conn struct { // This block mirrors the contents and field order of the Options @@ -321,6 +370,9 @@ type Conn struct { // responsibility to ensure that traffic from these endpoints is routed // to the node. staticEndpoints views.Slice[netip.AddrPort] + + // metrics contains the metrics for the magicsock instance. 
+ metrics *metrics } // SetDebugLoggingEnabled controls whether spammy debug logging is enabled. @@ -503,6 +555,8 @@ func NewConn(opts Options) (*Conn, error) { UseDNSCache: true, } + c.metrics = registerMetrics(opts.Metrics) + if d4, err := c.listenRawDisco("ip4"); err == nil { c.logf("[v1] using BPF disco receiver for IPv4") c.closeDisco4 = d4 @@ -520,6 +574,76 @@ func NewConn(opts Options) (*Conn, error) { return c, nil } +// registerMetrics wires up the metrics for wgengine, instead of +// registering the label metric directly, the underlying expvar is exposed. +// See metrics for more info. +func registerMetrics(reg *usermetric.Registry) *metrics { + pathDirectV4 := pathLabel{Path: PathDirectIPv4} + pathDirectV6 := pathLabel{Path: PathDirectIPv6} + pathDERP := pathLabel{Path: PathDERP} + inboundPacketsTotal := usermetric.NewMultiLabelMapWithRegistry[pathLabel]( + reg, + "tailscaled_inbound_packets_total", + "counter", + "Counts the number of packets received from other peers", + ) + inboundBytesTotal := usermetric.NewMultiLabelMapWithRegistry[pathLabel]( + reg, + "tailscaled_inbound_bytes_total", + "counter", + "Counts the number of bytes received from other peers", + ) + outboundPacketsTotal := usermetric.NewMultiLabelMapWithRegistry[pathLabel]( + reg, + "tailscaled_outbound_packets_total", + "counter", + "Counts the number of packets sent to other peers", + ) + outboundBytesTotal := usermetric.NewMultiLabelMapWithRegistry[pathLabel]( + reg, + "tailscaled_outbound_bytes_total", + "counter", + "Counts the number of bytes sent to other peers", + ) + m := new(metrics) + + // Map clientmetrics to the usermetric counters. 
+ metricRecvDataPacketsIPv4.Register(&m.inboundPacketsIPv4Total) + metricRecvDataPacketsIPv6.Register(&m.inboundPacketsIPv6Total) + metricRecvDataPacketsDERP.Register(&m.inboundPacketsDERPTotal) + metricSendUDP.Register(&m.outboundPacketsIPv4Total) + metricSendUDP.Register(&m.outboundPacketsIPv6Total) + metricSendDERP.Register(&m.outboundPacketsDERPTotal) + + inboundPacketsTotal.Set(pathDirectV4, &m.inboundPacketsIPv4Total) + inboundPacketsTotal.Set(pathDirectV6, &m.inboundPacketsIPv6Total) + inboundPacketsTotal.Set(pathDERP, &m.inboundPacketsDERPTotal) + + inboundBytesTotal.Set(pathDirectV4, &m.inboundBytesIPv4Total) + inboundBytesTotal.Set(pathDirectV6, &m.inboundBytesIPv6Total) + inboundBytesTotal.Set(pathDERP, &m.inboundBytesDERPTotal) + + outboundPacketsTotal.Set(pathDirectV4, &m.outboundPacketsIPv4Total) + outboundPacketsTotal.Set(pathDirectV6, &m.outboundPacketsIPv6Total) + outboundPacketsTotal.Set(pathDERP, &m.outboundPacketsDERPTotal) + + outboundBytesTotal.Set(pathDirectV4, &m.outboundBytesIPv4Total) + outboundBytesTotal.Set(pathDirectV6, &m.outboundBytesIPv6Total) + outboundBytesTotal.Set(pathDERP, &m.outboundBytesDERPTotal) + + return m +} + +// deregisterMetrics unregisters the underlying usermetrics expvar counters +// from clientmetrics. +func deregisterMetrics(m *metrics) { + metricRecvDataPacketsIPv4.UnregisterAll() + metricRecvDataPacketsIPv6.UnregisterAll() + metricRecvDataPacketsDERP.UnregisterAll() + metricSendUDP.UnregisterAll() + metricSendDERP.UnregisterAll() +} + // InstallCaptureHook installs a callback which is called to // log debug information into the pcap stream. 
This function // can be called with a nil argument to uninstall the capture @@ -1140,7 +1264,14 @@ func (c *Conn) sendUDP(ipp netip.AddrPort, b []byte) (sent bool, err error) { _ = c.maybeRebindOnError(runtime.GOOS, err) } else { if sent { - metricSendUDP.Add(1) + switch { + case ipp.Addr().Is4(): + c.metrics.outboundPacketsIPv4Total.Add(1) + c.metrics.outboundBytesIPv4Total.Add(int64(len(b))) + case ipp.Addr().Is6(): + c.metrics.outboundPacketsIPv6Total.Add(1) + c.metrics.outboundBytesIPv6Total.Add(int64(len(b))) + } } } return @@ -1278,19 +1409,24 @@ func (c *Conn) putReceiveBatch(batch *receiveBatch) { c.receiveBatchPool.Put(batch) } -// receiveIPv4 creates an IPv4 ReceiveFunc reading from c.pconn4. func (c *Conn) receiveIPv4() conn.ReceiveFunc { - return c.mkReceiveFunc(&c.pconn4, c.health.ReceiveFuncStats(health.ReceiveIPv4), metricRecvDataIPv4) + return c.mkReceiveFunc(&c.pconn4, c.health.ReceiveFuncStats(health.ReceiveIPv4), + &c.metrics.inboundPacketsIPv4Total, + &c.metrics.inboundBytesIPv4Total, + ) } // receiveIPv6 creates an IPv6 ReceiveFunc reading from c.pconn6. func (c *Conn) receiveIPv6() conn.ReceiveFunc { - return c.mkReceiveFunc(&c.pconn6, c.health.ReceiveFuncStats(health.ReceiveIPv6), metricRecvDataIPv6) + return c.mkReceiveFunc(&c.pconn6, c.health.ReceiveFuncStats(health.ReceiveIPv6), + &c.metrics.inboundPacketsIPv6Total, + &c.metrics.inboundBytesIPv6Total, + ) } // mkReceiveFunc creates a ReceiveFunc reading from ruc. -// The provided healthItem and metric are updated if non-nil. -func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFuncStats, metric *clientmetric.Metric) conn.ReceiveFunc { +// The provided healthItem and metrics are updated if non-nil. +func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFuncStats, packetMetric, bytesMetric *expvar.Int) conn.ReceiveFunc { // epCache caches an IPPort->endpoint for hot flows. 
var epCache ippEndpointCache @@ -1327,8 +1463,11 @@ func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFu } ipp := msg.Addr.(*net.UDPAddr).AddrPort() if ep, ok := c.receiveIP(msg.Buffers[0][:msg.N], ipp, &epCache); ok { - if metric != nil { - metric.Add(1) + if packetMetric != nil { + packetMetric.Add(1) + } + if bytesMetric != nil { + bytesMetric.Add(int64(msg.N)) } eps[i] = ep sizes[i] = msg.N @@ -2377,6 +2516,8 @@ func (c *Conn) Close() error { pinger.Close() } + deregisterMetrics(c.metrics) + return nil } @@ -2930,17 +3071,17 @@ var ( metricSendDERPErrorChan = clientmetric.NewCounter("magicsock_send_derp_error_chan") metricSendDERPErrorClosed = clientmetric.NewCounter("magicsock_send_derp_error_closed") metricSendDERPErrorQueue = clientmetric.NewCounter("magicsock_send_derp_error_queue") - metricSendUDP = clientmetric.NewCounter("magicsock_send_udp") + metricSendUDP = clientmetric.NewAggregateCounter("magicsock_send_udp") metricSendUDPError = clientmetric.NewCounter("magicsock_send_udp_error") - metricSendDERP = clientmetric.NewCounter("magicsock_send_derp") + metricSendDERP = clientmetric.NewAggregateCounter("magicsock_send_derp") metricSendDERPError = clientmetric.NewCounter("magicsock_send_derp_error") // Data packets (non-disco) metricSendData = clientmetric.NewCounter("magicsock_send_data") metricSendDataNetworkDown = clientmetric.NewCounter("magicsock_send_data_network_down") - metricRecvDataDERP = clientmetric.NewCounter("magicsock_recv_data_derp") - metricRecvDataIPv4 = clientmetric.NewCounter("magicsock_recv_data_ipv4") - metricRecvDataIPv6 = clientmetric.NewCounter("magicsock_recv_data_ipv6") + metricRecvDataPacketsDERP = clientmetric.NewAggregateCounter("magicsock_recv_data_derp") + metricRecvDataPacketsIPv4 = clientmetric.NewAggregateCounter("magicsock_recv_data_ipv4") + metricRecvDataPacketsIPv6 = clientmetric.NewAggregateCounter("magicsock_recv_data_ipv6") // Disco packets metricSendDiscoUDP = 
clientmetric.NewCounter("magicsock_disco_send_udp") diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index 6b2d961b9..c1b8eef22 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -28,6 +28,7 @@ import ( "time" "unsafe" + qt "github.com/frankban/quicktest" wgconn "github.com/tailscale/wireguard-go/conn" "github.com/tailscale/wireguard-go/device" "github.com/tailscale/wireguard-go/tun/tuntest" @@ -1188,6 +1189,91 @@ func testTwoDevicePing(t *testing.T, d *devices) { checkStats(t, m1, m1Conns) checkStats(t, m2, m2Conns) }) + t.Run("compare-metrics-stats", func(t *testing.T) { + setT(t) + defer setT(outerT) + m1.conn.resetMetricsForTest() + m1.stats.TestExtract() + m2.conn.resetMetricsForTest() + m2.stats.TestExtract() + t.Logf("Metrics before: %s\n", m1.metrics.String()) + ping1(t) + ping2(t) + assertConnStatsAndUserMetricsEqual(t, m1) + assertConnStatsAndUserMetricsEqual(t, m2) + t.Logf("Metrics after: %s\n", m1.metrics.String()) + }) +} + +func (c *Conn) resetMetricsForTest() { + c.metrics.inboundBytesIPv4Total.Set(0) + c.metrics.inboundPacketsIPv4Total.Set(0) + c.metrics.outboundBytesIPv4Total.Set(0) + c.metrics.outboundPacketsIPv4Total.Set(0) + c.metrics.inboundBytesIPv6Total.Set(0) + c.metrics.inboundPacketsIPv6Total.Set(0) + c.metrics.outboundBytesIPv6Total.Set(0) + c.metrics.outboundPacketsIPv6Total.Set(0) + c.metrics.inboundBytesDERPTotal.Set(0) + c.metrics.inboundPacketsDERPTotal.Set(0) + c.metrics.outboundBytesDERPTotal.Set(0) + c.metrics.outboundPacketsDERPTotal.Set(0) +} + +func assertConnStatsAndUserMetricsEqual(t *testing.T, ms *magicStack) { + _, phys := ms.stats.TestExtract() + + physIPv4RxBytes := int64(0) + physIPv4TxBytes := int64(0) + physDERPRxBytes := int64(0) + physDERPTxBytes := int64(0) + physIPv4RxPackets := int64(0) + physIPv4TxPackets := int64(0) + physDERPRxPackets := int64(0) + physDERPTxPackets := int64(0) + for conn, count := range phys { + 
t.Logf("physconn src: %s, dst: %s", conn.Src.String(), conn.Dst.String()) + if conn.Dst.String() == "127.3.3.40:1" { + physDERPRxBytes += int64(count.RxBytes) + physDERPTxBytes += int64(count.TxBytes) + physDERPRxPackets += int64(count.RxPackets) + physDERPTxPackets += int64(count.TxPackets) + } else { + physIPv4RxBytes += int64(count.RxBytes) + physIPv4TxBytes += int64(count.TxBytes) + physIPv4RxPackets += int64(count.RxPackets) + physIPv4TxPackets += int64(count.TxPackets) + } + } + + metricIPv4RxBytes := ms.conn.metrics.inboundBytesIPv4Total.Value() + metricIPv4RxPackets := ms.conn.metrics.inboundPacketsIPv4Total.Value() + metricIPv4TxBytes := ms.conn.metrics.outboundBytesIPv4Total.Value() + metricIPv4TxPackets := ms.conn.metrics.outboundPacketsIPv4Total.Value() + + metricDERPRxBytes := ms.conn.metrics.inboundBytesDERPTotal.Value() + metricDERPRxPackets := ms.conn.metrics.inboundPacketsDERPTotal.Value() + metricDERPTxBytes := ms.conn.metrics.outboundBytesDERPTotal.Value() + metricDERPTxPackets := ms.conn.metrics.outboundPacketsDERPTotal.Value() + + c := qt.New(t) + c.Assert(physDERPRxBytes, qt.Equals, metricDERPRxBytes) + c.Assert(physDERPTxBytes, qt.Equals, metricDERPTxBytes) + c.Assert(physIPv4RxBytes, qt.Equals, metricIPv4RxBytes) + c.Assert(physIPv4TxBytes, qt.Equals, metricIPv4TxBytes) + c.Assert(physDERPRxPackets, qt.Equals, metricDERPRxPackets) + c.Assert(physDERPTxPackets, qt.Equals, metricDERPTxPackets) + c.Assert(physIPv4RxPackets, qt.Equals, metricIPv4RxPackets) + c.Assert(physIPv4TxPackets, qt.Equals, metricIPv4TxPackets) + + // Validate that the usermetrics and clientmetrics are in sync + // Note: the clientmetrics are global, this means that when they are registering with the + // wgengine, multiple in-process nodes used by this test will be updating the same metrics. This is why we need to multiply + // the metrics by 2 to get the expected value. 
+ // TODO(kradalby): https://github.com/tailscale/tailscale/issues/13420 + c.Assert(metricSendUDP.Value(), qt.Equals, metricIPv4TxPackets*2) + c.Assert(metricRecvDataPacketsIPv4.Value(), qt.Equals, metricIPv4RxPackets*2) + c.Assert(metricRecvDataPacketsDERP.Value(), qt.Equals, metricDERPRxPackets*2) } func TestDiscoMessage(t *testing.T) {