diff --git a/control/controlknobs/controlknobs.go b/control/controlknobs/controlknobs.go index f9ed69812..6a36c9261 100644 --- a/control/controlknobs/controlknobs.go +++ b/control/controlknobs/controlknobs.go @@ -69,6 +69,10 @@ type Knobs struct { // renewing node keys without breaking connections. // http://go/seamless-key-renewal SeamlessKeyRenewal atomic.Bool + + // ProbeUDPLifetime is whether the node should probe UDP path lifetime on + // the tail end of an active direct connection in magicsock. + ProbeUDPLifetime atomic.Bool } // UpdateFromNodeAttributes updates k (if non-nil) based on the provided self @@ -95,6 +99,7 @@ func (k *Knobs) UpdateFromNodeAttributes(selfNodeAttrs []tailcfg.NodeCapability, forceIPTables = has(tailcfg.NodeAttrLinuxMustUseIPTables) forceNfTables = has(tailcfg.NodeAttrLinuxMustUseNfTables) seamlessKeyRenewal = has(tailcfg.NodeAttrSeamlessKeyRenewal) + probeUDPLifetime = has(tailcfg.NodeAttrProbeUDPLifetime) ) if has(tailcfg.NodeAttrOneCGNATEnable) { @@ -116,6 +121,7 @@ func (k *Knobs) UpdateFromNodeAttributes(selfNodeAttrs []tailcfg.NodeCapability, k.LinuxForceIPTables.Store(forceIPTables) k.LinuxForceNfTables.Store(forceNfTables) k.SeamlessKeyRenewal.Store(seamlessKeyRenewal) + k.ProbeUDPLifetime.Store(probeUDPLifetime) } // AsDebugJSON returns k as something that can be marshalled with json.Marshal @@ -138,5 +144,6 @@ func (k *Knobs) AsDebugJSON() map[string]any { "LinuxForceIPTables": k.LinuxForceIPTables.Load(), "LinuxForceNfTables": k.LinuxForceNfTables.Load(), "SeamlessKeyRenewal": k.SeamlessKeyRenewal.Load(), + "ProbeUDPLifetime": k.ProbeUDPLifetime.Load(), } } diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index cb4926284..1312e37dc 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -4556,6 +4556,7 @@ func (b *LocalBackend) setNetMapLocked(nm *netmap.NetworkMap) { } b.MagicConn().SetSilentDisco(b.ControlKnobs().SilentDisco.Load()) + b.MagicConn().SetProbeUDPLifetime(b.ControlKnobs().ProbeUDPLifetime.Load()) b.setDebugLogsByCapabilityLocked(nm) diff --git a/tailcfg/tailcfg.go b/tailcfg/tailcfg.go index c299ebbb0..83038bb04 100644 --- a/tailcfg/tailcfg.go +++ b/tailcfg/tailcfg.go @@ -126,7 +126,8 @@ type CapabilityVersion int // - 83: 2023-12-18: Client understands DefaultAutoUpdate // - 84: 2024-01-04: Client understands SeamlessKeyRenewal // - 85: 2024-01-05: Client understands MaxKeyDuration -const CurrentCapabilityVersion CapabilityVersion = 85 +// - 86: 2024-01-23: Client understands NodeAttrProbeUDPLifetime +const CurrentCapabilityVersion CapabilityVersion = 86 type StableID string @@ -2203,6 +2204,10 @@ const ( // NodeAttrSeamlessKeyRenewal makes clients enable beta functionality // of renewing node keys without breaking connections. NodeAttrSeamlessKeyRenewal NodeCapability = "seamless-key-renewal" + + // NodeAttrProbeUDPLifetime makes the client probe UDP path lifetime at the + // tail end of an active direct connection in magicsock. + NodeAttrProbeUDPLifetime NodeCapability = "probe-udp-lifetime" ) // SetDNSRequest is a request to add a DNS record. diff --git a/types/key/disco.go b/types/key/disco.go index 14005b506..1013ce5bf 100644 --- a/types/key/disco.go +++ b/types/key/disco.go @@ -4,6 +4,7 @@ package key import ( + "bytes" "crypto/subtle" "fmt" @@ -127,6 +128,14 @@ func (k DiscoPublic) String() string { return string(bs) } +// Compare returns an integer comparing DiscoPublic k and l lexicographically. +// The result will be 0 if k == l, -1 if k < l, and +1 if k > l. 
+// This is useful
+// for situations requiring only one node in a pair to perform some operation,
+// e.g. probing UDP path lifetime.
+func (k DiscoPublic) Compare(l DiscoPublic) int {
+	return bytes.Compare(k.k[:], l.k[:])
+}
+
 // AppendText implements encoding.TextAppender.
 func (k DiscoPublic) AppendText(b []byte) ([]byte, error) {
 	return appendHexKey(b, discoPublicHexPrefix, k.k[:]), nil
diff --git a/wgengine/magicsock/debughttp.go b/wgengine/magicsock/debughttp.go
index f26b50044..4a5531fe0 100644
--- a/wgengine/magicsock/debughttp.go
+++ b/wgengine/magicsock/debughttp.go
@@ -123,11 +123,11 @@ func (c *Conn) ServeHTTPDebug(w http.ResponseWriter, r *http.Request) {
 }
 
 func printEndpointHTML(w io.Writer, ep *endpoint) {
-	lastRecv := ep.lastRecv.LoadAtomic()
+	lastRecv := ep.lastRecvWG.LoadAtomic()
 
 	ep.mu.Lock()
 	defer ep.mu.Unlock()
-	if ep.lastSend == 0 && lastRecv == 0 {
+	if ep.lastSendExt == 0 && lastRecv == 0 {
 		return // no activity ever
 	}
 
@@ -142,7 +142,7 @@ func printEndpointHTML(w io.Writer, ep *endpoint) {
 	fmt.Fprintf(w, "<p>Best: <b>%+v</b>, %v ago (for %v)</p>\n", ep.bestAddr, fmtMono(ep.bestAddrAt), ep.trustBestAddrUntil.Sub(mnow).Round(time.Millisecond))
 	fmt.Fprintf(w, "<p>heartbeating: %v</p>\n", ep.heartBeatTimer != nil)
-	fmt.Fprintf(w, "<p>lastSend: %v ago</p>\n", fmtMono(ep.lastSend))
+	fmt.Fprintf(w, "<p>lastSend: %v ago</p>\n", fmtMono(ep.lastSendExt))
 	fmt.Fprintf(w, "<p>lastFullPing: %v ago</p>
\n", fmtMono(ep.lastFullPing)) eps := make([]netip.AddrPort, 0, len(ep.endpointState)) diff --git a/wgengine/magicsock/derp.go b/wgengine/magicsock/derp.go index e062c913f..947466d2c 100644 --- a/wgengine/magicsock/derp.go +++ b/wgengine/magicsock/derp.go @@ -26,6 +26,7 @@ import ( "tailscale.com/net/tsaddr" "tailscale.com/syncs" "tailscale.com/tailcfg" + "tailscale.com/tstime/mono" "tailscale.com/types/key" "tailscale.com/types/logger" "tailscale.com/util/mak" @@ -668,7 +669,7 @@ func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep *en return 0, nil } - ep.noteRecvActivity(ipp) + ep.noteRecvActivity(ipp, mono.Now()) if stats := c.stats.Load(); stats != nil { stats.UpdateRxPhysical(ep.nodeAddr, ipp, dm.n) } diff --git a/wgengine/magicsock/discopingpurpose_string.go b/wgengine/magicsock/discopingpurpose_string.go index 6329172dc..3dc327de1 100644 --- a/wgengine/magicsock/discopingpurpose_string.go +++ b/wgengine/magicsock/discopingpurpose_string.go @@ -14,11 +14,12 @@ func _() { _ = x[pingDiscovery-0] _ = x[pingHeartbeat-1] _ = x[pingCLI-2] + _ = x[pingHeartbeatForUDPLifetime-3] } -const _discoPingPurpose_name = "DiscoveryHeartbeatCLI" +const _discoPingPurpose_name = "DiscoveryHeartbeatCLIHeartbeatForUDPLifetime" -var _discoPingPurpose_index = [...]uint8{0, 9, 18, 21} +var _discoPingPurpose_index = [...]uint8{0, 9, 18, 21, 44} func (i discoPingPurpose) String() string { if i < 0 || i >= discoPingPurpose(len(_discoPingPurpose_index)-1) { diff --git a/wgengine/magicsock/endpoint.go b/wgengine/magicsock/endpoint.go index 58267267d..3986f718d 100644 --- a/wgengine/magicsock/endpoint.go +++ b/wgengine/magicsock/endpoint.go @@ -15,6 +15,7 @@ import ( "net/netip" "reflect" "runtime" + "slices" "sync" "sync/atomic" "time" @@ -55,7 +56,8 @@ func init() { // to the peer. type endpoint struct { // atomically accessed; declared first for alignment reasons - lastRecv mono.Time + lastRecvWG mono.Time // last time there were incoming packets from this peer destined for wireguard-go (e.g. not disco) + lastRecvUDPAny mono.Time // last time there were incoming UDP packets from this peer of any kind numStopAndResetAtomic int64 debugUpdates *ringbuffer.RingBuffer[EndpointChange] @@ -73,11 +75,12 @@ type endpoint struct { mu sync.Mutex // Lock ordering: Conn.mu, then endpoint.mu heartBeatTimer *time.Timer // nil when idle - lastSend mono.Time // last time there was outgoing packets sent to this peer (from wireguard-go) + lastSendExt mono.Time // last time there were outgoing packets sent to this peer from an external trigger (e.g. wireguard-go or disco pingCLI) + lastSendAny mono.Time // last time there were outgoing packets sent this peer from any trigger, internal or external to magicsock lastFullPing mono.Time // last time we pinged all disco or wireguard only endpoints derpAddr netip.AddrPort // fallback/bootstrap path, if non-zero (non-zero for well-behaved clients) - bestAddr addrQuality // best non-DERP path; zero if none + bestAddr addrQuality // best non-DERP path; zero if none; mutate via setBestAddrLocked() bestAddrAt mono.Time // time best address re-confirmed trustBestAddrUntil mono.Time // time when bestAddr expires sentPing map[stun.TxID]sentPing @@ -88,11 +91,240 @@ type endpoint struct { // implementation that's a WIP as of 2022-10-20. // See #540 for background. 
heartbeatDisabled bool + probeUDPLifetime *probeUDPLifetime // UDP path lifetime probing; nil if disabled expired bool // whether the node has expired isWireguardOnly bool // whether the endpoint is WireGuard only } +func (de *endpoint) setBestAddrLocked(v addrQuality) { + if v.AddrPort != de.bestAddr.AddrPort { + de.probeUDPLifetime.resetCycleEndpointLocked() + } + de.bestAddr = v +} + +const ( + // udpLifetimeProbeCliffSlack is how much slack to use relative to a + // ProbeUDPLifetimeConfig.Cliffs duration in order to account for RTT, + // scheduling jitter, buffers, etc. If the cliff is 10s, we attempt to probe + // after 10s - 2s (8s) amount of inactivity. + udpLifetimeProbeCliffSlack = time.Second * 2 + // udpLifetimeProbeSchedulingTolerance is how much of a difference can be + // tolerated between a UDP lifetime probe scheduling target and when it + // actually fired. This must be some fraction of udpLifetimeProbeCliffSlack. + udpLifetimeProbeSchedulingTolerance = udpLifetimeProbeCliffSlack / 8 +) + +// probeUDPLifetime represents the configuration and state tied to probing UDP +// path lifetime. A probe "cycle" involves pinging the UDP path at various +// timeout cliffs, which are pre-defined durations of interest commonly used by +// NATs/firewalls as default stateful session timeout values. Cliffs are probed +// in ascending order. A "cycle" completes when all cliffs have received a pong, +// or when a ping times out. Cycles may extend across endpoint session lifetimes +// if they are disrupted by user traffic. +type probeUDPLifetime struct { + // All fields are guarded by endpoint.mu. probeUDPLifetime methods are for + // convenience. + + // config holds the probing configuration. + config ProbeUDPLifetimeConfig + + // timer is nil when idle. A non-nil timer indicates we intend to probe a + // timeout cliff in the future. + timer *time.Timer + + // bestAddr contains the endpoint.bestAddr.AddrPort at the time a cycle was + // scheduled to start. A probing cycle is 1:1 with the current + // endpoint.bestAddr.AddrPort in the interest of simplicity. When + // endpoint.bestAddr.AddrPort changes, any active probing cycle will reset. + bestAddr netip.AddrPort + // cycleStartedAt contains the time at which the first cliff + // (ProbeUDPLifetimeConfig.Cliffs[0]) was pinged for the current/last cycle. + cycleStartedAt time.Time + // cycleActive is true if a probing cycle is active, otherwise false. + cycleActive bool + // currentCliff represents the index into ProbeUDPLifetimeConfig.Cliffs for + // the cliff that we are waiting to ping, or waiting on a pong/timeout. + currentCliff int + // lastTxID is the ID for the last ping that was sent. + lastTxID stun.TxID +} + +func (p *probeUDPLifetime) currentCliffDurationEndpointLocked() time.Duration { + if p == nil { + return 0 + } + return p.config.Cliffs[p.currentCliff] +} + +// cycleCompleteMaxCliffEndpointLocked records the max cliff (as an index of +// ProbeUDPLifetimeConfig.Cliffs) a probing cycle reached, i.e. received a pong +// for. A value < 0 indicates no cliff was reached. It is a no-op if the active +// configuration does not equal defaultProbeUDPLifetimeConfig. 
+func (p *probeUDPLifetime) cycleCompleteMaxCliffEndpointLocked(cliff int) { + if !p.config.Equals(defaultProbeUDPLifetimeConfig) { + return + } + switch { + case cliff < 0: + metricUDPLifetimeCycleCompleteNoCliffReached.Add(1) + case cliff == 0: + metricUDPLifetimeCycleCompleteAt10sCliff.Add(1) + case cliff == 1: + metricUDPLifetimeCycleCompleteAt30sCliff.Add(1) + case cliff == 2: + metricUDPLifetimeCycleCompleteAt60sCliff.Add(1) + } +} + +// resetCycleEndpointLocked resets the state contained in p to reflect an +// inactive cycle. +func (p *probeUDPLifetime) resetCycleEndpointLocked() { + if p == nil { + return + } + if p.timer != nil { + p.timer.Stop() + p.timer = nil + } + p.cycleActive = false + p.currentCliff = 0 + p.bestAddr = netip.AddrPort{} +} + +// ProbeUDPLifetimeConfig represents the configuration for probing UDP path +// lifetime. +type ProbeUDPLifetimeConfig struct { + // The timeout cliffs to probe. Values are in ascending order. Ascending + // order is chosen over descending because we have limited opportunities to + // probe. With a descending order we are stuck waiting for a new UDP + // path/session if the first value times out. When that new path is + // established is anyone's guess. + Cliffs []time.Duration + // CycleCanStartEvery represents the min duration between cycles starting + // up. + CycleCanStartEvery time.Duration +} + +var ( + // defaultProbeUDPLifetimeConfig is the configuration that must be used + // for UDP path lifetime probing until it can be wholly disseminated (not + // just on/off) from upstream control components, and associated metrics + // (metricUDPLifetime*) have lifetime management. + // + // TODO(#10928): support dynamic config via tailcfg.PeerCapMap. + defaultProbeUDPLifetimeConfig = &ProbeUDPLifetimeConfig{ + Cliffs: []time.Duration{ + time.Second * 10, + time.Second * 30, + time.Second * 60, + }, + CycleCanStartEvery: time.Hour * 24, + } +) + +// Equals returns true if b equals p, otherwise false. If both sides are nil, +// Equals returns true. If only one side is nil, Equals returns false. +func (p *ProbeUDPLifetimeConfig) Equals(b *ProbeUDPLifetimeConfig) bool { + if p == b { + return true + } + if (p == nil && b != nil) || (b == nil && p != nil) { + return false + } + if !slices.Equal(p.Cliffs, b.Cliffs) { + return false + } + if p.CycleCanStartEvery != b.CycleCanStartEvery { + return false + } + return true +} + +// Valid returns true if p is valid, otherwise false. p must be non-nil. +func (p *ProbeUDPLifetimeConfig) Valid() bool { + if len(p.Cliffs) < 1 { + // We need at least one cliff, otherwise there is nothing to probe. + return false + } + if p.CycleCanStartEvery < 1 { + // Probing must be constrained by a positive CycleCanStartEvery. + return false + } + for i, c := range p.Cliffs { + if c <= max(udpLifetimeProbeCliffSlack*2, heartbeatInterval) { + // A timeout cliff less than or equal to twice + // udpLifetimeProbeCliffSlack is invalid due to being effectively + // zero when the cliff slack is subtracted from the cliff value at + // scheduling time. + // + // A timeout cliff less or equal to the heartbeatInterval is also + // invalid, as we may attempt to schedule on the tail end of the + // last heartbeat tied to an active session. + // + // These values are constants, but max()'d in case they change in + // the future. + return false + } + if i == 0 { + continue + } + if c <= p.Cliffs[i-1] { + // Cliffs must be in ascending order. 
+ return false + } + } + return true +} + +// setProbeUDPLifetimeOn enables or disables probing of UDP path lifetime based +// on v. In the case of enablement defaultProbeUDPLifetimeConfig is used as the +// desired configuration. +func (de *endpoint) setProbeUDPLifetimeOn(v bool) { + de.mu.Lock() + if v { + de.setProbeUDPLifetimeConfigLocked(defaultProbeUDPLifetimeConfig) + } else { + de.setProbeUDPLifetimeConfigLocked(nil) + } + de.mu.Unlock() +} + +// setProbeUDPLifetimeConfigLocked sets the desired configuration for probing +// UDP path lifetime. Ownership of desired is passed to endpoint, it must not be +// mutated once this call is made. A nil value disables the feature. If desired +// is non-nil but desired.Valid() returns false this is a no-op. +func (de *endpoint) setProbeUDPLifetimeConfigLocked(desired *ProbeUDPLifetimeConfig) { + if de.isWireguardOnly { + return + } + if desired == nil { + if de.probeUDPLifetime == nil { + // noop, not currently configured or desired + return + } + de.probeUDPLifetime.resetCycleEndpointLocked() + de.probeUDPLifetime = nil + return + } + if !desired.Valid() { + return + } + if de.probeUDPLifetime != nil { + if de.probeUDPLifetime.config.Equals(desired) { + // noop, current config equals desired + return + } + de.probeUDPLifetime.resetCycleEndpointLocked() + } else { + de.probeUDPLifetime = &probeUDPLifetime{} + } + p := de.probeUDPLifetime + p.config = *desired + p.resetCycleEndpointLocked() +} + // endpointDisco is the current disco key and short string for an endpoint. This // structure is immutable. type endpointDisco struct { @@ -219,7 +451,7 @@ func (de *endpoint) deleteEndpointLocked(why string, ep netip.AddrPort) { What: "deleteEndpointLocked-bestAddr-" + why, From: de.bestAddr, }) - de.bestAddr = addrQuality{} + de.setBestAddrLocked(addrQuality{}) } } @@ -236,9 +468,7 @@ func (de *endpoint) initFakeUDPAddr() { // noteRecvActivity records receive activity on de, and invokes // Conn.noteRecvActivity no more than once every 10s. -func (de *endpoint) noteRecvActivity(ipp netip.AddrPort) { - now := mono.Now() - +func (de *endpoint) noteRecvActivity(ipp netip.AddrPort, now mono.Time) { if de.isWireguardOnly { de.mu.Lock() de.bestAddr.AddrPort = ipp @@ -257,9 +487,9 @@ func (de *endpoint) noteRecvActivity(ipp netip.AddrPort) { de.mu.Unlock() } - elapsed := now.Sub(de.lastRecv.LoadAtomic()) + elapsed := now.Sub(de.lastRecvWG.LoadAtomic()) if elapsed > 10*time.Second { - de.lastRecv.StoreAtomic(now) + de.lastRecvWG.StoreAtomic(now) if de.c.noteRecvActivity == nil { return @@ -410,12 +640,140 @@ func (de *endpoint) addrForPingSizeLocked(now mono.Time, size int) (udpAddr, der return netip.AddrPort{}, de.derpAddr } +// maybeProbeUDPLifetimeLocked returns an afterInactivityFor duration and true +// if de is a candidate for UDP path lifetime probing in the future, otherwise +// false. +func (de *endpoint) maybeProbeUDPLifetimeLocked() (afterInactivityFor time.Duration, maybe bool) { + p := de.probeUDPLifetime + if p == nil { + return afterInactivityFor, false + } + if !de.bestAddr.IsValid() { + return afterInactivityFor, false + } + epDisco := de.disco.Load() + if epDisco == nil { + // peer does not support disco + return afterInactivityFor, false + } + // We compare disco keys, which may have a shorter lifetime than node keys + // since disco keys reset on startup. 
+	// This has the desired side effect of
+	// shuffling probing probability where the local node ends up with a large
+	// key value lexicographically relative to the other nodes it tends to
+	// communicate with. If de's disco key changes, the cycle will reset.
+	if de.c.discoPublic.Compare(epDisco.key) >= 0 {
+		// lower disco pub key node probes higher
+		return afterInactivityFor, false
+	}
+	if !p.cycleActive && time.Since(p.cycleStartedAt) < p.config.CycleCanStartEvery {
+		// This is conservative as it doesn't account for afterInactivityFor use
+		// by the caller, potentially delaying the start of the next cycle. We
+		// assume the cycle could start immediately following
+		// maybeProbeUDPLifetimeLocked(), regardless of the value of
+		// afterInactivityFor relative to latest packets in/out time.
+		return afterInactivityFor, false
+	}
+	afterInactivityFor = p.currentCliffDurationEndpointLocked() - udpLifetimeProbeCliffSlack
+	if afterInactivityFor < 0 {
+		// shouldn't happen
+		return afterInactivityFor, false
+	}
+	return afterInactivityFor, true
+}
+
+// heartbeatForLifetimeVia represents the scheduling source of
+// endpoint.heartbeatForLifetime().
+type heartbeatForLifetimeVia string
+
+const (
+	heartbeatForLifetimeViaSessionInactive heartbeatForLifetimeVia = "session-inactive"
+	heartbeatForLifetimeViaPongRx          heartbeatForLifetimeVia = "pong-rx"
+	heartbeatForLifetimeViaSelf            heartbeatForLifetimeVia = "self"
+)
+
+// scheduleHeartbeatForLifetimeLocked schedules de.heartbeatForLifetime to fire
+// in the future (after). The caller must describe themselves in the via arg.
+func (de *endpoint) scheduleHeartbeatForLifetimeLocked(after time.Duration, via heartbeatForLifetimeVia) {
+	p := de.probeUDPLifetime
+	if p == nil {
+		return
+	}
+	de.c.dlogf("[v1] magicsock: disco: scheduling UDP lifetime probe for cliff=%v via=%v to %v (%v)",
+		p.currentCliffDurationEndpointLocked(), via, de.publicKey.ShortString(), de.discoShort())
+	p.bestAddr = de.bestAddr.AddrPort
+	p.timer = time.AfterFunc(after, de.heartbeatForLifetime)
+	if via == heartbeatForLifetimeViaSelf {
+		metricUDPLifetimeCliffsRescheduled.Add(1)
+	} else {
+		metricUDPLifetimeCliffsScheduled.Add(1)
+	}
+}
+
+// heartbeatForLifetime sends a disco ping recorded locally with a purpose of
+// pingHeartbeatForUDPLifetime to de if de.bestAddr has remained stable, and it
+// has been inactive for a duration that is within the error bounds for the
+// current lifetime probing cliff. Alternatively it may reschedule itself into
+// the future, which is one of three scheduling sources. The other scheduling
+// sources are de.heartbeat() and de.probeUDPLifetimeCliffDoneLocked().
+func (de *endpoint) heartbeatForLifetime() {
+	de.mu.Lock()
+	defer de.mu.Unlock()
+	p := de.probeUDPLifetime
+	if p == nil || p.timer == nil {
+		// We raced with a code path trying to p.timer.Stop() us. Give up early
+		// in the interest of simplicity. If p.timer.Stop() happened in
+		// de.heartbeat(), presumably because of recent packets in/out, we
+		// *could* still probe here, and it would be meaningful, but the time
+		// logic below would reschedule as-is.
+ return + } + p.timer = nil + if !p.bestAddr.IsValid() || de.bestAddr.AddrPort != p.bestAddr { + // best path changed + p.resetCycleEndpointLocked() + return + } + afterInactivityFor, ok := de.maybeProbeUDPLifetimeLocked() + if !ok { + p.resetCycleEndpointLocked() + return + } + inactiveFor := mono.Now().Sub(max(de.lastRecvUDPAny.LoadAtomic(), de.lastSendAny)) + delta := afterInactivityFor - inactiveFor + if delta.Abs() > udpLifetimeProbeSchedulingTolerance { + if delta < 0 { + // We missed our opportunity. We can resume this cliff at the tail + // end of another session. + metricUDPLifetimeCliffsMissed.Add(1) + return + } else { + // We need to wait longer before sending a ping. This can happen for + // a number of reasons, which are described in more detail in + // de.heartbeat(). + de.scheduleHeartbeatForLifetimeLocked(delta, heartbeatForLifetimeViaSelf) + return + } + } + if p.currentCliff == 0 { + p.cycleStartedAt = time.Now() + p.cycleActive = true + } + de.c.dlogf("[v1] magicsock: disco: sending disco ping for UDP lifetime probe cliff=%v to %v (%v)", + p.currentCliffDurationEndpointLocked(), de.publicKey.ShortString(), de.discoShort()) + de.startDiscoPingLocked(de.bestAddr.AddrPort, mono.Now(), pingHeartbeatForUDPLifetime, 0, nil) +} + // heartbeat is called every heartbeatInterval to keep the best UDP path alive, -// or kick off discovery of other paths. +// kick off discovery of other paths, or schedule the probing of UDP path +// lifetime on the tail end of an active session. func (de *endpoint) heartbeat() { de.mu.Lock() defer de.mu.Unlock() + if de.probeUDPLifetime != nil && de.probeUDPLifetime.timer != nil { + de.probeUDPLifetime.timer.Stop() + de.probeUDPLifetime.timer = nil + } de.heartBeatTimer = nil if de.heartbeatDisabled { @@ -423,18 +781,42 @@ func (de *endpoint) heartbeat() { return } - if de.lastSend.IsZero() { + if de.lastSendExt.IsZero() { // Shouldn't happen. return } - if mono.Since(de.lastSend) > sessionActiveTimeout { + now := mono.Now() + if now.Sub(de.lastSendExt) > sessionActiveTimeout { // Session's idle. Stop heartbeating. de.c.dlogf("[v1] magicsock: disco: ending heartbeats for idle session to %v (%v)", de.publicKey.ShortString(), de.discoShort()) + if afterInactivityFor, ok := de.maybeProbeUDPLifetimeLocked(); ok { + // This is the best place to best effort schedule a probe of UDP + // path lifetime in the future as it loosely translates to "UDP path + // is inactive". + // + // Note: wireguard-go schedules a WireGuard keepalive packet (by + // default, not tied to persistent keepalive feature) 10 seconds in + // the future after receiving an authenticated data packet. It's + // typically only sent by one side based on how the WireGuard state + // machine controls the timer. So, if we are on the receiving end of + // that keepalive, de.lastSendExt won't move, assuming there is no + // other user-generated traffic. This is one reason why we perform + // a more granular check of the last packets in/out time, below, as + // a WireGuard keepalive may have fallen somewhere within the + // sessionActiveTimeout window. heartbeatForLifetime will also + // perform a similar check, and reschedule as necessary. 
+ inactiveFor := now.Sub(max(de.lastSendAny, de.lastRecvUDPAny.LoadAtomic())) + after := afterInactivityFor - inactiveFor + if after < 0 { + // shouldn't happen + return + } + de.scheduleHeartbeatForLifetimeLocked(after, heartbeatForLifetimeViaSessionInactive) + } return } - now := mono.Now() udpAddr, _, _ := de.addrForSendLocked(now) if udpAddr.IsValid() { // We have a preferred path. Ping that every 2 seconds. @@ -478,8 +860,8 @@ func (de *endpoint) wantFullPingLocked(now mono.Time) bool { return false } -func (de *endpoint) noteActiveLocked() { - de.lastSend = mono.Now() +func (de *endpoint) noteTxActivityExtTriggerLocked(now mono.Time) { + de.lastSendExt = now if de.heartBeatTimer == nil && !de.heartbeatDisabled { de.heartBeatTimer = time.AfterFunc(heartbeatInterval, de.heartbeat) } @@ -536,7 +918,6 @@ func (de *endpoint) discoPing(res *ipnstate.PingResult, size int, cb func(*ipnst de.startDiscoPingLocked(ep, now, pingCLI, size, resCB) } } - de.noteActiveLocked() } var ( @@ -562,7 +943,8 @@ func (de *endpoint) send(buffs [][]byte) error { } else if !udpAddr.IsValid() || now.After(de.trustBestAddrUntil) { de.sendDiscoPingsLocked(now, true) } - de.noteActiveLocked() + de.noteTxActivityExtTriggerLocked(now) + de.lastSendAny = now de.mu.Unlock() if !udpAddr.IsValid() && !derpAddr.IsValid() { @@ -606,6 +988,42 @@ func (de *endpoint) send(buffs [][]byte) error { return err } +// probeUDPLifetimeCliffDoneLocked is called when a disco +// pingHeartbeatForUDPLifetime is being cleaned up. result contains the reason +// for the cleanup, txid contains the ping's txid. +// probeUDPLifetimeCliffDoneLocked may schedule another +// pingHeartbeatForUDPLifetime in the future if there is another cliff remaining +// for the current probing cycle. +func (de *endpoint) probeUDPLifetimeCliffDoneLocked(result discoPingResult, txid stun.TxID) { + p := de.probeUDPLifetime + if p == nil || !p.cycleActive || de.probeUDPLifetime.timer != nil || txid != p.lastTxID { + // Probing may have been disabled while heartbeats were in flight. This + // can also be a duplicate or late arriving result. 
+ return + } + metricUDPLifetimeCliffsCompleted.Add(1) + if result != discoPongReceived || p.currentCliff >= len(p.config.Cliffs)-1 { + maxCliffIndex := p.currentCliff + if result != discoPongReceived { + maxCliffIndex = p.currentCliff - 1 + } + var maxCliffDuration time.Duration + if maxCliffIndex >= 0 { + maxCliffDuration = p.config.Cliffs[maxCliffIndex] + } + p.cycleCompleteMaxCliffEndpointLocked(maxCliffIndex) + de.c.dlogf("[v1] magicsock: disco: UDP lifetime probe cycle completed max cliff=%v for %v (%v)", + maxCliffDuration, de.publicKey.ShortString(), de.discoShort()) + metricUDPLifetimeCyclesCompleted.Add(1) + p.resetCycleEndpointLocked() + } else { + p.currentCliff++ + if after, ok := de.maybeProbeUDPLifetimeLocked(); ok { + de.scheduleHeartbeatForLifetimeLocked(after, heartbeatForLifetimeViaPongRx) + } + } +} + func (de *endpoint) discoPingTimeout(txid stun.TxID) { de.mu.Lock() defer de.mu.Unlock() @@ -616,23 +1034,36 @@ func (de *endpoint) discoPingTimeout(txid stun.TxID) { if debugDisco() || !de.bestAddr.IsValid() || mono.Now().After(de.trustBestAddrUntil) { de.c.dlogf("[v1] magicsock: disco: timeout waiting for pong %x from %v (%v, %v)", txid[:6], sp.to, de.publicKey.ShortString(), de.discoShort()) } - de.removeSentDiscoPingLocked(txid, sp) + de.removeSentDiscoPingLocked(txid, sp, discoPingTimedOut) } -// forgetDiscoPing is called by a timer when a ping either fails to send or -// has taken too long to get a pong reply. +// forgetDiscoPing is called when a ping fails to send. func (de *endpoint) forgetDiscoPing(txid stun.TxID) { de.mu.Lock() defer de.mu.Unlock() if sp, ok := de.sentPing[txid]; ok { - de.removeSentDiscoPingLocked(txid, sp) + de.removeSentDiscoPingLocked(txid, sp, discoPingFailed) } } -func (de *endpoint) removeSentDiscoPingLocked(txid stun.TxID, sp sentPing) { +// discoPingResult represents the result of an attempted disco ping send +// operation. +type discoPingResult int + +const ( + discoPingResultUnknown discoPingResult = iota + discoPingFailed + discoPingTimedOut + discoPongReceived +) + +func (de *endpoint) removeSentDiscoPingLocked(txid stun.TxID, sp sentPing, result discoPingResult) { // Stop the timer for the case where sendPing failed to write to UDP. // In the case of a timer already having fired, this is a no-op: sp.timer.Stop() + if sp.purpose == pingHeartbeatForUDPLifetime { + de.probeUDPLifetimeCliffDoneLocked(result, txid) + } delete(de.sentPing, txid) } @@ -685,6 +1116,11 @@ const ( // pingCLI means that the user is running "tailscale ping" // from the CLI. These types of pings can go over DERP. pingCLI + + // pingHeartbeatForUDPLifetime means that the purpose of a ping was to + // discover whether the UDP path was still active through any and all + // stateful middleboxes involved. + pingHeartbeatForUDPLifetime ) // startDiscoPingLocked sends a disco ping to ep in a separate goroutine. 
resCB, @@ -731,6 +1167,10 @@ func (de *endpoint) startDiscoPingLocked(ep netip.AddrPort, now mono.Time, purpo if purpose == pingHeartbeat { logLevel = discoVerboseLog } + if purpose == pingCLI { + de.noteTxActivityExtTriggerLocked(now) + } + de.lastSendAny = now for _, s := range sizes { txid := stun.NewTxID() de.sentPing[txid] = sentPing{ @@ -741,6 +1181,9 @@ func (de *endpoint) startDiscoPingLocked(ep netip.AddrPort, now mono.Time, purpo resCB: resCB, size: s, } + if purpose == pingHeartbeatForUDPLifetime && de.probeUDPLifetime != nil { + de.probeUDPLifetime.lastTxID = txid + } go de.sendDiscoPing(ep, epDisco.key, txid, s, logLevel) } @@ -864,7 +1307,7 @@ func (de *endpoint) setLastPing(ipp netip.AddrPort, now mono.Time) { // updateFromNode updates the endpoint based on a tailcfg.Node from a NetMap // update. -func (de *endpoint) updateFromNode(n tailcfg.NodeView, heartbeatDisabled bool) { +func (de *endpoint) updateFromNode(n tailcfg.NodeView, heartbeatDisabled bool, probeUDPLifetimeEnabled bool) { if !n.Valid() { panic("nil node when updating endpoint") } @@ -872,6 +1315,11 @@ func (de *endpoint) updateFromNode(n tailcfg.NodeView, heartbeatDisabled bool) { defer de.mu.Unlock() de.heartbeatDisabled = heartbeatDisabled + if probeUDPLifetimeEnabled { + de.setProbeUDPLifetimeConfigLocked(defaultProbeUDPLifetimeConfig) + } else { + de.setProbeUDPLifetimeConfigLocked(nil) + } de.expired = n.Expired() epDisco := de.disco.Load() @@ -1009,7 +1457,7 @@ func (de *endpoint) addCandidateEndpoint(ep netip.AddrPort, forRxPingTxID stun.T // // de.mu must be held. func (de *endpoint) clearBestAddrLocked() { - de.bestAddr = addrQuality{} + de.setBestAddrLocked(addrQuality{}) de.bestAddrAt = 0 de.trustBestAddrUntil = 0 } @@ -1093,7 +1541,7 @@ func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src netip return false } knownTxID = true // for naked returns below - de.removeSentDiscoPingLocked(m.TxID, sp) + de.removeSentDiscoPingLocked(m.TxID, sp, discoPongReceived) pktLen := int(pingSizeToPktLen(sp.size, sp.to.Addr().Is6())) if sp.size != 0 { @@ -1124,7 +1572,7 @@ func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src netip }) } - if sp.purpose != pingHeartbeat { + if sp.purpose != pingHeartbeat && sp.purpose != pingHeartbeatForUDPLifetime { de.c.dlogf("[v1] magicsock: disco: %v<-%v (%v, %v) got pong tx=%x latency=%v pktlen=%v pong.src=%v%v", de.c.discoShort, de.discoShort(), de.publicKey.ShortString(), src, m.TxID[:6], latency.Round(time.Millisecond), pktLen, m.Src, logger.ArgWriter(func(bw *bufio.Writer) { if sp.to != src { fmt.Fprintf(bw, " ping.to=%v", sp.to) @@ -1152,7 +1600,7 @@ func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src netip From: de.bestAddr, To: thisPong, }) - de.bestAddr = thisPong + de.setBestAddrLocked(thisPong) } if de.bestAddr.AddrPort == thisPong.AddrPort { de.debugUpdates.Add(EndpointChange{ @@ -1327,13 +1775,13 @@ func (de *endpoint) populatePeerStatus(ps *ipnstate.PeerStatus) { ps.Relay = de.c.derpRegionCodeOfIDLocked(int(de.derpAddr.Port())) - if de.lastSend.IsZero() { + if de.lastSendExt.IsZero() { return } now := mono.Now() - ps.LastWrite = de.lastSend.WallTime() - ps.Active = now.Sub(de.lastSend) < sessionActiveTimeout + ps.LastWrite = de.lastSendExt.WallTime() + ps.Active = now.Sub(de.lastSendExt) < sessionActiveTimeout if udpAddr, derpAddr, _ := de.addrForSendLocked(now); udpAddr.IsValid() && !derpAddr.IsValid() { ps.CurAddr = udpAddr.String() @@ -1372,7 +1820,7 @@ func (de *endpoint) stopAndReset() { // 
DERP-only endpoint. It does not stop the endpoint's heartbeat // timer, if one is running. func (de *endpoint) resetLocked() { - de.lastSend = 0 + de.lastSendExt = 0 de.lastFullPing = 0 de.clearBestAddrLocked() for _, es := range de.endpointState { @@ -1380,9 +1828,10 @@ func (de *endpoint) resetLocked() { } if !de.isWireguardOnly { for txid, sp := range de.sentPing { - de.removeSentDiscoPingLocked(txid, sp) + de.removeSentDiscoPingLocked(txid, sp, discoPingResultUnknown) } } + de.probeUDPLifetime.resetCycleEndpointLocked() } func (de *endpoint) numStopAndReset() int64 { diff --git a/wgengine/magicsock/endpoint_test.go b/wgengine/magicsock/endpoint_test.go new file mode 100644 index 000000000..1e2de8967 --- /dev/null +++ b/wgengine/magicsock/endpoint_test.go @@ -0,0 +1,326 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package magicsock + +import ( + "net/netip" + "testing" + "time" + + "github.com/dsnet/try" + "tailscale.com/types/key" +) + +func TestProbeUDPLifetimeConfig_Equals(t *testing.T) { + tests := []struct { + name string + a *ProbeUDPLifetimeConfig + b *ProbeUDPLifetimeConfig + want bool + }{ + { + "both sides nil", + nil, + nil, + true, + }, + { + "equal pointers", + defaultProbeUDPLifetimeConfig, + defaultProbeUDPLifetimeConfig, + true, + }, + { + "a nil", + nil, + &ProbeUDPLifetimeConfig{ + Cliffs: []time.Duration{time.Second}, + CycleCanStartEvery: time.Hour, + }, + false, + }, + { + "b nil", + &ProbeUDPLifetimeConfig{ + Cliffs: []time.Duration{time.Second}, + CycleCanStartEvery: time.Hour, + }, + nil, + false, + }, + { + "Cliffs unequal", + &ProbeUDPLifetimeConfig{ + Cliffs: []time.Duration{time.Second}, + CycleCanStartEvery: time.Hour, + }, + &ProbeUDPLifetimeConfig{ + Cliffs: []time.Duration{time.Second * 2}, + CycleCanStartEvery: time.Hour, + }, + false, + }, + { + "CycleCanStartEvery unequal", + &ProbeUDPLifetimeConfig{ + Cliffs: []time.Duration{time.Second}, + CycleCanStartEvery: time.Hour, + }, + &ProbeUDPLifetimeConfig{ + Cliffs: []time.Duration{time.Second}, + CycleCanStartEvery: time.Hour * 2, + }, + false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.a.Equals(tt.b); got != tt.want { + t.Errorf("Equals() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestProbeUDPLifetimeConfig_Valid(t *testing.T) { + tests := []struct { + name string + p *ProbeUDPLifetimeConfig + want bool + }{ + { + "default config valid", + defaultProbeUDPLifetimeConfig, + true, + }, + { + "no cliffs", + &ProbeUDPLifetimeConfig{ + CycleCanStartEvery: time.Hour, + }, + false, + }, + { + "zero CycleCanStartEvery", + &ProbeUDPLifetimeConfig{ + Cliffs: []time.Duration{time.Second * 10}, + CycleCanStartEvery: 0, + }, + false, + }, + { + "cliff too small", + &ProbeUDPLifetimeConfig{ + Cliffs: []time.Duration{min(udpLifetimeProbeCliffSlack*2, heartbeatInterval)}, + CycleCanStartEvery: time.Hour, + }, + false, + }, + { + "duplicate Cliffs values", + &ProbeUDPLifetimeConfig{ + Cliffs: []time.Duration{time.Second * 2, time.Second * 2}, + CycleCanStartEvery: time.Hour, + }, + false, + }, + { + "Cliffs not ascending", + &ProbeUDPLifetimeConfig{ + Cliffs: []time.Duration{time.Second * 2, time.Second * 1}, + CycleCanStartEvery: time.Hour, + }, + false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.p.Valid(); got != tt.want { + t.Errorf("Valid() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_endpoint_maybeProbeUDPLifetimeLocked(t *testing.T) { + var lower, 
higher key.DiscoPublic + a := key.NewDisco().Public() + b := key.NewDisco().Public() + if a.String() < b.String() { + lower = a + higher = b + } else { + lower = b + higher = a + } + addr := addrQuality{AddrPort: try.E1[netip.AddrPort](netip.ParseAddrPort("1.1.1.1:1"))} + newProbeUDPLifetime := func() *probeUDPLifetime { + return &probeUDPLifetime{ + config: *defaultProbeUDPLifetimeConfig, + } + } + + tests := []struct { + name string + localDisco key.DiscoPublic + remoteDisco *key.DiscoPublic + probeUDPLifetimeFn func() *probeUDPLifetime + bestAddr addrQuality + wantAfterInactivityForFn func(*probeUDPLifetime) time.Duration + wantMaybe bool + }{ + { + "nil probeUDPLifetime", + higher, + &lower, + func() *probeUDPLifetime { + return nil + }, + addr, + func(lifetime *probeUDPLifetime) time.Duration { + return 0 + }, + false, + }, + { + "local higher disco key", + higher, + &lower, + newProbeUDPLifetime, + addr, + func(lifetime *probeUDPLifetime) time.Duration { + return 0 + }, + false, + }, + { + "remote no disco key", + higher, + nil, + newProbeUDPLifetime, + addr, + func(lifetime *probeUDPLifetime) time.Duration { + return 0 + }, + false, + }, + { + "invalid bestAddr", + lower, + &higher, + newProbeUDPLifetime, + addrQuality{}, + func(lifetime *probeUDPLifetime) time.Duration { + return 0 + }, + false, + }, + { + "cycle started too recently", + lower, + &higher, + func() *probeUDPLifetime { + l := newProbeUDPLifetime() + l.cycleActive = false + l.cycleStartedAt = time.Now() + return l + }, + addr, + func(lifetime *probeUDPLifetime) time.Duration { + return 0 + }, + false, + }, + { + "maybe cliff 0 cycle not active", + lower, + &higher, + func() *probeUDPLifetime { + l := newProbeUDPLifetime() + l.cycleActive = false + l.cycleStartedAt = time.Now().Add(-l.config.CycleCanStartEvery).Add(-time.Second) + return l + }, + addr, + func(lifetime *probeUDPLifetime) time.Duration { + return lifetime.config.Cliffs[0] - udpLifetimeProbeCliffSlack + }, + true, + }, + { + "maybe cliff 0", + lower, + &higher, + func() *probeUDPLifetime { + l := newProbeUDPLifetime() + l.cycleActive = true + l.currentCliff = 0 + return l + }, + addr, + func(lifetime *probeUDPLifetime) time.Duration { + return lifetime.config.Cliffs[0] - udpLifetimeProbeCliffSlack + }, + true, + }, + { + "maybe cliff 1", + lower, + &higher, + func() *probeUDPLifetime { + l := newProbeUDPLifetime() + l.cycleActive = true + l.currentCliff = 1 + return l + }, + addr, + func(lifetime *probeUDPLifetime) time.Duration { + return lifetime.config.Cliffs[1] - udpLifetimeProbeCliffSlack + }, + true, + }, + { + "maybe cliff 2", + lower, + &higher, + func() *probeUDPLifetime { + l := newProbeUDPLifetime() + l.cycleActive = true + l.currentCliff = 2 + return l + }, + addr, + func(lifetime *probeUDPLifetime) time.Duration { + return lifetime.config.Cliffs[2] - udpLifetimeProbeCliffSlack + }, + true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + de := &endpoint{ + c: &Conn{ + discoPublic: tt.localDisco, + }, + bestAddr: tt.bestAddr, + } + if tt.remoteDisco != nil { + remote := &endpointDisco{ + key: *tt.remoteDisco, + } + de.disco.Store(remote) + } + p := tt.probeUDPLifetimeFn() + de.probeUDPLifetime = p + gotAfterInactivityFor, gotMaybe := de.maybeProbeUDPLifetimeLocked() + wantAfterInactivityFor := tt.wantAfterInactivityForFn(p) + if gotAfterInactivityFor != wantAfterInactivityFor { + t.Errorf("maybeProbeUDPLifetimeLocked() gotAfterInactivityFor = %v, want %v", gotAfterInactivityFor, wantAfterInactivityFor) + } + if 
gotMaybe != tt.wantMaybe { + t.Errorf("maybeProbeUDPLifetimeLocked() gotMaybe = %v, want %v", gotMaybe, tt.wantMaybe) + } + }) + } +} diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index edc812b48..e42688602 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -141,6 +141,8 @@ type Conn struct { silentDiscoOn atomic.Bool // whether silent disco is enabled + probeUDPLifetimeOn atomic.Bool // whether probing of UDP lifetime is enabled + // noV4Send is whether IPv4 UDP is known to be unable to transmit // at all. This could happen if the socket is in an invalid state // (as can happen on darwin after a network link status change). @@ -749,7 +751,7 @@ func (c *Conn) LastRecvActivityOfNodeKey(nk key.NodePublic) string { if !ok { return "never" } - saw := de.lastRecv.LoadAtomic() + saw := de.lastRecvWG.LoadAtomic() if saw == 0 { return "never" } @@ -1236,7 +1238,9 @@ func (c *Conn) receiveIP(b []byte, ipp netip.AddrPort, cache *ippEndpointCache) cache.gen = de.numStopAndReset() ep = de } - ep.noteRecvActivity(ipp) + now := mono.Now() + ep.lastRecvUDPAny.StoreAtomic(now) + ep.noteRecvActivity(ipp, now) if stats := c.stats.Load(); stats != nil { stats.UpdateRxPhysical(ep.nodeAddr, ipp, len(b)) } @@ -1383,6 +1387,15 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netip.AddrPort, derpNodeSrc ke return } + isDERP := src.Addr() == tailcfg.DerpMagicIPAddr + if !isDERP { + // Record receive time for UDP transport packets. + pi, ok := c.peerMap.byIPPort[src] + if ok { + pi.ep.lastRecvUDPAny.StoreAtomic(mono.Now()) + } + } + // We're now reasonably sure we're expecting communication from // this peer, do the heavy crypto lifting to see what they want. // @@ -1430,7 +1443,6 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netip.AddrPort, derpNodeSrc ke return } - isDERP := src.Addr() == tailcfg.DerpMagicIPAddr if isDERP { metricRecvDiscoDERP.Add(1) } else { @@ -1817,11 +1829,13 @@ func debugRingBufferSize(numPeers int) int { // They might be set by envknob and/or controlknob. // The value is comparable. type debugFlags struct { - heartbeatDisabled bool + heartbeatDisabled bool + probeUDPLifetimeOn bool } func (c *Conn) debugFlagsLocked() (f debugFlags) { f.heartbeatDisabled = debugEnableSilentDisco() || c.silentDiscoOn.Load() + f.probeUDPLifetimeOn = c.probeUDPLifetimeOn.Load() return } @@ -1846,6 +1860,19 @@ func (c *Conn) SilentDisco() bool { return flags.heartbeatDisabled } +// SetProbeUDPLifetime toggles probing of UDP lifetime based on v. +func (c *Conn) SetProbeUDPLifetime(v bool) { + old := c.probeUDPLifetimeOn.Swap(v) + if old == v { + return + } + c.mu.Lock() + defer c.mu.Unlock() + c.peerMap.forEachEndpoint(func(ep *endpoint) { + ep.setProbeUDPLifetimeOn(v) + }) +} + // SetNetworkMap is called when the control client gets a new network // map from the control server. It must always be non-nil. // @@ -1876,7 +1903,8 @@ func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) { if nodesEqual(priorPeers, curPeers) && c.lastFlags == flags { // The rest of this function is all adjusting state for peers that have // changed. But if the set of peers is equal and the debug flags (for - // silent disco) haven't changed, no need to do anything else. + // silent disco and probe UDP lifetime) haven't changed, there is no + // need to do anything else. 
return } @@ -1927,7 +1955,7 @@ func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) { if epDisco := ep.disco.Load(); epDisco != nil { oldDiscoKey = epDisco.key } - ep.updateFromNode(n, flags.heartbeatDisabled) + ep.updateFromNode(n, flags.heartbeatDisabled, flags.probeUDPLifetimeOn) c.peerMap.upsertEndpoint(ep, oldDiscoKey) // maybe update discokey mappings in peerMap continue } @@ -1980,7 +2008,7 @@ func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) { c.logEndpointCreated(n) } - ep.updateFromNode(n, flags.heartbeatDisabled) + ep.updateFromNode(n, flags.heartbeatDisabled, flags.probeUDPLifetimeOn) c.peerMap.upsertEndpoint(ep, key.DiscoPublic{}) } @@ -2947,8 +2975,34 @@ var ( // received an peer MTU probe response for a given MTU size. // TODO: add proper support for label maps in clientmetrics metricRecvDiscoPeerMTUProbesByMTU syncs.Map[string, *clientmetric.Metric] + + // metricUDPLifetime* metrics pertain to UDP lifetime probing, see type + // probeUDPLifetime. These metrics assume a static/default configuration for + // probing (defaultProbeUDPLifetimeConfig) until we disseminate + // ProbeUDPLifetimeConfig from control, and have lifetime management (GC old + // metrics) of clientmetrics or similar. + metricUDPLifetimeCliffsScheduled = newUDPLifetimeCounter("magicsock_udp_lifetime_cliffs_scheduled") + metricUDPLifetimeCliffsCompleted = newUDPLifetimeCounter("magicsock_udp_lifetime_cliffs_completed") + metricUDPLifetimeCliffsMissed = newUDPLifetimeCounter("magicsock_udp_lifetime_cliffs_missed") + metricUDPLifetimeCliffsRescheduled = newUDPLifetimeCounter("magicsock_udp_lifetime_cliffs_rescheduled") + metricUDPLifetimeCyclesCompleted = newUDPLifetimeCounter("magicsock_udp_lifetime_cycles_completed") + metricUDPLifetimeCycleCompleteNoCliffReached = newUDPLifetimeCounter("magicsock_udp_lifetime_cycle_complete_no_cliff_reached") + metricUDPLifetimeCycleCompleteAt10sCliff = newUDPLifetimeCounter("magicsock_udp_lifetime_cycle_complete_at_10s_cliff") + metricUDPLifetimeCycleCompleteAt30sCliff = newUDPLifetimeCounter("magicsock_udp_lifetime_cycle_complete_at_30s_cliff") + metricUDPLifetimeCycleCompleteAt60sCliff = newUDPLifetimeCounter("magicsock_udp_lifetime_cycle_complete_at_60s_cliff") ) +// newUDPLifetimeCounter returns a new *clientmetric.Metric with the provided +// name combined with a suffix representing defaultProbeUDPLifetimeConfig. 
+func newUDPLifetimeCounter(name string) *clientmetric.Metric { + var sb strings.Builder + for _, cliff := range defaultProbeUDPLifetimeConfig.Cliffs { + sb.WriteString(fmt.Sprintf("%ds", cliff/time.Second)) + } + sb.WriteString(fmt.Sprintf("_%ds", defaultProbeUDPLifetimeConfig.CycleCanStartEvery/time.Second)) + return clientmetric.NewCounter(fmt.Sprintf("%s_%s", name, sb.String())) +} + func getPeerMTUsProbedMetric(mtu tstun.WireMTU) *clientmetric.Metric { key := fmt.Sprintf("magicsock_recv_disco_peer_mtu_probes_by_mtu_%d", mtu) mm, _ := metricRecvDiscoPeerMTUProbesByMTU.LoadOrInit(key, func() *clientmetric.Metric { return clientmetric.NewCounter(key) }) diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index dc14c6094..640728d61 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -1220,15 +1220,15 @@ func Test32bitAlignment(t *testing.T) { }, } - if off := unsafe.Offsetof(de.lastRecv); off%8 != 0 { - t.Fatalf("endpoint.lastRecv is not 8-byte aligned") + if off := unsafe.Offsetof(de.lastRecvWG); off%8 != 0 { + t.Fatalf("endpoint.lastRecvWG is not 8-byte aligned") } - de.noteRecvActivity(netip.AddrPort{}) // verify this doesn't panic on 32-bit + de.noteRecvActivity(netip.AddrPort{}, mono.Now()) // verify this doesn't panic on 32-bit if called != 1 { t.Fatal("expected call to noteRecvActivity") } - de.noteRecvActivity(netip.AddrPort{}) + de.noteRecvActivity(netip.AddrPort{}, mono.Now()) if called != 1 { t.Error("expected no second call to noteRecvActivity") }
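A few of the mechanics in this change are easier to see in isolation; the sketches that follow are illustrative only, not part of the patch. First, the control-side gating: tailcfg.NodeAttrProbeUDPLifetime sets the controlknobs bit, LocalBackend pushes it into Conn.SetProbeUDPLifetime on each netmap, and every endpoint flips between defaultProbeUDPLifetimeConfig and nil. A minimal sketch of the attribute check, assuming the patched tailcfg package; slices.Contains stands in for the has() closure in UpdateFromNodeAttributes:

package main

import (
	"fmt"
	"slices"

	"tailscale.com/tailcfg"
)

func main() {
	selfNodeAttrs := []tailcfg.NodeCapability{
		tailcfg.NodeAttrSeamlessKeyRenewal,
		tailcfg.NodeAttrProbeUDPLifetime,
	}
	// The knob is set only while control sends "probe-udp-lifetime"; dropping
	// the attribute on a later map response turns probing back off.
	probe := slices.Contains(selfNodeAttrs, tailcfg.NodeAttrProbeUDPLifetime)
	fmt.Println(probe) // true
}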
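Next, the single-prober election. For any pair of peers, only the side whose disco public key compares lexicographically lower runs a probe cycle, so a path is never probed from both ends. A sketch using the DiscoPublic.Compare method this patch adds, assuming the patched types/key package:

package main

import (
	"fmt"

	"tailscale.com/types/key"
)

func main() {
	local := key.NewDisco().Public()
	remote := key.NewDisco().Public()
	// Mirrors maybeProbeUDPLifetimeLocked: the lexicographically lower disco
	// public key probes; the higher side returns early and stays passive.
	if local.Compare(remote) < 0 {
		fmt.Println("local probes UDP path lifetime for this pair")
	} else {
		fmt.Println("remote probes; local does nothing")
	}
}

Comparing disco keys rather than node keys is deliberate: disco keys reset on restart, which reshuffles which side probes over time.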
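ProbeUDPLifetimeConfig.Valid enforces strictly ascending cliffs, each exceeding max(2*udpLifetimeProbeCliffSlack, heartbeatInterval), plus a positive CycleCanStartEvery. A usage sketch, assuming the patched wgengine/magicsock package is importable from the build in question:

package main

import (
	"fmt"
	"time"

	"tailscale.com/wgengine/magicsock"
)

func main() {
	good := &magicsock.ProbeUDPLifetimeConfig{
		// Ascending, and every cliff clears the minimum threshold.
		Cliffs:             []time.Duration{10 * time.Second, 30 * time.Second, 60 * time.Second},
		CycleCanStartEvery: 24 * time.Hour,
	}
	fmt.Println(good.Valid()) // true; matches defaultProbeUDPLifetimeConfig

	bad := &magicsock.ProbeUDPLifetimeConfig{
		Cliffs:             []time.Duration{30 * time.Second, 10 * time.Second}, // not ascending
		CycleCanStartEvery: 24 * time.Hour,
	}
	fmt.Println(bad.Valid()) // false
}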
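The scheduling arithmetic is compact but easy to misread: a cliff is probed after cliff - udpLifetimeProbeCliffSlack of inactivity, and heartbeatForLifetime tolerates at most udpLifetimeProbeCliffSlack/8 of timer drift before it either reschedules itself or abandons the cliff. A stdlib-only sketch of that decision; decide is a hypothetical helper, not a function in the patch:

package main

import (
	"fmt"
	"time"
)

const (
	cliffSlack = 2 * time.Second // udpLifetimeProbeCliffSlack
	tolerance  = cliffSlack / 8  // udpLifetimeProbeSchedulingTolerance (250ms)
)

// decide mirrors the delta check in heartbeatForLifetime: compare the target
// inactivity (cliff minus slack) against observed inactivity and either fire,
// reschedule for the remainder, or record the cliff as missed.
func decide(cliff, inactiveFor time.Duration) string {
	afterInactivityFor := cliff - cliffSlack
	delta := afterInactivityFor - inactiveFor
	switch {
	case delta.Abs() <= tolerance:
		return "send pingHeartbeatForUDPLifetime now"
	case delta > 0:
		return fmt.Sprintf("reschedule self in %v", delta)
	default:
		return "cliff missed; resume on the tail end of a later session"
	}
}

func main() {
	cliff := 10 * time.Second                  // first default cliff
	fmt.Println(decide(cliff, 8*time.Second))  // on target: ping
	fmt.Println(decide(cliff, 3*time.Second))  // woke early: reschedule in 5s
	fmt.Println(decide(cliff, 10*time.Second)) // 2s late: missed
}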
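probeUDPLifetimeCliffDoneLocked is a small state machine: a pong advances the cycle to the next, longer cliff, while a timeout, or a pong on the final cliff, completes the cycle and records the highest cliff that still had a live path. A pure-function sketch of that branching; nextCliff is a hypothetical name:

package main

import "fmt"

// nextCliff mirrors the branching in probeUDPLifetimeCliffDoneLocked: a pong
// advances to the next cliff; a timeout, or a pong on the last cliff, ends the
// cycle and reports the highest cliff that answered (-1 if none did).
func nextCliff(current, numCliffs int, pongReceived bool) (next int, done bool, maxReached int) {
	switch {
	case !pongReceived:
		return 0, true, current - 1 // timed out; the previous cliff was the last survivor
	case current >= numCliffs-1:
		return 0, true, current // pong on the final cliff; full cycle complete
	default:
		return current + 1, false, -1 // keep probing the next, longer cliff
	}
}

func main() {
	// The default config has three cliffs: 10s, 30s, 60s.
	fmt.Println(nextCliff(0, 3, true))  // 1 false -1: pong at 10s, probe 30s next
	fmt.Println(nextCliff(1, 3, false)) // 0 true 0: timeout at 30s, path lived to 10s
	fmt.Println(nextCliff(2, 3, true))  // 0 true 2: pong at 60s, cycle complete
}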
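Finally, newUDPLifetimeCounter bakes the active configuration into each metric name so a future config change cannot silently mix metric populations. This stdlib-only snippet reproduces the suffix construction for the default config:

package main

import (
	"fmt"
	"strings"
	"time"
)

func main() {
	cliffs := []time.Duration{10 * time.Second, 30 * time.Second, 60 * time.Second}
	cycleCanStartEvery := 24 * time.Hour

	// Same suffix construction as newUDPLifetimeCounter: each cliff in whole
	// seconds, concatenated, then the cycle interval in seconds.
	var sb strings.Builder
	for _, cliff := range cliffs {
		fmt.Fprintf(&sb, "%ds", cliff/time.Second)
	}
	fmt.Fprintf(&sb, "_%ds", cycleCanStartEvery/time.Second)

	fmt.Printf("magicsock_udp_lifetime_cliffs_scheduled_%s\n", sb.String())
	// Prints: magicsock_udp_lifetime_cliffs_scheduled_10s30s60s_86400s
}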