From a6270826a3f3c26ef913bd56e2e7dbe17be32c03 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sat, 14 Oct 2023 13:16:04 -0700 Subject: [PATCH] wgengine/magicsock: fix data race regression in disco ping callbacks Regression from c15997511de3. The callback could be run multiple times from different endpoints. Fixes #9801 Signed-off-by: Brad Fitzpatrick --- wgengine/magicsock/endpoint.go | 52 ++++++++++++++++++++------------- wgengine/magicsock/magicsock.go | 2 +- 2 files changed, 33 insertions(+), 21 deletions(-) diff --git a/wgengine/magicsock/endpoint.go b/wgengine/magicsock/endpoint.go index 1e344d156..88ab6d896 100644 --- a/wgengine/magicsock/endpoint.go +++ b/wgengine/magicsock/endpoint.go @@ -105,9 +105,8 @@ type sentPing struct { at mono.Time timer *time.Timer // timeout timer purpose discoPingPurpose - size int // size of the disco message - res *ipnstate.PingResult // nil unless CLI ping - cb func(*ipnstate.PingResult) // nil unless CLI ping + size int // size of the disco message + resCB *pingResultAndCallback // or nil for internal use } // endpointState is some state and history for a specific endpoint of @@ -431,7 +430,7 @@ func (de *endpoint) heartbeat() { udpAddr, _, _ := de.addrForSendLocked(now) if udpAddr.IsValid() { // We have a preferred path. Ping that every 2 seconds. - de.startDiscoPingLocked(udpAddr, now, pingHeartbeat, 0, nil, nil) + de.startDiscoPingLocked(udpAddr, now, pingHeartbeat, 0, nil) } if de.wantFullPingLocked(now) { @@ -475,9 +474,20 @@ func (de *endpoint) noteActiveLocked() { // can send - the maximum packet size minus the IPv4 and UDP headers. var MaxDiscoPingSize = tstun.MaxPacketSize - 20 - 8 -// cliPing starts a ping for the "tailscale ping" command. res is value to call cb with, -// already partially filled. -func (de *endpoint) cliPing(res *ipnstate.PingResult, size int, cb func(*ipnstate.PingResult)) { +type pingResultAndCallback struct { + taken atomic.Bool // first CompareAndSwamp from false to true takes ownership of res + res *ipnstate.PingResult + cb func(*ipnstate.PingResult) +} + +func (p *pingResultAndCallback) reply() bool { + return p != nil && p.taken.CompareAndSwap(false, true) +} + +// discoPing starts a disc-level ping for the "tailscale ping" command (or other +// callers, such as c2n). res is value to call cb with, already partially +// filled. cb must be called at most once. Once called, ownership of res passes to db. +func (de *endpoint) discoPing(res *ipnstate.PingResult, size int, cb func(*ipnstate.PingResult)) { de.mu.Lock() defer de.mu.Unlock() @@ -492,21 +502,23 @@ func (de *endpoint) cliPing(res *ipnstate.PingResult, size int, cb func(*ipnstat return } + resCB := &pingResultAndCallback{res: res, cb: cb} + now := mono.Now() udpAddr, derpAddr := de.addrForPingSizeLocked(now, size) if derpAddr.IsValid() { - de.startDiscoPingLocked(derpAddr, now, pingCLI, size, res, cb) + de.startDiscoPingLocked(derpAddr, now, pingCLI, size, resCB) } if udpAddr.IsValid() && now.Before(de.trustBestAddrUntil) { // Already have an active session, so just ping the address we're using. // Otherwise "tailscale ping" results to a node on the local network // can look like they're bouncing between, say 10.0.0.0/9 and the peer's // IPv6 address, both 1ms away, and it's random who replies first. - de.startDiscoPingLocked(udpAddr, now, pingCLI, size, res, cb) + de.startDiscoPingLocked(udpAddr, now, pingCLI, size, resCB) } else { for ep := range de.endpointState { - de.startDiscoPingLocked(ep, now, pingCLI, size, res, cb) + de.startDiscoPingLocked(ep, now, pingCLI, size, resCB) } } de.noteActiveLocked() @@ -660,10 +672,11 @@ const ( pingCLI ) -// startDiscoPingLocked sends a disco ping to ep in a separate -// goroutine. res and cb are for returning the results of CLI pings, -// otherwise they are nil. -func (de *endpoint) startDiscoPingLocked(ep netip.AddrPort, now mono.Time, purpose discoPingPurpose, size int, res *ipnstate.PingResult, cb func(*ipnstate.PingResult)) { +// startDiscoPingLocked sends a disco ping to ep in a separate goroutine. resCB, +// if non-nil, means that a caller external to the magicsock package internals +// is interested in the result (such as a CLI "tailscale ping" or a c2n ping +// request, etc) +func (de *endpoint) startDiscoPingLocked(ep netip.AddrPort, now mono.Time, purpose discoPingPurpose, size int, resCB *pingResultAndCallback) { if runtime.GOOS == "js" { return } @@ -710,8 +723,7 @@ func (de *endpoint) startDiscoPingLocked(ep netip.AddrPort, now mono.Time, purpo at: now, timer: time.AfterFunc(pingTimeoutDuration, func() { de.discoPingTimeout(txid) }), purpose: purpose, - res: res, - cb: cb, + resCB: resCB, size: s, } go de.sendDiscoPing(ep, epDisco.key, txid, s, logLevel) @@ -742,7 +754,7 @@ func (de *endpoint) sendDiscoPingsLocked(now mono.Time, sendCallMeMaybe bool) { de.c.dlogf("[v1] magicsock: disco: send, starting discovery for %v (%v)", de.publicKey.ShortString(), de.discoShort()) } - de.startDiscoPingLocked(ep, now, pingDiscovery, 0, nil, nil) + de.startDiscoPingLocked(ep, now, pingDiscovery, 0, nil) } derpAddr := de.derpAddr if sentAny && sendCallMeMaybe && derpAddr.IsValid() { @@ -1106,11 +1118,11 @@ func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src netip } // Currently only CLI ping uses this callback. - if sp.cb != nil { + if sp.resCB.reply() { if sp.purpose == pingCLI { - de.c.populateCLIPingResponseLocked(sp.res, latency, sp.to) + de.c.populateCLIPingResponseLocked(sp.resCB.res, latency, sp.to) } - go sp.cb(sp.res) + go sp.resCB.cb(sp.resCB.res) } // Promote this pong response to our current best address if it's lower latency. diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 335ce5656..0bc6a4113 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -757,7 +757,7 @@ func (c *Conn) Ping(peer tailcfg.NodeView, res *ipnstate.PingResult, size int, c cb(res) return } - ep.cliPing(res, size, cb) + ep.discoPing(res, size, cb) } // c.mu must be held