pull/18265/merge
Alex Valiushko 3 days ago committed by GitHub
commit 6b1d18304c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -22,6 +22,18 @@ var (
cMetricForwarded46Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp4_udp6")
cMetricForwarded64Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp6_udp4")
cMetricForwarded66Bytes = clientmetric.NewAggregateCounter("udprelay_forwarded_bytes_udp6_udp6")
// cMetricEndpoints holds one client-metric gauge per recorded endpoint
// state. It is set on startup and safe for concurrent use.
//
// [clientmetric.Gauge] does not let us embed existing counters, so
// [metrics.updateEndpoint] records data into the client and user gauges
// independently.
//
// There is no entry for [endpointClosed]: transitions to and from it are
// not recorded against a "closed" gauge.
cMetricEndpoints = map[endpointState]*clientmetric.Metric{
endpointOpen: clientmetric.NewGauge("udprelay_endpoints_open"),
endpointSemiBound: clientmetric.NewGauge("udprelay_endpoints_semi_bound"),
endpointBound: clientmetric.NewGauge("udprelay_endpoints_bound"),
}
)
type transport string
@ -36,6 +48,10 @@ type forwardedLabel struct {
transportOut transport `prom:"transport_out"`
}
// endpointLabel is the label set for the
// "tailscaled_peer_relay_endpoints" user metric.
type endpointLabel struct {
// state is the endpoint state the gauge value refers to; the struct tag
// names the label "state".
state endpointState `prom:"state"`
}
type metrics struct {
forwarded44Packets expvar.Int
forwarded46Packets expvar.Int
@ -46,6 +62,11 @@ type metrics struct {
forwarded46Bytes expvar.Int
forwarded64Bytes expvar.Int
forwarded66Bytes expvar.Int
// endpoints holds one gauge per recorded endpoint state. It is set on
// init and safe for concurrent use.
//
// Transitions to and from [endpointClosed] are not recorded.
endpoints map[endpointState]*expvar.Int
}
// registerMetrics publishes user and client metric counters for peer relay server.
@ -65,6 +86,12 @@ func registerMetrics(reg *usermetric.Registry) *metrics {
"counter",
"Number of bytes forwarded via Peer Relay",
)
uMetricEndpoints = usermetric.NewMultiLabelMapWithRegistry[endpointLabel](
reg,
"tailscaled_peer_relay_endpoints",
"gauge",
"Number of allocated Peer Relay endpoints",
)
forwarded44 = forwardedLabel{transportIn: transportUDP4, transportOut: transportUDP4}
forwarded46 = forwardedLabel{transportIn: transportUDP4, transportOut: transportUDP6}
forwarded64 = forwardedLabel{transportIn: transportUDP6, transportOut: transportUDP4}
@ -83,6 +110,15 @@ func registerMetrics(reg *usermetric.Registry) *metrics {
uMetricForwardedBytes.Set(forwarded64, &m.forwarded64Bytes)
uMetricForwardedBytes.Set(forwarded66, &m.forwarded66Bytes)
m.endpoints = map[endpointState]*expvar.Int{
endpointBound: {},
endpointSemiBound: {},
endpointOpen: {},
}
uMetricEndpoints.Set(endpointLabel{endpointBound}, m.endpoints[endpointBound])
uMetricEndpoints.Set(endpointLabel{endpointSemiBound}, m.endpoints[endpointSemiBound])
uMetricEndpoints.Set(endpointLabel{endpointOpen}, m.endpoints[endpointOpen])
// Publish client metrics.
cMetricForwarded44Packets.Register(&m.forwarded44Packets)
cMetricForwarded46Packets.Register(&m.forwarded46Packets)
@ -96,6 +132,19 @@ func registerMetrics(reg *usermetric.Registry) *metrics {
return m
}
// updateEndpoint accounts for one endpoint moving from state leaving to
// state entering: the gauge for leaving is decremented and the gauge for
// entering is incremented. A state with no gauge in m.endpoints, or equal
// to [endpointClosed], is skipped.
//
// The client-metric gauges are adjusted separately from the user-metric
// ones, see [cMetricEndpoints] doc.
func (m *metrics) updateEndpoint(leaving, entering endpointState) {
	// adjust applies delta to both the user and the client gauge of state,
	// provided the state is one that is tracked.
	adjust := func(state endpointState, delta int64) {
		userGauge, tracked := m.endpoints[state]
		if !tracked || state == endpointClosed {
			return
		}
		userGauge.Add(delta)
		cMetricEndpoints[state].Add(delta)
	}

	adjust(leaving, -1)
	adjust(entering, 1)
}
// countForwarded records user and client metrics according to the
// inbound and outbound address families.
func (m *metrics) countForwarded(in4, out4 bool, bytes, packets int64) {
@ -125,4 +174,7 @@ func deregisterMetrics() {
cMetricForwarded46Bytes.UnregisterAll()
cMetricForwarded64Bytes.UnregisterAll()
cMetricForwarded66Bytes.UnregisterAll()
for _, v := range cMetricEndpoints {
v.Set(0)
}
}

@ -11,7 +11,7 @@ import (
"tailscale.com/util/usermetric"
)
func TestMetrics(t *testing.T) {
func TestMetricsLifecycle(t *testing.T) {
c := qt.New(t)
deregisterMetrics()
r := &usermetric.Registry{}
@ -22,6 +22,7 @@ func TestMetrics(t *testing.T) {
want := []string{
"tailscaled_peer_relay_forwarded_packets_total",
"tailscaled_peer_relay_forwarded_bytes_total",
"tailscaled_peer_relay_endpoints",
}
slices.Sort(have)
slices.Sort(want)
@ -51,4 +52,51 @@ func TestMetrics(t *testing.T) {
c.Assert(m.forwarded66Packets.Value(), qt.Equals, int64(4))
c.Assert(cMetricForwarded66Bytes.Value(), qt.Equals, int64(4))
c.Assert(cMetricForwarded66Packets.Value(), qt.Equals, int64(4))
// Validate client metrics deregistration.
m.updateEndpoint(endpointClosed, endpointOpen)
deregisterMetrics()
c.Check(cMetricForwarded44Bytes.Value(), qt.Equals, int64(0))
c.Check(cMetricForwarded44Packets.Value(), qt.Equals, int64(0))
c.Check(cMetricForwarded46Bytes.Value(), qt.Equals, int64(0))
c.Check(cMetricForwarded46Packets.Value(), qt.Equals, int64(0))
c.Check(cMetricForwarded64Bytes.Value(), qt.Equals, int64(0))
c.Check(cMetricForwarded64Packets.Value(), qt.Equals, int64(0))
c.Check(cMetricForwarded66Bytes.Value(), qt.Equals, int64(0))
c.Check(cMetricForwarded66Packets.Value(), qt.Equals, int64(0))
c.Check(cMetricEndpoints[endpointOpen].Value(), qt.Equals, int64(0))
}
func TestMetricsEndpointTransitions(t *testing.T) {
c := qt.New(t)
for _, tc := range []struct {
name string
leaving, entering endpointState
wantOpen int64
wantSemi int64
wantBound int64
}{
{"closed-open", endpointClosed, endpointOpen, 1, 0, 0},
{"open-semi_bound", endpointOpen, endpointSemiBound, -1, 1, 0},
{"semi_bound-bound", endpointSemiBound, endpointBound, 0, -1, 1},
{"open-closed", endpointOpen, endpointClosed, -1, 0, 0},
{"semi_bound-closed", endpointSemiBound, endpointClosed, 0, -1, 0},
{"bound-closed", endpointBound, endpointClosed, 0, 0, -1},
} {
t.Run(tc.name, func(t *testing.T) {
deregisterMetrics()
r := &usermetric.Registry{}
m := registerMetrics(r)
m.updateEndpoint(tc.leaving, tc.entering)
c.Check(m.endpoints[endpointOpen].Value(), qt.Equals, tc.wantOpen)
c.Check(m.endpoints[endpointSemiBound].Value(), qt.Equals, tc.wantSemi)
c.Check(m.endpoints[endpointBound].Value(), qt.Equals, tc.wantBound)
// Verify client metrics match
c.Check(cMetricEndpoints[endpointOpen].Value(), qt.Equals, tc.wantOpen)
c.Check(cMetricEndpoints[endpointSemiBound].Value(), qt.Equals, tc.wantSemi)
c.Check(cMetricEndpoints[endpointBound].Value(), qt.Equals, tc.wantBound)
})
}
}

@ -118,6 +118,7 @@ type serverEndpoint struct {
lamportID uint64
vni uint32
allocatedAt mono.Time
metrics *metrics
mu sync.Mutex // guards the following fields
inProgressGeneration [2]uint32 // or zero if a handshake has never started, or has just completed
@ -227,9 +228,11 @@ func (e *serverEndpoint) handleDiscoControlMsg(from netip.AddrPort, senderIndex
// already authenticated via disco.
if bytes.Equal(mac[:], discoMsg.Challenge[:]) {
// Handshake complete. Update the binding for this sender.
oldState := e.stateLocked()
e.boundAddrPorts[senderIndex] = from
e.lastSeen[senderIndex] = now // record last seen as bound time
e.inProgressGeneration[senderIndex] = 0 // reset to zero, which indicates there is no in-progress handshake
e.metrics.updateEndpoint(oldState, e.stateLocked())
return nil, netip.AddrPort{}
}
}
@ -321,6 +324,36 @@ func (e *serverEndpoint) isBoundLocked() bool {
e.boundAddrPorts[1].IsValid()
}
// stateLocked reports the current [endpointState] of e, derived from how
// many of its two peers have a valid bound address:
//
//   - both valid: [endpointBound]
//   - exactly one valid: [endpointSemiBound]
//   - neither valid: [endpointOpen]
//
// A nil receiver is reported as [endpointClosed].
//
// NOTE(review): per the "Locked" suffix the caller is presumably expected
// to hold e.mu; confirm against callers.
func (e *serverEndpoint) stateLocked() endpointState {
	if e == nil {
		return endpointClosed
	}

	firstBound := e.boundAddrPorts[0].IsValid()
	secondBound := e.boundAddrPorts[1].IsValid()

	if firstBound && secondBound {
		return endpointBound
	}
	if firstBound || secondBound {
		return endpointSemiBound
	}
	return endpointOpen
}
// endpointState is the canonical name of a relay endpoint's state,
// see [serverEndpoint.stateLocked].
//
// It is a string enum rather than a Stringer because usermetrics can't
// handle Stringer.
type endpointState string

const (
	// endpointClosed is the state reported for a nil endpoint.
	endpointClosed = endpointState("closed")
	// endpointOpen means neither peer has a valid bound address.
	endpointOpen = endpointState("open")
	// endpointSemiBound means exactly one peer has a valid bound address.
	endpointSemiBound = endpointState("semi_bound")
	// endpointBound means both peers have a valid bound address.
	endpointBound = endpointState("bound")
)
// NewServer constructs a [Server] listening on port. If port is zero, then
// port selection is left up to the host networking stack. If
// onlyStaticAddrPorts is true, then dynamic addr:port discovery will be
@ -700,6 +733,7 @@ func (s *Server) endpointGCLoop() {
defer s.mu.Unlock()
for k, v := range s.serverEndpointByDisco {
if v.isExpired(now, s.bindLifetime, s.steadyStateLifetime) {
s.metrics.updateEndpoint(v.stateLocked(), endpointClosed)
delete(s.serverEndpointByDisco, k)
s.serverEndpointByVNI.Delete(v.vni)
}
@ -987,6 +1021,7 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv
lamportID: s.lamportID,
allocatedAt: mono.Now(),
vni: vni,
metrics: s.metrics,
}
e.discoSharedSecrets[0] = s.disco.Shared(e.discoPubKeys.Get()[0])
e.discoSharedSecrets[1] = s.disco.Shared(e.discoPubKeys.Get()[1])
@ -995,6 +1030,7 @@ func (s *Server) AllocateEndpoint(discoA, discoB key.DiscoPublic) (endpoint.Serv
s.serverEndpointByVNI.Store(e.vni, e)
s.logf("allocated endpoint vni=%d lamportID=%d disco[0]=%v disco[1]=%v", e.vni, e.lamportID, pair.Get()[0].ShortString(), pair.Get()[1].ShortString())
s.metrics.updateEndpoint(endpointClosed, endpointOpen)
return endpoint.ServerEndpoint{
ServerDisco: s.discoPublic,
ClientDisco: pair.Get(),

Loading…
Cancel
Save