net/udprelay: add tailscaled_peer_relay_endpoints gauge (#18265)

New gauge reflects endpoint state via labels:
- open, when both peers are connected and ready to talk, and
- connecting, when at least one peer hasn't connected yet.

Corresponding client metrics are logged as
- udprelay_endpoints_connecting
- udprelay_endpoints_open
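
For reference, a sketch of what the exposition of the user metric could
look like (label values per the states above; the counts are made up):

    tailscaled_peer_relay_endpoints{state="connecting"} 1
    tailscaled_peer_relay_endpoints{state="open"} 2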

Updates tailscale/corp#30820

Change-Id: Idb1baa90a38c97847e14f9b2390093262ad0ea23

Signed-off-by: Alex Valiushko <alexvaliushko@tailscale.com>

@@ -22,6 +22,17 @@ var (
cMetricForwarded46Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp4_udp6")
cMetricForwarded64Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp6_udp4")
cMetricForwarded66Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp6_udp6")
// cMetricEndpoints is initialized here and never mutated afterwards, making it safe for concurrent reads.
//
// [clientmetric.Gauge] does not let us embed existing counters, so
// [metrics.updateEndpoint] records data into client and user gauges independently.
//
// Transitions to and from [endpointClosed] are not recorded.
cMetricEndpoints = map[endpointState]*clientmetric.Metric{
endpointConnecting: clientmetric.NewGauge("udprelay_endpoints_connecting"),
endpointOpen: clientmetric.NewGauge("udprelay_endpoints_open"),
}
)
type transport string
@@ -36,6 +47,10 @@ type forwardedLabel struct {
transportOut transport `prom:"transport_out"`
}
type endpointLabel struct {
state endpointState `prom:"state"`
}
type metrics struct {
forwarded44Packets expvar.Int
forwarded46Packets expvar.Int
@@ -46,6 +61,11 @@ type metrics struct {
forwarded46Bytes expvar.Int
forwarded64Bytes expvar.Int
forwarded66Bytes expvar.Int
// endpoints are set in [registerMetrics] and safe for concurrent reads.
//
// Transitions to and from [endpointClosed] are not recorded.
endpoints map[endpointState]*expvar.Int
}
// registerMetrics publishes user and client metric counters for the peer relay server.
@@ -65,6 +85,12 @@ func registerMetrics(reg *usermetric.Registry) *metrics {
"counter",
"Number of bytes forwarded via Peer Relay",
)
uMetricEndpoints = usermetric.NewMultiLabelMapWithRegistry[endpointLabel](
reg,
"tailscaled_peer_relay_endpoints",
"gauge",
"Number of allocated Peer Relay endpoints",
)
forwarded44 = forwardedLabel{transportIn: transportUDP4, transportOut: transportUDP4}
forwarded46 = forwardedLabel{transportIn: transportUDP4, transportOut: transportUDP6}
forwarded64 = forwardedLabel{transportIn: transportUDP6, transportOut: transportUDP4}
@@ -83,6 +109,13 @@ func registerMetrics(reg *usermetric.Registry) *metrics {
uMetricForwardedBytes.Set(forwarded64, &m.forwarded64Bytes)
uMetricForwardedBytes.Set(forwarded66, &m.forwarded66Bytes)
m.endpoints = map[endpointState]*expvar.Int{
endpointConnecting: {},
endpointOpen: {},
}
uMetricEndpoints.Set(endpointLabel{endpointOpen}, m.endpoints[endpointOpen])
uMetricEndpoints.Set(endpointLabel{endpointConnecting}, m.endpoints[endpointConnecting])
// Publish client metrics.
cMetricForwarded44Packets.Register(&m.forwarded44Packets)
cMetricForwarded46Packets.Register(&m.forwarded46Packets)
@@ -96,6 +129,26 @@ func registerMetrics(reg *usermetric.Registry) *metrics {
return m
}
type endpointUpdater interface {
updateEndpoint(before, after endpointState)
}
// updateEndpoint updates the endpoints gauge according to the states left and entered.
// It records the client-metric gauges independently; see the [cMetricEndpoints] doc.
func (m *metrics) updateEndpoint(before, after endpointState) {
if before == after {
return
}
if uMetricEndpointsBefore, ok := m.endpoints[before]; ok && before != endpointClosed {
uMetricEndpointsBefore.Add(-1)
cMetricEndpoints[before].Add(-1)
}
if uMetricEndpointsAfter, ok := m.endpoints[after]; ok && after != endpointClosed {
uMetricEndpointsAfter.Add(1)
cMetricEndpoints[after].Add(1)
}
}
// countForwarded records user and client metrics according to the
// inbound and outbound address families.
func (m *metrics) countForwarded(in4, out4 bool, bytes, packets int64) {
@@ -114,8 +167,7 @@ func (m *metrics) countForwarded(in4, out4 bool, bytes, packets int64) {
}
}
// deregisterMetrics unregisters the underlying expvar counters
// from clientmetrics.
// deregisterMetrics clears clientmetrics counters and resets gauges to zero.
func deregisterMetrics() {
cMetricForwarded44Packets.UnregisterAll()
cMetricForwarded46Packets.UnregisterAll()
@@ -125,4 +177,7 @@ func deregisterMetrics() {
cMetricForwarded46Bytes.UnregisterAll()
cMetricForwarded64Bytes.UnregisterAll()
cMetricForwarded66Bytes.UnregisterAll()
for _, v := range cMetricEndpoints {
v.Set(0)
}
}
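
A minimal, self-contained sketch of the dual-gauge pattern above, with
expvar standing in for both the user and client sides; every name in it
is illustrative, not the real tailscale API:

package main

import (
	"expvar"
	"fmt"
)

type state string

const (
	stateClosed     state = "closed" // untracked, like endpointClosed
	stateConnecting state = "connecting"
	stateOpen       state = "open"
)

type gauges struct {
	byState map[state]*expvar.Int
}

// update mirrors metrics.updateEndpoint: decrement the state being left,
// increment the state being entered, and skip untracked states.
func (g *gauges) update(before, after state) {
	if before == after {
		return
	}
	if v, ok := g.byState[before]; ok {
		v.Add(-1)
	}
	if v, ok := g.byState[after]; ok {
		v.Add(1)
	}
}

func main() {
	g := &gauges{byState: map[state]*expvar.Int{
		stateConnecting: new(expvar.Int),
		stateOpen:       new(expvar.Int),
	}}
	g.update(stateClosed, stateConnecting) // endpoint allocated
	g.update(stateConnecting, stateOpen)   // both peers completed the handshake
	fmt.Printf("connecting=%d open=%d\n",
		g.byState[stateConnecting].Value(), g.byState[stateOpen].Value()) // connecting=0 open=1
}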

@@ -4,6 +4,7 @@
package udprelay
import (
"fmt"
"slices"
"testing"
@@ -11,7 +12,7 @@ import (
"tailscale.com/util/usermetric"
)
func TestMetrics(t *testing.T) {
func TestMetricsLifecycle(t *testing.T) {
c := qt.New(t)
deregisterMetrics()
r := &usermetric.Registry{}
@@ -22,6 +23,7 @@ func TestMetrics(t *testing.T) {
want := []string{
"tailscaled_peer_relay_forwarded_packets_total",
"tailscaled_peer_relay_forwarded_bytes_total",
"tailscaled_peer_relay_endpoints",
}
slices.Sort(have)
slices.Sort(want)
@@ -51,4 +53,57 @@ func TestMetrics(t *testing.T) {
c.Assert(m.forwarded66Packets.Value(), qt.Equals, int64(4))
c.Assert(cMetricForwarded66Bytes.Value(), qt.Equals, int64(4))
c.Assert(cMetricForwarded66Packets.Value(), qt.Equals, int64(4))
// Validate client metrics deregistration.
m.updateEndpoint(endpointClosed, endpointOpen)
deregisterMetrics()
c.Check(cMetricForwarded44Bytes.Value(), qt.Equals, int64(0))
c.Check(cMetricForwarded44Packets.Value(), qt.Equals, int64(0))
c.Check(cMetricForwarded46Bytes.Value(), qt.Equals, int64(0))
c.Check(cMetricForwarded46Packets.Value(), qt.Equals, int64(0))
c.Check(cMetricForwarded64Bytes.Value(), qt.Equals, int64(0))
c.Check(cMetricForwarded64Packets.Value(), qt.Equals, int64(0))
c.Check(cMetricForwarded66Bytes.Value(), qt.Equals, int64(0))
c.Check(cMetricForwarded66Packets.Value(), qt.Equals, int64(0))
for k := range cMetricEndpoints {
c.Check(cMetricEndpoints[k].Value(), qt.Equals, int64(0))
}
}
func TestMetricsEndpointTransitions(t *testing.T) {
c := qt.New(t)
states := []endpointState{
endpointClosed,
endpointConnecting,
endpointOpen,
}
for _, a := range states {
for _, b := range states {
t.Run(fmt.Sprintf("%s-%s", a, b), func(t *testing.T) {
deregisterMetrics()
r := &usermetric.Registry{}
m := registerMetrics(r)
m.updateEndpoint(a, b)
var wantA, wantB int64
switch {
case a == b:
wantA, wantB = 0, 0
case a == endpointClosed:
wantA, wantB = 0, 1
case b == endpointClosed:
wantA, wantB = -1, 0
default:
wantA, wantB = -1, 1
}
if a != endpointClosed {
c.Check(m.endpoints[a].Value(), qt.Equals, wantA)
c.Check(cMetricEndpoints[a].Value(), qt.Equals, wantA)
}
if b != endpointClosed {
c.Check(m.endpoints[b].Value(), qt.Equals, wantB)
c.Check(cMetricEndpoints[b].Value(), qt.Equals, wantB)
}
})
}
}
}
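
The expectation table in TestMetricsEndpointTransitions reduces to one rule:
leaving a tracked state counts -1, entering one counts +1, and
endpointClosed is untracked. A hypothetical helper (not in this diff) that
encodes the same rule:

func wantDelta(before, after endpointState) (dBefore, dAfter int64) {
	if before == after {
		return 0, 0
	}
	if before != endpointClosed {
		dBefore = -1
	}
	if after != endpointClosed {
		dAfter = 1
	}
	return dBefore, dAfter
}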

@@ -122,6 +122,7 @@ type serverEndpoint struct {
allocatedAt mono.Time
mu sync.Mutex // guards the following fields
closed bool // signals that no new data should be accepted
inProgressGeneration [2]uint32 // or zero if a handshake has never started, or has just completed
boundAddrPorts [2]netip.AddrPort // or zero value if a handshake has never completed for that relay leg
lastSeen [2]mono.Time
@@ -151,9 +152,15 @@ func blakeMACFromBindMsg(blakeKey [blake2s.Size]byte, src netip.AddrPort, msg di
return out, nil
}
func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex int, discoMsg disco.Message, serverDisco key.DiscoPublic, macSecrets views.Slice[[blake2s.Size]byte], now mono.Time) (write []byte, to netip.AddrPort) {
func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex int, discoMsg disco.Message, serverDisco key.DiscoPublic, macSecrets views.Slice[[blake2s.Size]byte], now mono.Time, m endpointUpdater) (write []byte, to netip.AddrPort) {
e.mu.Lock()
defer e.mu.Unlock()
lastState := e.stateLocked()
if lastState == endpointClosed {
// endpoint was closed in [Server.endpointGC]
return nil, netip.AddrPort{}
}
if senderIndex != 0 && senderIndex != 1 {
return nil, netip.AddrPort{}
@@ -230,6 +237,7 @@ func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex
if bytes.Equal(mac[:], discoMsg.Challenge[:]) {
// Handshake complete. Update the binding for this sender.
e.boundAddrPorts[senderIndex] = from
m.updateEndpoint(lastState, e.stateLocked())
e.lastSeen[senderIndex] = now // record last seen as bound time
e.inProgressGeneration[senderIndex] = 0 // reset to zero, which indicates there is no in-progress handshake
return nil, netip.AddrPort{}
@@ -243,7 +251,7 @@ func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex
}
}
func (e *serverEndpoint) handleSealedDiscoControlMsg(from netip.AddrPort, b []byte, serverDisco key.DiscoPublic, macSecrets views.Slice[[blake2s.Size]byte], now mono.Time) (write []byte, to netip.AddrPort) {
func (e *serverEndpoint) handleSealedDiscoControlMsg(from netip.AddrPort, b []byte, serverDisco key.DiscoPublic, macSecrets views.Slice[[blake2s.Size]byte], now mono.Time, m endpointUpdater) (write []byte, to netip.AddrPort) {
senderRaw, isDiscoMsg := disco.Source(b)
if !isDiscoMsg {
// Not a Disco message
@@ -274,7 +282,7 @@ func (e *serverEndpoint) handleSealedDiscoControlMsg(from netip.AddrPort, b []by
return nil, netip.AddrPort{}
}
return e.handleDiscoControlMsg(from, senderIndex, discoMsg, serverDisco, macSecrets, now)
return e.handleDiscoControlMsg(from, senderIndex, discoMsg, serverDisco, macSecrets, now, m)
}
func (e *serverEndpoint) handleDataPacket(from netip.AddrPort, b []byte, now mono.Time) (write []byte, to netip.AddrPort) {
@@ -284,6 +292,10 @@ func (e *serverEndpoint) handleDataPacket(from netip.AddrPort, b []byte, now mon
// not a control packet, but serverEndpoint isn't bound
return nil, netip.AddrPort{}
}
if e.stateLocked() == endpointClosed {
// endpoint was closed in [Server.endpointGC]
return nil, netip.AddrPort{}
}
switch {
case from == e.boundAddrPorts[0]:
e.lastSeen[0] = now
@@ -301,9 +313,21 @@ func (e *serverEndpoint) handleDataPacket(from netip.AddrPort, b []byte, now mon
}
}
func (e *serverEndpoint) isExpired(now mono.Time, bindLifetime, steadyStateLifetime time.Duration) bool {
// maybeExpire checks whether the endpoint has expired according to the
// provided timeouts and, if so, marks it closed.
// It reports true if the endpoint expired and was closed.
func (e *serverEndpoint) maybeExpire(now mono.Time, bindLifetime, steadyStateLifetime time.Duration, m endpointUpdater) bool {
e.mu.Lock()
defer e.mu.Unlock()
before := e.stateLocked()
if e.isExpiredLocked(now, bindLifetime, steadyStateLifetime) {
e.closed = true
m.updateEndpoint(before, e.stateLocked())
return true
}
return false
}
func (e *serverEndpoint) isExpiredLocked(now mono.Time, bindLifetime, steadyStateLifetime time.Duration) bool {
if !e.isBoundLocked() {
if now.Sub(e.allocatedAt) > bindLifetime {
return true
@@ -323,6 +347,31 @@ func (e *serverEndpoint) isBoundLocked() bool {
e.boundAddrPorts[1].IsValid()
}
// stateLocked returns the current endpointState according to the
// peers' handshake status.
func (e *serverEndpoint) stateLocked() endpointState {
switch {
case e == nil, e.closed:
return endpointClosed
case e.boundAddrPorts[0].IsValid() && e.boundAddrPorts[1].IsValid():
return endpointOpen
default:
return endpointConnecting
}
}
// endpointState canonicalizes endpoint state names;
// see [serverEndpoint.stateLocked].
//
// Usermetric label values must be plain strings, not a [fmt.Stringer], so this is a string enum.
type endpointState string
const (
endpointClosed endpointState = "closed" // unallocated, not tracked in metrics
endpointConnecting endpointState = "connecting" // at least one peer has not completed handshake
endpointOpen endpointState = "open" // ready to forward
)
// NewServer constructs a [Server] listening on port. If port is zero, then
// port selection is left up to the host networking stack. If
// onlyStaticAddrPorts is true, then dynamic addr:port discovery will be
@@ -703,33 +752,33 @@ func (s *Server) Close() error {
clear(s.serverEndpointByDisco)
s.closed = true
s.bus.Close()
deregisterMetrics()
})
return nil
}
func (s *Server) endpointGC(bindLifetime, steadyStateLifetime time.Duration) {
now := mono.Now()
// TODO: consider performance implications of scanning all endpoints and
// holding s.mu for the duration. Keep it simple (and slow) for now.
s.mu.Lock()
defer s.mu.Unlock()
for k, v := range s.serverEndpointByDisco {
if v.maybeExpire(now, bindLifetime, steadyStateLifetime, s.metrics) {
delete(s.serverEndpointByDisco, k)
s.serverEndpointByVNI.Delete(v.vni)
}
}
}
func (s *Server) endpointGCLoop() {
defer s.wg.Done()
ticker := time.NewTicker(s.bindLifetime)
defer ticker.Stop()
gc := func() {
now := mono.Now()
// TODO: consider performance implications of scanning all endpoints and
// holding s.mu for the duration. Keep it simple (and slow) for now.
s.mu.Lock()
defer s.mu.Unlock()
for k, v := range s.serverEndpointByDisco {
if v.isExpired(now, s.bindLifetime, s.steadyStateLifetime) {
delete(s.serverEndpointByDisco, k)
s.serverEndpointByVNI.Delete(v.vni)
}
}
}
for {
select {
case <-ticker.C:
gc()
s.endpointGC(s.bindLifetime, s.steadyStateLifetime)
case <-s.closeCh:
return
}
@@ -773,7 +822,7 @@ func (s *Server) handlePacket(from netip.AddrPort, b []byte) (write []byte, to n
}
msg := b[packet.GeneveFixedHeaderLength:]
secrets := s.getMACSecrets(now)
write, to = e.(*serverEndpoint).handleSealedDiscoControlMsg(from, msg, s.discoPublic, secrets, now)
write, to = e.(*serverEndpoint).handleSealedDiscoControlMsg(from, msg, s.discoPublic, secrets, now, s.metrics)
isDataPacket = false
return
}
@@ -1015,6 +1064,7 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv
s.serverEndpointByVNI.Store(e.vni, e)
s.logf("allocated endpoint vni=%d lamportID=%d disco[0]=%v disco[1]=%v", e.vni, e.lamportID, pair.Get()[0].ShortString(), pair.Get()[1].ShortString())
s.metrics.updateEndpoint(endpointClosed, endpointConnecting)
return endpoint.ServerEndpoint{
ServerDisco: s.discoPublic,
ClientDisco: pair.Get(),
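
For intuition, a standalone sketch of the state derivation in stateLocked,
assuming two bound-address slots where a zero netip.AddrPort means "not yet
bound" (all names here are illustrative):

package main

import (
	"fmt"
	"net/netip"
)

func state(closed bool, bound [2]netip.AddrPort) string {
	switch {
	case closed:
		return "closed"
	case bound[0].IsValid() && bound[1].IsValid():
		return "open"
	default:
		return "connecting"
	}
}

func main() {
	var bound [2]netip.AddrPort
	fmt.Println(state(false, bound)) // connecting: no peer bound yet
	bound[0] = netip.MustParseAddrPort("192.0.2.1:1")
	fmt.Println(state(false, bound)) // connecting: one peer bound
	bound[1] = netip.MustParseAddrPort("192.0.2.2:1")
	fmt.Println(state(false, bound)) // open: both peers bound
	fmt.Println(state(true, bound))  // closed: expired and collected
}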

@@ -8,6 +8,7 @@ import (
"crypto/rand"
"net"
"net/netip"
"sync"
"testing"
"time"
@@ -21,6 +22,7 @@ import (
"tailscale.com/tstime/mono"
"tailscale.com/types/key"
"tailscale.com/types/views"
"tailscale.com/util/mak"
"tailscale.com/util/usermetric"
)
@@ -471,3 +473,75 @@ func TestServer_maybeRotateMACSecretLocked(t *testing.T) {
qt.Assert(t, macSecret, qt.Not(qt.Equals), s.macSecrets.At(1))
qt.Assert(t, s.macSecrets.At(0), qt.Not(qt.Equals), s.macSecrets.At(1))
}
func TestServer_endpointGC(t *testing.T) {
for _, tc := range []struct {
name string
addrs [2]netip.AddrPort
lastSeen [2]mono.Time
allocatedAt mono.Time
wantRemoved bool
}{
{
name: "unbound_endpoint_expired",
allocatedAt: mono.Now().Add(-2 * defaultBindLifetime),
wantRemoved: true,
},
{
name: "unbound_endpoint_kept",
allocatedAt: mono.Now(),
wantRemoved: false,
},
{
name: "bound_endpoint_expired_a",
addrs: [2]netip.AddrPort{netip.MustParseAddrPort("192.0.2.1:1"), netip.MustParseAddrPort("192.0.2.2:1")},
lastSeen: [2]mono.Time{mono.Now().Add(-2 * defaultSteadyStateLifetime), mono.Now()},
wantRemoved: true,
},
{
name: "bound_endpoint_expired_b",
addrs: [2]netip.AddrPort{netip.MustParseAddrPort("192.0.2.1:1"), netip.MustParseAddrPort("192.0.2.2:1")},
lastSeen: [2]mono.Time{mono.Now(), mono.Now().Add(-2 * defaultSteadyStateLifetime)},
wantRemoved: true,
},
{
name: "bound_endpoint_kept",
addrs: [2]netip.AddrPort{netip.MustParseAddrPort("192.0.2.1:1"), netip.MustParseAddrPort("192.0.2.2:1")},
lastSeen: [2]mono.Time{mono.Now(), mono.Now()},
wantRemoved: false,
},
} {
t.Run(tc.name, func(t *testing.T) {
disco1 := key.NewDisco()
disco2 := key.NewDisco()
pair := key.NewSortedPairOfDiscoPublic(disco1.Public(), disco2.Public())
ep := &serverEndpoint{
discoPubKeys: pair,
vni: 1,
lastSeen: tc.lastSeen,
boundAddrPorts: tc.addrs,
allocatedAt: tc.allocatedAt,
}
s := &Server{serverEndpointByVNI: sync.Map{}, metrics: &metrics{}}
mak.Set(&s.serverEndpointByDisco, pair, ep)
s.serverEndpointByVNI.Store(ep.vni, ep)
s.endpointGC(defaultBindLifetime, defaultSteadyStateLifetime)
removed := len(s.serverEndpointByDisco) == 0
if tc.wantRemoved {
if !removed {
t.Errorf("expected endpoint to be removed from Server")
}
if !ep.closed {
t.Errorf("expected endpoint to be closed")
}
} else {
if removed {
t.Errorf("expected endpoint to remain in Server")
}
if ep.closed {
t.Errorf("expected endpoint to remain open")
}
}
})
}
}
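
Because maybeExpire takes the endpointUpdater interface rather than a
concrete *metrics, a test could also observe transitions directly with a
recording fake; a hypothetical test double (not part of this diff):

// recordingUpdater satisfies endpointUpdater and records each transition.
type recordingUpdater struct {
	transitions [][2]endpointState
}

func (r *recordingUpdater) updateEndpoint(before, after endpointState) {
	r.transitions = append(r.transitions, [2]endpointState{before, after})
}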
