From 31e234eb05bd2f4824316e8ba95d03b8448f2ca0 Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Fri, 19 Dec 2025 14:28:56 -0800 Subject: [PATCH] net/udprelay: add tailscaled_peer_relay_endpoints gauge New gauge reflects endpoints state via labels: open, semi-bound, bound. Corresponding client metrics are logged as - udprelay_endpoints_open - udprelay_endpoints_semi_bound - udprelay_endpoints_bound Updates tailscale/corp#30820 Change-Id: Idb1baa90a38c97847e14f9b2390093262ad0ea23 Signed-off-by: Alex Valiushko --- net/udprelay/metrics.go | 52 ++++++++++++++++++++++++++++++++++++ net/udprelay/metrics_test.go | 50 +++++++++++++++++++++++++++++++++- net/udprelay/server.go | 36 +++++++++++++++++++++++++ 3 files changed, 137 insertions(+), 1 deletion(-) diff --git a/net/udprelay/metrics.go b/net/udprelay/metrics.go index b7c0710c2..0f155a62a 100644 --- a/net/udprelay/metrics.go +++ b/net/udprelay/metrics.go @@ -22,6 +22,18 @@ var ( cMetricForwarded46Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp4_udp6") cMetricForwarded64Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp6_udp4") cMetricForwarded66Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp6_udp6") + + // cMetricEndpoints is set on startup and safe for concurrent use. + // + // [clientmetric.Gauge] does not let us embed existing counters, + // so [metrics.updateEndpoint] records data into client and user gauges independently.
+ // + // [endpointClosed] has no gauge; transitions to or from it only adjust the other state's gauge. + cMetricEndpoints = map[endpointState]*clientmetric.Metric{ + endpointOpen: clientmetric.NewGauge("udprelay_endpoints_open"), + endpointSemiBound: clientmetric.NewGauge("udprelay_endpoints_semi_bound"), + endpointBound: clientmetric.NewGauge("udprelay_endpoints_bound"), + } ) type transport string @@ -36,6 +48,10 @@ type forwardedLabel struct { transportOut transport `prom:"transport_out"` } +type endpointLabel struct { + state endpointState `prom:"state"` +} + type metrics struct { forwarded44Packets expvar.Int forwarded46Packets expvar.Int @@ -46,6 +62,11 @@ type metrics struct { forwarded46Bytes expvar.Int forwarded64Bytes expvar.Int forwarded66Bytes expvar.Int + + // endpoints are set on init and safe for concurrent use. + // + // [endpointClosed] has no gauge; transitions to or from it only adjust the other state's gauge. + endpoints map[endpointState]*expvar.Int } // registerMetrics publishes user and client metric counters for peer relay server. @@ -65,6 +86,12 @@ func registerMetrics(reg *usermetric.Registry) *metrics { "counter", "Number of bytes forwarded via Peer Relay", ) + uMetricEndpoints = usermetric.NewMultiLabelMapWithRegistry[endpointLabel]( + reg, + "tailscaled_peer_relay_endpoints", + "gauge", + "Number of allocated Peer Relay endpoints", + ) forwarded44 = forwardedLabel{transportIn: transportUDP4, transportOut: transportUDP4} forwarded46 = forwardedLabel{transportIn: transportUDP4, transportOut: transportUDP6} forwarded64 = forwardedLabel{transportIn: transportUDP6, transportOut: transportUDP4} @@ -83,6 +110,15 @@ func registerMetrics(reg *usermetric.Registry) *metrics { uMetricForwardedBytes.Set(forwarded64, &m.forwarded64Bytes) uMetricForwardedBytes.Set(forwarded66, &m.forwarded66Bytes) + m.endpoints = map[endpointState]*expvar.Int{ + endpointBound: {}, + endpointSemiBound: {}, + endpointOpen: {}, + } + uMetricEndpoints.Set(endpointLabel{endpointBound}, m.endpoints[endpointBound])
uMetricEndpoints.Set(endpointLabel{endpointSemiBound}, m.endpoints[endpointSemiBound]) + uMetricEndpoints.Set(endpointLabel{endpointOpen}, m.endpoints[endpointOpen]) + // Publish client metrics. cMetricForwarded44Packets.Register(&m.forwarded44Packets) cMetricForwarded46Packets.Register(&m.forwarded46Packets) @@ -96,6 +132,19 @@ func registerMetrics(reg *usermetric.Registry) *metrics { return m } +// updateEndpoint updates the endpoints gauge according to states left and entered. +// It records client-metric gauges independently, see [cMetricEndpoints] doc. +func (m *metrics) updateEndpoint(leaving, entering endpointState) { + if gauge, ok := m.endpoints[leaving]; ok && leaving != endpointClosed { + gauge.Add(-1) + cMetricEndpoints[leaving].Add(-1) + } + if gauge, ok := m.endpoints[entering]; ok && entering != endpointClosed { + gauge.Add(1) + cMetricEndpoints[entering].Add(1) + } +} + // countForwarded records user and client metrics according to the // inbound and outbound address families. 
func (m *metrics) countForwarded(in4, out4 bool, bytes, packets int64) { @@ -125,4 +174,7 @@ func deregisterMetrics() { cMetricForwarded46Bytes.UnregisterAll() cMetricForwarded64Bytes.UnregisterAll() cMetricForwarded66Bytes.UnregisterAll() + for _, v := range cMetricEndpoints { + v.Set(0) + } } diff --git a/net/udprelay/metrics_test.go b/net/udprelay/metrics_test.go index 5c6a75113..8d2496255 100644 --- a/net/udprelay/metrics_test.go +++ b/net/udprelay/metrics_test.go @@ -11,7 +11,7 @@ import ( "tailscale.com/util/usermetric" ) -func TestMetrics(t *testing.T) { +func TestMetricsLifecycle(t *testing.T) { c := qt.New(t) deregisterMetrics() r := &usermetric.Registry{} @@ -22,6 +22,7 @@ func TestMetrics(t *testing.T) { want := []string{ "tailscaled_peer_relay_forwarded_packets_total", "tailscaled_peer_relay_forwarded_bytes_total", + "tailscaled_peer_relay_endpoints", } slices.Sort(have) slices.Sort(want) @@ -51,4 +52,51 @@ func TestMetrics(t *testing.T) { c.Assert(m.forwarded66Packets.Value(), qt.Equals, int64(4)) c.Assert(cMetricForwarded66Bytes.Value(), qt.Equals, int64(4)) c.Assert(cMetricForwarded66Packets.Value(), qt.Equals, int64(4)) + + // Validate client metrics deregistration. 
+ m.updateEndpoint(endpointClosed, endpointOpen) + deregisterMetrics() + c.Check(cMetricForwarded44Bytes.Value(), qt.Equals, int64(0)) + c.Check(cMetricForwarded44Packets.Value(), qt.Equals, int64(0)) + c.Check(cMetricForwarded46Bytes.Value(), qt.Equals, int64(0)) + c.Check(cMetricForwarded46Packets.Value(), qt.Equals, int64(0)) + c.Check(cMetricForwarded64Bytes.Value(), qt.Equals, int64(0)) + c.Check(cMetricForwarded64Packets.Value(), qt.Equals, int64(0)) + c.Check(cMetricForwarded66Bytes.Value(), qt.Equals, int64(0)) + c.Check(cMetricForwarded66Packets.Value(), qt.Equals, int64(0)) + c.Check(cMetricEndpoints[endpointOpen].Value(), qt.Equals, int64(0)) +} + +func TestMetricsEndpointTransitions(t *testing.T) { + c := qt.New(t) + for _, tc := range []struct { + name string + leaving, entering endpointState + wantOpen int64 + wantSemi int64 + wantBound int64 + }{ + {"closed-open", endpointClosed, endpointOpen, 1, 0, 0}, + {"open-semi_bound", endpointOpen, endpointSemiBound, -1, 1, 0}, + {"semi_bound-bound", endpointSemiBound, endpointBound, 0, -1, 1}, + {"open-closed", endpointOpen, endpointClosed, -1, 0, 0}, + {"semi_bound-closed", endpointSemiBound, endpointClosed, 0, -1, 0}, + {"bound-closed", endpointBound, endpointClosed, 0, 0, -1}, + } { + t.Run(tc.name, func(t *testing.T) { + deregisterMetrics() + r := &usermetric.Registry{} + m := registerMetrics(r) + + m.updateEndpoint(tc.leaving, tc.entering) + c.Check(m.endpoints[endpointOpen].Value(), qt.Equals, tc.wantOpen) + c.Check(m.endpoints[endpointSemiBound].Value(), qt.Equals, tc.wantSemi) + c.Check(m.endpoints[endpointBound].Value(), qt.Equals, tc.wantBound) + + // Verify client metrics match + c.Check(cMetricEndpoints[endpointOpen].Value(), qt.Equals, tc.wantOpen) + c.Check(cMetricEndpoints[endpointSemiBound].Value(), qt.Equals, tc.wantSemi) + c.Check(cMetricEndpoints[endpointBound].Value(), qt.Equals, tc.wantBound) + }) + } } diff --git a/net/udprelay/server.go b/net/udprelay/server.go index 
acdbf5ad6..8e67855ea 100644 --- a/net/udprelay/server.go +++ b/net/udprelay/server.go @@ -115,6 +115,7 @@ type serverEndpoint struct { lamportID uint64 vni uint32 allocatedAt mono.Time + metrics *metrics mu sync.Mutex // guards the following fields inProgressGeneration [2]uint32 // or zero if a handshake has never started, or has just completed @@ -224,9 +225,11 @@ func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex // already authenticated via disco. if bytes.Equal(mac[:], discoMsg.Challenge[:]) { // Handshake complete. Update the binding for this sender. + oldState := e.stateLocked() e.boundAddrPorts[senderIndex] = from e.lastSeen[senderIndex] = now // record last seen as bound time e.inProgressGeneration[senderIndex] = 0 // reset to zero, which indicates there is no in-progress handshake + e.metrics.updateEndpoint(oldState, e.stateLocked()) return nil, netip.AddrPort{} } } @@ -318,6 +321,36 @@ func (e *serverEndpoint) isBoundLocked() bool { e.boundAddrPorts[1].IsValid() } +// stateLocked returns the current endpointState according to the +// peers' handshake status. +func (e *serverEndpoint) stateLocked() endpointState { + if e == nil { + return endpointClosed + } + a, b := e.boundAddrPorts[0].IsValid(), e.boundAddrPorts[1].IsValid() + switch { + case a && b: + return endpointBound + case a || b: + return endpointSemiBound + default: + return endpointOpen + } +} + +// endpointState canonicalizes endpoint state names, +// see [serverEndpoint.stateLocked]. +// +// Usermetrics can't handle Stringer, must be a string enum. +type endpointState string + +const ( + endpointClosed endpointState = "closed" + endpointOpen endpointState = "open" + endpointSemiBound endpointState = "semi_bound" + endpointBound endpointState = "bound" +) + // NewServer constructs a [Server] listening on port. If port is zero, then // port selection is left up to the host networking stack.
If // onlyStaticAddrPorts is true, then dynamic addr:port discovery will be @@ -673,6 +706,7 @@ func (s *Server) endpointGCLoop() { defer s.mu.Unlock() for k, v := range s.serverEndpointByDisco { if v.isExpired(now, s.bindLifetime, s.steadyStateLifetime) { + s.metrics.updateEndpoint(v.stateLocked(), endpointClosed) delete(s.serverEndpointByDisco, k) s.serverEndpointByVNI.Delete(v.vni) } @@ -960,6 +994,7 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv lamportID: s.lamportID, allocatedAt: mono.Now(), vni: vni, + metrics: s.metrics, } e.discoSharedSecrets[0] = s.disco.Shared(e.discoPubKeys.Get()[0]) e.discoSharedSecrets[1] = s.disco.Shared(e.discoPubKeys.Get()[1]) @@ -968,6 +1003,7 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv s.serverEndpointByVNI.Store(e.vni, e) s.logf("allocated endpoint vni=%d lamportID=%d disco[0]=%v disco[1]=%v", e.vni, e.lamportID, pair.Get()[0].ShortString(), pair.Get()[1].ShortString()) + s.metrics.updateEndpoint(endpointClosed, endpointOpen) return endpoint.ServerEndpoint{ ServerDisco: s.discoPublic, ClientDisco: pair.Get(),