From 4b7585df77e593ec6e57d9f55ce1296dc5bc6aaf Mon Sep 17 00:00:00 2001
From: Alex Valiushko
Date: Wed, 21 Jan 2026 21:55:37 -0800
Subject: [PATCH] net/udprelay: add tailscaled_peer_relay_endpoints gauge
 (#18265)

New gauge reflects endpoint state via labels:
- open, when both peers are connected and ready to talk, and
- connecting, when at least one peer hasn't connected yet.

Corresponding client metrics are logged as:
- udprelay_endpoints_connecting
- udprelay_endpoints_open

Updates tailscale/corp#30820

Change-Id: Idb1baa90a38c97847e14f9b2390093262ad0ea23
Signed-off-by: Alex Valiushko
---
 net/udprelay/metrics.go      | 59 ++++++++++++++++++++++-
 net/udprelay/metrics_test.go | 57 +++++++++++++++++++++-
 net/udprelay/server.go       | 92 ++++++++++++++++++++++++++++--------
 net/udprelay/server_test.go  | 74 +++++++++++++++++++++++++++++
 4 files changed, 258 insertions(+), 24 deletions(-)

diff --git a/net/udprelay/metrics.go b/net/udprelay/metrics.go
index b7c0710c2..235029bf4 100644
--- a/net/udprelay/metrics.go
+++ b/net/udprelay/metrics.go
@@ -22,6 +22,17 @@ var (
 	cMetricForwarded46Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp4_udp6")
 	cMetricForwarded64Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp6_udp4")
 	cMetricForwarded66Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp6_udp6")
+
+	// cMetricEndpoints is initialized here with no other writes, making it safe for concurrent reads.
+	//
+	// [clientmetric.Gauge] does not let us embed existing counters, so
+	// [metrics.updateEndpoint] records data into client and user gauges independently.
+	//
+	// Transitions to and from [endpointClosed] are not recorded.
+	cMetricEndpoints = map[endpointState]*clientmetric.Metric{
+		endpointConnecting: clientmetric.NewGauge("udprelay_endpoints_connecting"),
+		endpointOpen:       clientmetric.NewGauge("udprelay_endpoints_open"),
+	}
 )
 
 type transport string
@@ -36,6 +47,10 @@ type forwardedLabel struct {
 	transportOut transport `prom:"transport_out"`
 }
 
+type endpointLabel struct {
+	state endpointState `prom:"state"`
+}
+
 type metrics struct {
 	forwarded44Packets expvar.Int
 	forwarded46Packets expvar.Int
@@ -46,6 +61,11 @@ type metrics struct {
 	forwarded46Bytes expvar.Int
 	forwarded64Bytes expvar.Int
 	forwarded66Bytes expvar.Int
+
+	// endpoints are set in [registerMetrics] and safe for concurrent reads.
+	//
+	// Transitions to and from [endpointClosed] are not recorded.
+	endpoints map[endpointState]*expvar.Int
 }
 
 // registerMetrics publishes user and client metric counters for peer relay server.
@@ -65,6 +85,12 @@ func registerMetrics(reg *usermetric.Registry) *metrics {
 		"counter",
 		"Number of bytes forwarded via Peer Relay",
 	)
+	uMetricEndpoints = usermetric.NewMultiLabelMapWithRegistry[endpointLabel](
+		reg,
+		"tailscaled_peer_relay_endpoints",
+		"gauge",
+		"Number of allocated Peer Relay endpoints",
+	)
 	forwarded44 = forwardedLabel{transportIn: transportUDP4, transportOut: transportUDP4}
 	forwarded46 = forwardedLabel{transportIn: transportUDP4, transportOut: transportUDP6}
 	forwarded64 = forwardedLabel{transportIn: transportUDP6, transportOut: transportUDP4}
@@ -83,6 +109,13 @@ func registerMetrics(reg *usermetric.Registry) *metrics {
 	uMetricForwardedBytes.Set(forwarded64, &m.forwarded64Bytes)
 	uMetricForwardedBytes.Set(forwarded66, &m.forwarded66Bytes)
 
+	m.endpoints = map[endpointState]*expvar.Int{
+		endpointConnecting: {},
+		endpointOpen:       {},
+	}
+	uMetricEndpoints.Set(endpointLabel{endpointOpen}, m.endpoints[endpointOpen])
+	uMetricEndpoints.Set(endpointLabel{endpointConnecting}, m.endpoints[endpointConnecting])
+
 	// Publish client metrics.
 	cMetricForwarded44Packets.Register(&m.forwarded44Packets)
 	cMetricForwarded46Packets.Register(&m.forwarded46Packets)
@@ -96,6 +129,26 @@ func registerMetrics(reg *usermetric.Registry) *metrics {
 	return m
 }
 
+type endpointUpdater interface {
+	updateEndpoint(before, after endpointState)
+}
+
+// updateEndpoint updates the endpoints gauge according to the states left and entered.
+// It records the client-metric gauges independently; see the [cMetricEndpoints] doc.
+func (m *metrics) updateEndpoint(before, after endpointState) {
+	if before == after {
+		return
+	}
+	if uMetricEndpointsBefore, ok := m.endpoints[before]; ok && before != endpointClosed {
+		uMetricEndpointsBefore.Add(-1)
+		cMetricEndpoints[before].Add(-1)
+	}
+	if uMetricEndpointsAfter, ok := m.endpoints[after]; ok && after != endpointClosed {
+		uMetricEndpointsAfter.Add(1)
+		cMetricEndpoints[after].Add(1)
+	}
+}
+
 // countForwarded records user and client metrics according to the
 // inbound and outbound address families.
 func (m *metrics) countForwarded(in4, out4 bool, bytes, packets int64) {
@@ -114,8 +167,7 @@ func (m *metrics) countForwarded(in4, out4 bool, bytes, packets int64) {
 	}
 }
 
-// deregisterMetrics unregisters the underlying expvar counters
-// from clientmetrics.
+// deregisterMetrics clears clientmetrics counters and resets gauges to zero.
 func deregisterMetrics() {
 	cMetricForwarded44Packets.UnregisterAll()
 	cMetricForwarded46Packets.UnregisterAll()
@@ -125,4 +177,7 @@ func deregisterMetrics() {
 	cMetricForwarded46Bytes.UnregisterAll()
 	cMetricForwarded64Bytes.UnregisterAll()
 	cMetricForwarded66Bytes.UnregisterAll()
+	for _, v := range cMetricEndpoints {
+		v.Set(0)
+	}
 }
diff --git a/net/udprelay/metrics_test.go b/net/udprelay/metrics_test.go
index 5c6a75113..0b7650534 100644
--- a/net/udprelay/metrics_test.go
+++ b/net/udprelay/metrics_test.go
@@ -4,6 +4,7 @@
 package udprelay
 
 import (
+	"fmt"
 	"slices"
 	"testing"
 
@@ -11,7 +12,7 @@ import (
 	"tailscale.com/util/usermetric"
 )
 
-func TestMetrics(t *testing.T) {
+func TestMetricsLifecycle(t *testing.T) {
 	c := qt.New(t)
 	deregisterMetrics()
 	r := &usermetric.Registry{}
@@ -22,6 +23,7 @@
 	want := []string{
 		"tailscaled_peer_relay_forwarded_packets_total",
 		"tailscaled_peer_relay_forwarded_bytes_total",
+		"tailscaled_peer_relay_endpoints",
 	}
 	slices.Sort(have)
 	slices.Sort(want)
@@ -51,4 +53,57 @@
 	c.Assert(m.forwarded66Packets.Value(), qt.Equals, int64(4))
 	c.Assert(cMetricForwarded66Bytes.Value(), qt.Equals, int64(4))
 	c.Assert(cMetricForwarded66Packets.Value(), qt.Equals, int64(4))
+
+	// Validate client metrics deregistration.
+	m.updateEndpoint(endpointClosed, endpointOpen)
+	deregisterMetrics()
+	c.Check(cMetricForwarded44Bytes.Value(), qt.Equals, int64(0))
+	c.Check(cMetricForwarded44Packets.Value(), qt.Equals, int64(0))
+	c.Check(cMetricForwarded46Bytes.Value(), qt.Equals, int64(0))
+	c.Check(cMetricForwarded46Packets.Value(), qt.Equals, int64(0))
+	c.Check(cMetricForwarded64Bytes.Value(), qt.Equals, int64(0))
+	c.Check(cMetricForwarded64Packets.Value(), qt.Equals, int64(0))
+	c.Check(cMetricForwarded66Bytes.Value(), qt.Equals, int64(0))
+	c.Check(cMetricForwarded66Packets.Value(), qt.Equals, int64(0))
+	for k := range cMetricEndpoints {
+		c.Check(cMetricEndpoints[k].Value(), qt.Equals, int64(0))
+	}
+}
+
+func TestMetricsEndpointTransitions(t *testing.T) {
+	c := qt.New(t)
+	var states = []endpointState{
+		endpointClosed,
+		endpointConnecting,
+		endpointOpen,
+	}
+	for _, a := range states {
+		for _, b := range states {
+			t.Run(fmt.Sprintf("%s-%s", a, b), func(t *testing.T) {
+				deregisterMetrics()
+				r := &usermetric.Registry{}
+				m := registerMetrics(r)
+				m.updateEndpoint(a, b)
+				var wantA, wantB int64
+				switch {
+				case a == b:
+					wantA, wantB = 0, 0
+				case a == endpointClosed:
+					wantA, wantB = 0, 1
+				case b == endpointClosed:
+					wantA, wantB = -1, 0
+				default:
+					wantA, wantB = -1, 1
+				}
+				if a != endpointClosed {
+					c.Check(m.endpoints[a].Value(), qt.Equals, wantA)
+					c.Check(cMetricEndpoints[a].Value(), qt.Equals, wantA)
+				}
+				if b != endpointClosed {
+					c.Check(m.endpoints[b].Value(), qt.Equals, wantB)
+					c.Check(cMetricEndpoints[b].Value(), qt.Equals, wantB)
+				}
+			})
+		}
+	}
+}
diff --git a/net/udprelay/server.go b/net/udprelay/server.go
index 2b6d38923..38ee04df9 100644
--- a/net/udprelay/server.go
+++ b/net/udprelay/server.go
@@ -122,6 +122,7 @@ type serverEndpoint struct {
 	allocatedAt mono.Time
 
 	mu sync.Mutex // guards the following fields
+	closed bool // signals that no new data should be accepted
 	inProgressGeneration [2]uint32 // or zero if a handshake has never started, or has just completed
 	boundAddrPorts [2]netip.AddrPort // or zero value if a handshake has never completed for that relay leg
 	lastSeen [2]mono.Time
@@ -151,9 +152,15 @@ func blakeMACFromBindMsg(blakeKey [blake2s.Size]byte, src netip.AddrPort, msg di
 	return out, nil
 }
 
-func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex int, discoMsg disco.Message, serverDisco key.DiscoPublic, macSecrets views.Slice[[blake2s.Size]byte], now mono.Time) (write []byte, to netip.AddrPort) {
+func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex int, discoMsg disco.Message, serverDisco key.DiscoPublic, macSecrets views.Slice[[blake2s.Size]byte], now mono.Time, m endpointUpdater) (write []byte, to netip.AddrPort) {
 	e.mu.Lock()
 	defer e.mu.Unlock()
+	lastState := e.stateLocked()
+
+	if lastState == endpointClosed {
+		// endpoint was closed in [Server.endpointGC]
+		return nil, netip.AddrPort{}
+	}
 
 	if senderIndex != 0 && senderIndex != 1 {
 		return nil, netip.AddrPort{}
@@ -230,6 +237,7 @@
 	if bytes.Equal(mac[:], discoMsg.Challenge[:]) {
 		// Handshake complete. Update the binding for this sender.
 		e.boundAddrPorts[senderIndex] = from
+		m.updateEndpoint(lastState, e.stateLocked())
 		e.lastSeen[senderIndex] = now // record last seen as bound time
 		e.inProgressGeneration[senderIndex] = 0 // reset to zero, which indicates there is no in-progress handshake
 		return nil, netip.AddrPort{}
@@ -243,7 +251,7 @@
 	}
 }
 
-func (e *serverEndpoint) handleSealedDiscoControlMsg(from netip.AddrPort, b []byte, serverDisco key.DiscoPublic, macSecrets views.Slice[[blake2s.Size]byte], now mono.Time) (write []byte, to netip.AddrPort) {
+func (e *serverEndpoint) handleSealedDiscoControlMsg(from netip.AddrPort, b []byte, serverDisco key.DiscoPublic, macSecrets views.Slice[[blake2s.Size]byte], now mono.Time, m endpointUpdater) (write []byte, to netip.AddrPort) {
 	senderRaw, isDiscoMsg := disco.Source(b)
 	if !isDiscoMsg {
 		// Not a Disco message
@@ -274,7 +282,7 @@
 		return nil, netip.AddrPort{}
 	}
 
-	return e.handleDiscoControlMsg(from, senderIndex, discoMsg, serverDisco, macSecrets, now)
+	return e.handleDiscoControlMsg(from, senderIndex, discoMsg, serverDisco, macSecrets, now, m)
 }
 
 func (e *serverEndpoint) handleDataPacket(from netip.AddrPort, b []byte, now mono.Time) (write []byte, to netip.AddrPort) {
@@ -284,6 +292,10 @@ func (e *serverEndpoint) handleDataPacket(from netip.AddrPort, b []byte, now mon
 		// not a control packet, but serverEndpoint isn't bound
 		return nil, netip.AddrPort{}
 	}
+	if e.stateLocked() == endpointClosed {
+		// endpoint was closed in [Server.endpointGC]
+		return nil, netip.AddrPort{}
+	}
 	switch {
 	case from == e.boundAddrPorts[0]:
 		e.lastSeen[0] = now
@@ -301,9 +313,21 @@
 	}
 }
 
-func (e *serverEndpoint) isExpired(now mono.Time, bindLifetime, steadyStateLifetime time.Duration) bool {
+// maybeExpire checks whether the endpoint has expired according to the provided lifetimes and, if so, marks it closed.
+// It returns true if the endpoint was expired and closed.
+func (e *serverEndpoint) maybeExpire(now mono.Time, bindLifetime, steadyStateLifetime time.Duration, m endpointUpdater) bool {
 	e.mu.Lock()
 	defer e.mu.Unlock()
+	before := e.stateLocked()
+	if e.isExpiredLocked(now, bindLifetime, steadyStateLifetime) {
+		e.closed = true
+		m.updateEndpoint(before, e.stateLocked())
+		return true
+	}
+	return false
+}
+
+func (e *serverEndpoint) isExpiredLocked(now mono.Time, bindLifetime, steadyStateLifetime time.Duration) bool {
 	if !e.isBoundLocked() {
 		if now.Sub(e.allocatedAt) > bindLifetime {
 			return true
@@ -323,6 +347,31 @@ func (e *serverEndpoint) isBoundLocked() bool {
 		e.boundAddrPorts[1].IsValid()
 }
 
+// stateLocked returns the current endpointState according to the
+// peers' handshake status.
+func (e *serverEndpoint) stateLocked() endpointState {
+	switch {
+	case e == nil, e.closed:
+		return endpointClosed
+	case e.boundAddrPorts[0].IsValid() && e.boundAddrPorts[1].IsValid():
+		return endpointOpen
+	default:
+		return endpointConnecting
+	}
+}
+
+// endpointState canonicalizes endpoint state names;
+// see [serverEndpoint.stateLocked].
+//
+// Usermetrics can't handle Stringer; this must be a string enum.
+type endpointState string
+
+const (
+	endpointClosed     endpointState = "closed"     // unallocated, not tracked in metrics
+	endpointConnecting endpointState = "connecting" // at least one peer has not completed handshake
+	endpointOpen       endpointState = "open"       // ready to forward
+)
+
 // NewServer constructs a [Server] listening on port. If port is zero, then
 // port selection is left up to the host networking stack. If
 // onlyStaticAddrPorts is true, then dynamic addr:port discovery will be
@@ -703,33 +752,33 @@ func (s *Server) Close() error {
 		clear(s.serverEndpointByDisco)
 		s.closed = true
 		s.bus.Close()
+		deregisterMetrics()
 	})
 	return nil
 }
 
+func (s *Server) endpointGC(bindLifetime, steadyStateLifetime time.Duration) {
+	now := mono.Now()
+	// TODO: consider performance implications of scanning all endpoints and
+	// holding s.mu for the duration. Keep it simple (and slow) for now.
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	for k, v := range s.serverEndpointByDisco {
+		if v.maybeExpire(now, bindLifetime, steadyStateLifetime, s.metrics) {
+			delete(s.serverEndpointByDisco, k)
+			s.serverEndpointByVNI.Delete(v.vni)
+		}
+	}
+}
+
 func (s *Server) endpointGCLoop() {
 	defer s.wg.Done()
 	ticker := time.NewTicker(s.bindLifetime)
 	defer ticker.Stop()
-
-	gc := func() {
-		now := mono.Now()
-		// TODO: consider performance implications of scanning all endpoints and
-		// holding s.mu for the duration. Keep it simple (and slow) for now.
-		s.mu.Lock()
-		defer s.mu.Unlock()
-		for k, v := range s.serverEndpointByDisco {
-			if v.isExpired(now, s.bindLifetime, s.steadyStateLifetime) {
-				delete(s.serverEndpointByDisco, k)
-				s.serverEndpointByVNI.Delete(v.vni)
-			}
-		}
-	}
-
 	for {
 		select {
 		case <-ticker.C:
-			gc()
+			s.endpointGC(s.bindLifetime, s.steadyStateLifetime)
 		case <-s.closeCh:
 			return
 		}
@@ -773,7 +822,7 @@ func (s *Server) handlePacket(from netip.AddrPort, b []byte) (write []byte, to n
 		}
 		msg := b[packet.GeneveFixedHeaderLength:]
 		secrets := s.getMACSecrets(now)
-		write, to = e.(*serverEndpoint).handleSealedDiscoControlMsg(from, msg, s.discoPublic, secrets, now)
+		write, to = e.(*serverEndpoint).handleSealedDiscoControlMsg(from, msg, s.discoPublic, secrets, now, s.metrics)
 		isDataPacket = false
 		return
 	}
@@ -1015,6 +1064,7 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv
 	s.serverEndpointByVNI.Store(e.vni, e)
 	s.logf("allocated endpoint vni=%d lamportID=%d disco[0]=%v disco[1]=%v", e.vni,
 		e.lamportID, pair.Get()[0].ShortString(), pair.Get()[1].ShortString())
+	s.metrics.updateEndpoint(endpointClosed, endpointConnecting)
 	return endpoint.ServerEndpoint{
 		ServerDisco: s.discoPublic,
 		ClientDisco: pair.Get(),
diff --git a/net/udprelay/server_test.go b/net/udprelay/server_test.go
index 59917e1c6..cb6b05eea 100644
--- a/net/udprelay/server_test.go
+++ b/net/udprelay/server_test.go
@@ -8,6 +8,7 @@ import (
 	"crypto/rand"
 	"net"
 	"net/netip"
+	"sync"
 	"testing"
 	"time"
 
@@ -21,6 +22,7 @@ import (
 	"tailscale.com/tstime/mono"
 	"tailscale.com/types/key"
 	"tailscale.com/types/views"
+	"tailscale.com/util/mak"
 	"tailscale.com/util/usermetric"
 )
 
@@ -471,3 +473,75 @@ func TestServer_maybeRotateMACSecretLocked(t *testing.T) {
 	qt.Assert(t, macSecret, qt.Not(qt.Equals), s.macSecrets.At(1))
 	qt.Assert(t, s.macSecrets.At(0), qt.Not(qt.Equals), s.macSecrets.At(1))
 }
+
+func TestServer_endpointGC(t *testing.T) {
+	for _, tc := range []struct {
+		name        string
+		addrs       [2]netip.AddrPort
+		lastSeen    [2]mono.Time
+		allocatedAt mono.Time
+		wantRemoved bool
+	}{
+		{
+			name:        "unbound_endpoint_expired",
+			allocatedAt: mono.Now().Add(-2 * defaultBindLifetime),
+			wantRemoved: true,
+		},
+		{
+			name:        "unbound_endpoint_kept",
+			allocatedAt: mono.Now(),
+			wantRemoved: false,
+		},
+		{
+			name:        "bound_endpoint_expired_a",
+			addrs:       [2]netip.AddrPort{netip.MustParseAddrPort("192.0.2.1:1"), netip.MustParseAddrPort("192.0.2.2:1")},
+			lastSeen:    [2]mono.Time{mono.Now().Add(-2 * defaultSteadyStateLifetime), mono.Now()},
+			wantRemoved: true,
+		},
+		{
+			name:        "bound_endpoint_expired_b",
+			addrs:       [2]netip.AddrPort{netip.MustParseAddrPort("192.0.2.1:1"), netip.MustParseAddrPort("192.0.2.2:1")},
+			lastSeen:    [2]mono.Time{mono.Now(), mono.Now().Add(-2 * defaultSteadyStateLifetime)},
+			wantRemoved: true,
+		},
+		{
+			name:        "bound_endpoint_kept",
+			addrs:       [2]netip.AddrPort{netip.MustParseAddrPort("192.0.2.1:1"), netip.MustParseAddrPort("192.0.2.2:1")},
+			lastSeen:    [2]mono.Time{mono.Now(), mono.Now()},
+			wantRemoved: false,
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			disco1 := key.NewDisco()
+			disco2 := key.NewDisco()
+			pair := key.NewSortedPairOfDiscoPublic(disco1.Public(), disco2.Public())
+			ep := &serverEndpoint{
+				discoPubKeys:   pair,
+				vni:            1,
+				lastSeen:       tc.lastSeen,
+				boundAddrPorts: tc.addrs,
+				allocatedAt:    tc.allocatedAt,
+			}
+			s := &Server{serverEndpointByVNI: sync.Map{}, metrics: &metrics{}}
+			mak.Set(&s.serverEndpointByDisco, pair, ep)
+			s.serverEndpointByVNI.Store(ep.vni, ep)
+			s.endpointGC(defaultBindLifetime, defaultSteadyStateLifetime)
+			removed := len(s.serverEndpointByDisco) == 0
+			if tc.wantRemoved {
+				if !removed {
+					t.Errorf("expected endpoint to be removed from Server")
+				}
+				if !ep.closed {
+					t.Errorf("expected endpoint to be closed")
+				}
+			} else {
+				if removed {
+					t.Errorf("expected endpoint to remain in Server")
+				}
+				if ep.closed {
+					t.Errorf("expected endpoint to remain open")
+				}
+			}
+		})
+	}
+}
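
For illustration, a minimal sketch of how updateEndpoint is expected to move the
gauge through an endpoint's lifecycle: AllocateEndpoint reports closed ->
connecting, the second completed handshake reports connecting -> open, and
endpointGC reports open -> closed, leaving both tracked states at zero. The
test name and file placement below are hypothetical; it assumes it lives in
package udprelay alongside the tests above.

package udprelay

import (
	"testing"

	qt "github.com/frankban/quicktest"

	"tailscale.com/util/usermetric"
)

// TestMetricsEndpointLifecycleSketch is a hypothetical walkthrough of the
// endpoints gauge across allocate -> open -> expire, mirroring the calls made
// by AllocateEndpoint, handleDiscoControlMsg, and endpointGC.
func TestMetricsEndpointLifecycleSketch(t *testing.T) {
	c := qt.New(t)
	deregisterMetrics() // reset the package-level client-metric gauges
	m := registerMetrics(&usermetric.Registry{})

	// AllocateEndpoint: closed -> closed is untracked, so only the
	// connecting gauge is incremented.
	m.updateEndpoint(endpointClosed, endpointConnecting)
	c.Assert(m.endpoints[endpointConnecting].Value(), qt.Equals, int64(1))
	c.Assert(cMetricEndpoints[endpointConnecting].Value(), qt.Equals, int64(1))

	// Second peer completes its handshake: connecting -> open.
	m.updateEndpoint(endpointConnecting, endpointOpen)
	c.Assert(m.endpoints[endpointConnecting].Value(), qt.Equals, int64(0))
	c.Assert(m.endpoints[endpointOpen].Value(), qt.Equals, int64(1))

	// endpointGC expires the endpoint: open -> closed. closed is untracked,
	// so only the decrement of the open gauge is recorded.
	m.updateEndpoint(endpointOpen, endpointClosed)
	c.Assert(m.endpoints[endpointOpen].Value(), qt.Equals, int64(0))
	c.Assert(cMetricEndpoints[endpointOpen].Value(), qt.Equals, int64(0))
}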