From 381de776c4878dd9af76b126cfa37bc80cad363f Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Tue, 2 Dec 2025 12:50:33 -0800 Subject: [PATCH] syncs: start working on mutex debugging, registration Updates #17852 Change-Id: Ib1b634eedd30cc4006bc1b39aa8d479d37c5f1f2 Signed-off-by: Brad Fitzpatrick --- control/controlbase/conn.go | 2 +- control/controlbase/handshake.go | 3 + control/controlclient/auto.go | 5 +- control/controlclient/direct.go | 1 + envknob/envknob.go | 5 + feature/relayserver/relayserver.go | 1 + health/health.go | 5 + ipn/ipnlocal/local.go | 3 + ipn/ipnlocal/node_backend.go | 2 + logtail/buffer.go | 4 +- net/dns/manager.go | 6 +- net/dns/resolver/forwarder.go | 1 + net/dns/resolver/tsdns.go | 1 + net/dnscache/dnscache.go | 14 +- net/netcheck/netcheck.go | 12 ++ net/netmon/interfaces_darwin.go | 5 + net/netmon/netmon.go | 1 + net/ping/ping.go | 4 +- net/portmapper/portmapper.go | 1 + net/tsdial/tsdial.go | 11 ++ syncs/mutex.go | 16 +- syncs/mutex_debug.go | 220 +++++++++++++++++++++++- util/eventbus/bus.go | 6 + util/eventbus/debug.go | 4 +- util/eventbus/publish.go | 6 +- util/eventbus/subscribe.go | 10 +- util/execqueue/execqueue.go | 15 ++ util/goroutines/tracker.go | 4 +- util/ringlog/ringlog.go | 4 +- util/syspolicy/rsop/change_callbacks.go | 3 +- util/syspolicy/rsop/resultant_policy.go | 2 + util/syspolicy/rsop/rsop.go | 5 + util/syspolicy/setting/setting.go | 5 + wgengine/magicsock/magicsock.go | 6 + wgengine/netlog/netlog.go | 4 + wgengine/netlog/netlog_omit.go | 1 + wgengine/userspace.go | 8 +- wgengine/watchdog.go | 11 +- 38 files changed, 389 insertions(+), 28 deletions(-) diff --git a/control/controlbase/conn.go b/control/controlbase/conn.go index 78ef73f71..8a3a90495 100644 --- a/control/controlbase/conn.go +++ b/control/controlbase/conn.go @@ -61,7 +61,7 @@ type rxState struct { // txState is all the Conn state that Write uses. type txState struct { - sync.Mutex + syncs.Mutex cipher cipher.AEAD nonce nonce err error // records the first partial write error for all future calls diff --git a/control/controlbase/handshake.go b/control/controlbase/handshake.go index 765a4620b..a57a3eca3 100644 --- a/control/controlbase/handshake.go +++ b/control/controlbase/handshake.go @@ -20,6 +20,7 @@ import ( chp "golang.org/x/crypto/chacha20poly1305" "golang.org/x/crypto/curve25519" "golang.org/x/crypto/hkdf" + "tailscale.com/syncs" "tailscale.com/types/key" ) @@ -186,6 +187,8 @@ func continueClientHandshake(ctx context.Context, conn net.Conn, s *symmetricSta cipher: c2, }, } + syncs.RegisterMutex(&c.rx.Mutex, "controlbase.Conn.rx.Mutex") + syncs.RegisterMutex(&c.tx.Mutex, "controlbase.Conn.tx.Mutex") return c, nil } diff --git a/control/controlclient/auto.go b/control/controlclient/auto.go index 336a8d491..235c5e03d 100644 --- a/control/controlclient/auto.go +++ b/control/controlclient/auto.go @@ -8,11 +8,11 @@ import ( "errors" "fmt" "net/http" - "sync" "sync/atomic" "time" "tailscale.com/net/sockstats" + "tailscale.com/syncs" "tailscale.com/tailcfg" "tailscale.com/tstime" "tailscale.com/types/key" @@ -122,7 +122,7 @@ type Auto struct { observerQueue execqueue.ExecQueue shutdownFn func() // to be called prior to shutdown or nil - mu sync.Mutex // mutex guards the following fields + mu syncs.Mutex // mutex guards the following fields started bool // whether [Auto.Start] has been called wantLoggedIn bool // whether the user wants to be logged in per last method call @@ -194,6 +194,7 @@ func newNoStart(opts Options) (_ *Auto, err error) { observer: opts.Observer, shutdownFn: opts.Shutdown, } + syncs.RegisterMutex(&c.mu, "controlclient.Auto.mu") c.authCtx, c.authCancel = context.WithCancel(context.Background()) c.authCtx = sockstats.WithSockStats(c.authCtx, sockstats.LabelControlClientAuto, opts.Logf) diff --git a/control/controlclient/direct.go b/control/controlclient/direct.go index d5cd6a13e..9680da0be 100644 --- a/control/controlclient/direct.go +++ b/control/controlclient/direct.go @@ -328,6 +328,7 @@ func NewDirect(opts Options) (*Direct, error) { dnsCache: dnsCache, dialPlan: opts.DialPlan, } + syncs.RegisterMutex(&c.mu, "controlclient.Direct.mu") c.discoPubKey = opts.DiscoPublicKey c.closedCtx, c.closeCtx = context.WithCancel(context.Background()) diff --git a/envknob/envknob.go b/envknob/envknob.go index 17a21387e..a2544584f 100644 --- a/envknob/envknob.go +++ b/envknob/envknob.go @@ -55,6 +55,11 @@ var ( regInt = map[string]*int{} ) +var _ = func() bool { + syncs.RegisterMutex(&mu, "envknob.mu") + return true +}() + func noteEnv(k, v string) { mu.Lock() defer mu.Unlock() diff --git a/feature/relayserver/relayserver.go b/feature/relayserver/relayserver.go index 4f23ae18e..531841408 100644 --- a/feature/relayserver/relayserver.go +++ b/feature/relayserver/relayserver.go @@ -74,6 +74,7 @@ func newExtension(logf logger.Logf, sb ipnext.SafeBackend) (ipnext.Extension, er }, logf: logger.WithPrefix(logf, featureName+": "), } + syncs.RegisterMutex(&e.mu, "relayserver.extension.mu") e.ec = sb.Sys().Bus.Get().Client("relayserver.extension") e.respPub = eventbus.Publish[magicsock.UDPRelayAllocResp](e.ec) eventbus.SubscribeFunc(e.ec, e.onDERPMapView) diff --git a/health/health.go b/health/health.go index f0f6a6ffb..0c7a57fdd 100644 --- a/health/health.go +++ b/health/health.go @@ -35,6 +35,11 @@ var ( debugHandler map[string]http.Handler ) +var _ = func() bool { + syncs.RegisterMutex(&mu, "health.mu") + return true +}() + // ReceiveFunc is one of the three magicsock Receive funcs (IPv4, IPv6, or // DERP). type ReceiveFunc int diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index fbf34aa42..a0fdfb845 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -515,6 +515,9 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo captiveCancel: nil, // so that we start checkCaptivePortalLoop when Running needsCaptiveDetection: make(chan bool), } + syncs.RegisterMutex(&b.mu, "ipnlocal.LocalBackend.mu") + syncs.RegisterMutex(&b.tkaSyncLock, "ipnlocal.LocalBackend.tkaSyncLock") + syncs.RegisterMutex(&b.lastNotifiedDriveSharesMu, "ipnlocal.LocalBackend.lastNotifiedDriveSharesMu") nb := newNodeBackend(ctx, b.logf, b.sys.Bus.Get()) b.currentNodeAtomic.Store(nb) diff --git a/ipn/ipnlocal/node_backend.go b/ipn/ipnlocal/node_backend.go index efef57ea4..f3407461c 100644 --- a/ipn/ipnlocal/node_backend.go +++ b/ipn/ipnlocal/node_backend.go @@ -117,6 +117,8 @@ func newNodeBackend(ctx context.Context, logf logger.Logf, bus *eventbus.Bus) *n eventClient: bus.Client("ipnlocal.nodeBackend"), readyCh: make(chan struct{}), } + syncs.RegisterMutex(&nb.mu, "ipnlocal.nodeBackend.mu") + // Default filter blocks everything and logs nothing. noneFilter := filter.NewAllowNone(logger.Discard, &netipx.IPSet{}) nb.filterAtomic.Store(noneFilter) diff --git a/logtail/buffer.go b/logtail/buffer.go index 82c9b4610..aef508aad 100644 --- a/logtail/buffer.go +++ b/logtail/buffer.go @@ -28,9 +28,11 @@ type Buffer interface { } func NewMemoryBuffer(numEntries int) Buffer { - return &memBuffer{ + mb := &memBuffer{ pending: make(chan qentry, numEntries), } + syncs.RegisterMutex(&mb.dropMu, "logtail.memBuffer.dropMu") + return mb } type memBuffer struct { diff --git a/net/dns/manager.go b/net/dns/manager.go index de99fe646..5170d0b78 100644 --- a/net/dns/manager.go +++ b/net/dns/manager.go @@ -15,7 +15,6 @@ import ( "runtime" "slices" "strings" - "sync" "sync/atomic" "time" @@ -65,8 +64,8 @@ type Manager struct { knobs *controlknobs.Knobs // or nil goos string // if empty, gets set to runtime.GOOS - mu sync.Mutex // guards following - config *Config // Tracks the last viable DNS configuration set by Set. nil on failures other than compilation failures or if set has never been called. + mu syncs.Mutex // guards following + config *Config // Tracks the last viable DNS configuration set by Set. nil on failures other than compilation failures or if set has never been called. } // NewManagers created a new manager from the given config. @@ -95,6 +94,7 @@ func NewManager(logf logger.Logf, oscfg OSConfigurator, health *health.Tracker, knobs: knobs, goos: goos, } + syncs.RegisterMutex(&m.mu, "dns.Manager.mu") m.ctx, m.ctxCancel = context.WithCancel(context.Background()) m.logf("using %T", m.os) diff --git a/net/dns/resolver/forwarder.go b/net/dns/resolver/forwarder.go index 5adc43efc..83b28fdb4 100644 --- a/net/dns/resolver/forwarder.go +++ b/net/dns/resolver/forwarder.go @@ -267,6 +267,7 @@ func newForwarder(logf logger.Logf, netMon *netmon.Monitor, linkSel ForwardLinkS controlKnobs: knobs, verboseFwd: verboseDNSForward(), } + syncs.RegisterMutex(&f.mu, "resolver.forwarder.mu") f.ctx, f.ctxCancel = context.WithCancel(context.Background()) return f } diff --git a/net/dns/resolver/tsdns.go b/net/dns/resolver/tsdns.go index 3185cbe2b..25916c6c9 100644 --- a/net/dns/resolver/tsdns.go +++ b/net/dns/resolver/tsdns.go @@ -249,6 +249,7 @@ func New(logf logger.Logf, linkSel ForwardLinkSelector, dialer *tsdial.Dialer, h dialer: dialer, health: health, } + syncs.RegisterMutex(&r.mu, "resolver.Resolver.mu") r.forwarder = newForwarder(r.logf, netMon, linkSel, dialer, health, knobs) return r } diff --git a/net/dnscache/dnscache.go b/net/dnscache/dnscache.go index e222b983f..797d95cc7 100644 --- a/net/dnscache/dnscache.go +++ b/net/dnscache/dnscache.go @@ -98,10 +98,16 @@ type Resolver struct { sf singleflight.Group[string, ipRes] + registerMutexOnce sync.Once + mu syncs.Mutex ipCache map[string]ipCacheEntry } +func (r *Resolver) registerMutex() { + syncs.RegisterMutex(&r.mu, "dnscache.Resolver.mu") +} + // ipRes is the type used by the Resolver.sf singleflight group. type ipRes struct { ip, ip6 netip.Addr @@ -193,6 +199,8 @@ func SetDebugLoggingEnabled(v bool) { // If err is nil, ip will be non-nil. The v6 address may be nil even // with a nil error. func (r *Resolver) LookupIP(ctx context.Context, host string) (ip, v6 netip.Addr, allIPs []netip.Addr, err error) { + r.registerMutexOnce.Do(r.registerMutex) + if r.SingleHostStaticResult != nil { if r.SingleHost != host { return zaddr, zaddr, nil, fmt.Errorf("dnscache: unexpected hostname %q doesn't match expected %q", host, r.SingleHost) @@ -373,11 +381,13 @@ func (r *Resolver) addIPCache(host string, ip, ip6 netip.Addr, allIPs []netip.Ad // Dialer returns a wrapped DialContext func that uses the provided dnsCache. func Dialer(fwd netx.DialFunc, dnsCache *Resolver) netx.DialFunc { + dnsCache.registerMutexOnce.Do(dnsCache.registerMutex) d := &dialer{ fwd: fwd, dnsCache: dnsCache, pastConnect: map[netip.Addr]time.Time{}, } + syncs.RegisterMutex(&d.mu, "dnscache.dialer.mu") return d.DialContext } @@ -386,11 +396,12 @@ type dialer struct { fwd netx.DialFunc dnsCache *Resolver - mu sync.Mutex + mu syncs.Mutex pastConnect map[netip.Addr]time.Time } func (d *dialer) DialContext(ctx context.Context, network, address string) (retConn net.Conn, ret error) { + host, port, err := net.SplitHostPort(address) if err != nil { // Bogus. But just let the real dialer return an error rather than @@ -404,6 +415,7 @@ func (d *dialer) DialContext(ctx context.Context, network, address string) (retC host: host, port: port, } + syncs.RegisterMutex(&dc.mu, "dnscache.dialCall.mu") defer func() { // On failure, consider that our DNS might be wrong and ask the DNS fallback mechanism for // some other IPs to try. diff --git a/net/netcheck/netcheck.go b/net/netcheck/netcheck.go index c5a3d2392..8b22c459b 100644 --- a/net/netcheck/netcheck.go +++ b/net/netcheck/netcheck.go @@ -235,6 +235,8 @@ type Client struct { testEnoughRegions int testCaptivePortalDelay time.Duration + registerMutexOnce sync.Once + mu syncs.Mutex // guards following nextFull bool // do a full region scan, even if last != nil prev map[time.Time]*Report // some previous reports @@ -244,6 +246,10 @@ type Client struct { resolver *dnscache.Resolver // only set if UseDNSCache is true } +func (c *Client) registerMutex() { + syncs.RegisterMutex(&c.mu, "netcheck.Client.mu") +} + func (c *Client) enoughRegions() int { if c.testEnoughRegions > 0 { return c.testEnoughRegions @@ -281,6 +287,7 @@ func (c *Client) vlogf(format string, a ...any) { // MakeNextReportFull forces the next GetReport call to be a full // (non-incremental) probe of all DERP regions. func (c *Client) MakeNextReportFull() { + c.registerMutexOnce.Do(c.registerMutex) c.mu.Lock() defer c.mu.Unlock() c.nextFull = true @@ -291,6 +298,8 @@ func (c *Client) MakeNextReportFull() { // the loop started by Standalone, in normal operation in tailscaled incoming // STUN replies are routed to this method. func (c *Client) ReceiveSTUNPacket(pkt []byte, src netip.AddrPort) { + c.registerMutexOnce.Do(c.registerMutex) + c.vlogf("received STUN packet from %s", src) if src.Addr().Is4() { @@ -782,6 +791,7 @@ func (o *GetReportOpts) getLastDERPActivity(region int) time.Time { } func (c *Client) SetForcePreferredDERP(region int) { + c.registerMutexOnce.Do(c.registerMutex) c.mu.Lock() defer c.mu.Unlock() c.ForcePreferredDERP = region @@ -797,6 +807,7 @@ var hookStartCaptivePortalDetection feature.Hook[func(ctx context.Context, rs *r // // It may not be called concurrently with itself. func (c *Client) GetReport(ctx context.Context, dm *tailcfg.DERPMap, opts *GetReportOpts) (_ *Report, reterr error) { + c.registerMutexOnce.Do(c.registerMutex) onlySTUN := false if opts != nil && opts.OnlySTUN { if opts.OnlyTCP443 { @@ -839,6 +850,7 @@ func (c *Client) GetReport(ctx context.Context, dm *tailcfg.DERPMap, opts *GetRe inFlight: map[stun.TxID]func(netip.AddrPort){}, stopProbeCh: make(chan struct{}, 1), } + syncs.RegisterMutex(&rs.mu, "netcheck.reportState.mu") c.curState = rs last := c.last diff --git a/net/netmon/interfaces_darwin.go b/net/netmon/interfaces_darwin.go index 126040350..757d8697a 100644 --- a/net/netmon/interfaces_darwin.go +++ b/net/netmon/interfaces_darwin.go @@ -30,6 +30,11 @@ var ifNames struct { m map[int]string // ifindex => name } +var _ = func() bool { + syncs.RegisterMutex(&ifNames.Mutex, "netmon.ifNames.Mutex") + return true +}() + func init() { interfaceDebugExtras = interfaceDebugExtrasDarwin } diff --git a/net/netmon/netmon.go b/net/netmon/netmon.go index 657da04d5..a9b7f756a 100644 --- a/net/netmon/netmon.go +++ b/net/netmon/netmon.go @@ -125,6 +125,7 @@ func New(bus *eventbus.Bus, logf logger.Logf) (*Monitor, error) { stop: make(chan struct{}), lastWall: wallTime(), } + syncs.RegisterMutex(&m.mu, "netmon.Monitor.mu") m.changed = eventbus.Publish[ChangeDelta](m.b) st, err := m.interfaceStateUncached() if err != nil { diff --git a/net/ping/ping.go b/net/ping/ping.go index 8e16a692a..bf87d6f74 100644 --- a/net/ping/ping.go +++ b/net/ping/ping.go @@ -81,13 +81,15 @@ func New(ctx context.Context, logf logger.Logf, lp ListenPacketer) *Pinger { panic("net/ping: New:" + err.Error()) } - return &Pinger{ + p := &Pinger{ lp: lp, Logf: logf, timeNow: time.Now, id: binary.LittleEndian.Uint16(id[:]), pings: make(map[uint16]outstanding), } + syncs.RegisterMutex(&p.mu, "ping.Pinger.mu") + return p } func (p *Pinger) mkconn(ctx context.Context, typ, addr string) (net.PacketConn, error) { diff --git a/net/portmapper/portmapper.go b/net/portmapper/portmapper.go index 16a981d1d..989a7207d 100644 --- a/net/portmapper/portmapper.go +++ b/net/portmapper/portmapper.go @@ -266,6 +266,7 @@ func NewClient(c Config) *Client { netMon: c.NetMon, onChange: c.OnChange, } + syncs.RegisterMutex(&ret.mu, "portmapper.Client.mu") if buildfeatures.HasPortMapper { // TODO(bradfitz): move this to method on netMon ret.ipAndGateway = netmon.LikelyHomeRouterIP diff --git a/net/tsdial/tsdial.go b/net/tsdial/tsdial.go index 065c01384..669055a49 100644 --- a/net/tsdial/tsdial.go +++ b/net/tsdial/tsdial.go @@ -75,6 +75,8 @@ type Dialer struct { // If nil, it's not used. NetstackDialUDP func(context.Context, netip.AddrPort) (net.Conn, error) + registerMutexOnce sync.Once + peerClientOnce sync.Once peerClient *http.Client @@ -142,6 +144,7 @@ func (d *Dialer) SetExitDNSDoH(doh string) { if !buildfeatures.HasUseExitNode { return } + d.registerMutexOnce.Do(d.registerMutex) d.mu.Lock() defer d.mu.Unlock() if d.exitDNSDoHBase == doh { @@ -193,9 +196,15 @@ func (d *Dialer) Close() error { return nil } +func (d *Dialer) registerMutex() { + syncs.RegisterMutex(&d.mu, "tsdial.Dialer.mu") +} + // SetNetMon sets d's network monitor to netMon. // It is a no-op to call SetNetMon with the same netMon as the current one. func (d *Dialer) SetNetMon(netMon *netmon.Monitor) { + d.registerMutexOnce.Do(d.registerMutex) + d.mu.Lock() defer d.mu.Unlock() if d.netMon == netMon { @@ -220,12 +229,14 @@ func (d *Dialer) SetNetMon(netMon *netmon.Monitor) { // NetMon returns the Dialer's network monitor. // It returns nil if SetNetMon has not been called. func (d *Dialer) NetMon() *netmon.Monitor { + d.registerMutexOnce.Do(d.registerMutex) d.mu.Lock() defer d.mu.Unlock() return d.netMon } func (d *Dialer) SetBus(bus *eventbus.Bus) { + d.registerMutexOnce.Do(d.registerMutex) d.mu.Lock() defer d.mu.Unlock() if d.bus == bus { diff --git a/syncs/mutex.go b/syncs/mutex.go index 8034e1712..78342ffc9 100644 --- a/syncs/mutex.go +++ b/syncs/mutex.go @@ -7,6 +7,10 @@ package syncs import "sync" +// MutexDebugging indicates whether the "ts_mutex_debug" build tag is set +// and mutex debugging is enabled. +const MutexDebugging = false + // Mutex is an alias for sync.Mutex. // // It's only not a sync.Mutex when built with the ts_mutex_debug build tag. @@ -20,4 +24,14 @@ type RWMutex = sync.RWMutex // RequiresMutex declares the caller assumes it has the given // mutex held. In non-debug builds, it's a no-op and compiles to // nothing. -func RequiresMutex(mu *sync.Mutex) {} +func RequiresMutex(mu *Mutex) {} + +func RegisterMutex(mu *Mutex, name string) {} + +// ForkJoinGo is like go fn() but indicates that the goroutine +// is part of a fork-join parallelism pattern. +// +// This compiles to just "go fn()" in non-debug builds. +func ForkJoinGo(fn func()) { + go fn() +} diff --git a/syncs/mutex_debug.go b/syncs/mutex_debug.go index 55a9b1231..7a532a216 100644 --- a/syncs/mutex_debug.go +++ b/syncs/mutex_debug.go @@ -5,7 +5,20 @@ package syncs -import "sync" +import ( + "bytes" + "fmt" + "log" + "runtime" + "sync" + "unsafe" + + "go4.org/mem" +) + +// MutexDebugging indicates whether the "ts_mutex_debug" build tag is set +// and mutex debugging is enabled. +const MutexDebugging = true type Mutex struct { sync.Mutex @@ -15,8 +28,211 @@ type RWMutex struct { sync.RWMutex } -func RequiresMutex(mu *sync.Mutex) { +func RequiresMutex(mu *Mutex) { // TODO: check } // TODO(bradfitz): actually track stuff when in debug mode. + +var bufPool = &sync.Pool{ + New: func() any { + b := make([]byte, 16<<10) + return &b + }, +} + +func (m *Mutex) Lock() { + defer m.Mutex.Lock() + + gid := curGoroutineID() + + up := uintptr((unsafe.Pointer)(m)) + + bufp := bufPool.Get().(*[]byte) + defer bufPool.Put(bufp) + stack := (*bufp)[:runtime.Stack(*bufp, false)] + + trackMu.Lock() + defer trackMu.Unlock() + gid = walkToParent(gid) + + name, ok := mutexName[up] + if !ok { + name = "unnamed" + log.Printf("XXX unregistered Mutex.Lock %p called from:\n%s", m, stack) + } + + switch name { + case "ipnlocal.LocalBackend.mu", "wgengine.userspaceEngine.wgLock", "ipnlocal.nodeBackend.mu": + if bytes.Contains(stack, []byte("wireguard-go/device.(*Device).RoutineReceiveIncoming")) { + log.Printf("XXX mutex Lock from wireguard land: %s, %s", name, stack) + } + } + + gi, ok := goroutines[gid] + if !ok { + gi = &goroutineInfo{} + goroutines[gid] = gi + } + gi.holding = append(gi.holding, &heldLock{ + mutexAddr: up, + name: name, + }) + if len(gi.holding) > 1 { + names := make([]string, 0, len(gi.holding)) + for i, hl := range gi.holding { + names = append(names, hl.name) + + if i == 0 { + continue + } + lo := lockOrder{ + first: gi.holding[i-1].name, + second: hl.name, + } + if lockOrders[lo.reverse()] { + log.Printf("mutex: potential deadlock detected: lock order violation: %q then %q (saw reverse order before); goroutine %d stack:\n%s", lo.first, lo.second, gid, stack) + } else { + if _, ok := lockOrders[lo]; !ok { + log.Printf("XXX learned new lock order: %q then %q", lo.first, lo.second) + lockOrders[lo] = true + } + } + } + log.Printf("XXX goroutine %v holding %q", gid, names) + } +} + +func (m *Mutex) Unlock() { + defer m.Mutex.Unlock() + up := uintptr((unsafe.Pointer)(m)) + + gid := curGoroutineID() + trackMu.Lock() + defer trackMu.Unlock() + gid = walkToParent(gid) + + name, ok := mutexName[up] + if !ok { + name = "unnamed" + } + + gi, ok := goroutines[gid] + if !ok || len(gi.holding) == 0 { + log.Printf("mutex: unlock of %p (%s) by goroutine %d with no held locks", m, name, gid) + return + } + last := gi.holding[len(gi.holding)-1] + if last.mutexAddr != up { + log.Printf("mutex: unlock of %p (%s) by goroutine %d, but last held lock is %p (%s)", m, name, gid, last.mutexAddr, last.name) + return + } + gi.holding[len(gi.holding)-1] = nil + gi.holding = gi.holding[:len(gi.holding)-1] + if len(gi.holding) == 0 { + delete(goroutines, gid) + } +} + +var ( + trackMu sync.Mutex + mutexName = make(map[uintptr]string) + goroutines = make(map[uint64]*goroutineInfo) + parentGID = make(map[uint64]uint64) // child goroutine ID -> parent (for ForkJoinGo) + lockOrders = make(map[lockOrder]bool) // observed lock orderings +) + +type lockOrder struct { + first string + second string +} + +func (lo lockOrder) reverse() lockOrder { + return lockOrder{first: lo.second, second: lo.first} +} + +type goroutineInfo struct { + holding []*heldLock +} + +type heldLock struct { + mutexAddr uintptr + name string + // TODO: stack? [16]uintptr? +} + +// RegisterMutex registers the given mutex with the given name for +// debugging purposes. +func RegisterMutex(mu *Mutex, name string) { + trackMu.Lock() + defer trackMu.Unlock() + up := uintptr((unsafe.Pointer)(mu)) + mutexName[up] = name + runtime.AddCleanup(mu, func(up uintptr) { + trackMu.Lock() + defer trackMu.Unlock() + delete(mutexName, up) + }, up) +} + +var littleBuf = sync.Pool{ + New: func() interface{} { + buf := make([]byte, 64) + return &buf + }, +} + +var goroutineSpace = []byte("goroutine ") + +func curGoroutineID() uint64 { + bp := littleBuf.Get().(*[]byte) + defer littleBuf.Put(bp) + b := *bp + b = b[:runtime.Stack(b, false)] + // Parse the 4707 out of "goroutine 4707 [" + b = bytes.TrimPrefix(b, goroutineSpace) + i := bytes.IndexByte(b, ' ') + if i < 0 { + panic(fmt.Sprintf("No space found in %q", b)) + } + b = b[:i] + n, err := mem.ParseUint(mem.B(b), 10, 64) + if err != nil { + panic(fmt.Sprintf("Failed to parse goroutine ID out of %q: %v", b, err)) + } + return n +} + +func trackForkJoinPair(parent, child uint64, add bool) { + trackMu.Lock() + defer trackMu.Unlock() + if add { + parentGID[child] = parent + } else { + delete(parentGID, child) + } +} + +func walkToParent(gid uint64) uint64 { + for { + p, ok := parentGID[gid] + if !ok { + return gid + } + gid = p + } +} + +// ForkJoinGo is like go fn() but indicates that the goroutine +// is part of a fork-join parallelism pattern. +// +// This compiles to just "go fn()" in non-debug builds. +func ForkJoinGo(fn func()) { + parentGID := curGoroutineID() + go func() { + childGID := curGoroutineID() + trackForkJoinPair(parentGID, childGID, true) + defer trackForkJoinPair(parentGID, childGID, false) + fn() + }() +} diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go index 880e075cc..0a4674081 100644 --- a/util/eventbus/bus.go +++ b/util/eventbus/bus.go @@ -59,6 +59,10 @@ func NewWithOptions(opts BusOptions) *Bus { clients: set.Set[*Client]{}, logf: opts.logger(), } + + syncs.RegisterMutex(&ret.topicsMu, "eventbus.Bus.topicsMu") + syncs.RegisterMutex(&ret.clientsMu, "eventbus.Bus.clientsMu") + ret.router = runWorker(ret.pump) return ret } @@ -92,6 +96,8 @@ func (b *Bus) Client(name string) *Client { bus: b, pub: set.Set[publisher]{}, } + syncs.RegisterMutex(&ret.mu, "eventbus.Client.mu") + syncs.RegisterMutex(&ret.stop.mu, "eventbus.Client.stop.mu") b.clientsMu.Lock() defer b.clientsMu.Unlock() b.clients.Add(ret) diff --git a/util/eventbus/debug.go b/util/eventbus/debug.go index 0453defb1..2f2c9589a 100644 --- a/util/eventbus/debug.go +++ b/util/eventbus/debug.go @@ -11,10 +11,10 @@ import ( "runtime" "slices" "strings" + "sync" "sync/atomic" "time" - "tailscale.com/syncs" "tailscale.com/types/logger" ) @@ -147,7 +147,7 @@ func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type { // A hook collects hook functions that can be run as a group. type hook[T any] struct { - syncs.Mutex + sync.Mutex fns []hookFn[T] } diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go index 348bb9dff..b35af28fd 100644 --- a/util/eventbus/publish.go +++ b/util/eventbus/publish.go @@ -5,6 +5,8 @@ package eventbus import ( "reflect" + + "tailscale.com/syncs" ) // publisher is a uniformly typed wrapper around Publisher[T], so that @@ -21,7 +23,9 @@ type Publisher[T any] struct { } func newPublisher[T any](c *Client) *Publisher[T] { - return &Publisher[T]{client: c} + p := &Publisher[T]{client: c} + syncs.RegisterMutex(&p.stop.mu, "eventbus.Publisher.stop.mu") + return p } // Close closes the publisher. diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index b0348e125..24886ee08 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -62,6 +62,7 @@ func newSubscribeState(c *Client) *subscribeState { snapshot: make(chan chan []DeliveredEvent), outputs: map[reflect.Type]subscriber{}, } + syncs.RegisterMutex(&ret.outputsMu, "eventbus.subscribeState.outputsMu") ret.dispatcher = runWorker(ret.pump) return ret } @@ -194,18 +195,21 @@ type Subscriber[T any] struct { func newSubscriber[T any](r *subscribeState, logf logger.Logf) *Subscriber[T] { slow := time.NewTimer(0) slow.Stop() // reset in dispatch - return &Subscriber[T]{ + s := &Subscriber[T]{ read: make(chan T), unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, logf: logf, slow: slow, } + syncs.RegisterMutex(&s.stop.mu, "eventbus.Subscriber.stop.mu") + return s } func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] { ret := &Subscriber[T]{ read: make(chan T, 100), // arbitrary, large } + syncs.RegisterMutex(&ret.stop.mu, "eventbus.Subscriber.stop.mu") ret.unregister = attach(ret.monitor) return ret } @@ -286,12 +290,14 @@ type SubscriberFunc[T any] struct { func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] { slow := time.NewTimer(0) slow.Stop() // reset in dispatch - return &SubscriberFunc[T]{ + s := &SubscriberFunc[T]{ read: f, unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, logf: logf, slow: slow, } + syncs.RegisterMutex(&s.stop.mu, "eventbus.Subscriber.stop.mu") + return s } // Close closes the SubscriberFunc, indicating the caller no longer wishes to diff --git a/util/execqueue/execqueue.go b/util/execqueue/execqueue.go index 87616a6b5..acf25f645 100644 --- a/util/execqueue/execqueue.go +++ b/util/execqueue/execqueue.go @@ -7,11 +7,14 @@ package execqueue import ( "context" "errors" + "sync" "tailscale.com/syncs" ) type ExecQueue struct { + regMutexOnce sync.Once + mu syncs.Mutex ctx context.Context // context.Background + closed on Shutdown cancel context.CancelFunc // closes ctx @@ -21,7 +24,13 @@ type ExecQueue struct { queue []func() } +func (q *ExecQueue) registerMutex() { + syncs.RegisterMutex(&q.mu, "execqueue.ExecQueue.mu") +} + func (q *ExecQueue) Add(f func()) { + q.regMutexOnce.Do(q.registerMutex) + q.mu.Lock() defer q.mu.Unlock() if q.closed { @@ -39,6 +48,8 @@ func (q *ExecQueue) Add(f func()) { // RunSync waits for the queue to be drained and then synchronously runs f. // It returns an error if the queue is closed before f is run or ctx expires. func (q *ExecQueue) RunSync(ctx context.Context, f func()) error { + q.regMutexOnce.Do(q.registerMutex) + q.mu.Lock() q.initCtxLocked() shutdownCtx := q.ctx @@ -80,6 +91,8 @@ func (q *ExecQueue) run(f func()) { // Shutdown asynchronously signals the queue to stop. func (q *ExecQueue) Shutdown() { + q.regMutexOnce.Do(q.registerMutex) + q.mu.Lock() defer q.mu.Unlock() q.closed = true @@ -98,6 +111,8 @@ var errExecQueueShutdown = errors.New("execqueue shut down") // Wait waits for the queue to be empty or shut down. func (q *ExecQueue) Wait(ctx context.Context) error { + q.regMutexOnce.Do(q.registerMutex) + q.mu.Lock() q.initCtxLocked() waitCh := q.doneWaiter diff --git a/util/goroutines/tracker.go b/util/goroutines/tracker.go index c2a0cb8c3..044843d33 100644 --- a/util/goroutines/tracker.go +++ b/util/goroutines/tracker.go @@ -4,9 +4,9 @@ package goroutines import ( + "sync" "sync/atomic" - "tailscale.com/syncs" "tailscale.com/util/set" ) @@ -15,7 +15,7 @@ type Tracker struct { started atomic.Int64 // counter running atomic.Int64 // gauge - mu syncs.Mutex + mu sync.Mutex onDone set.HandleSet[func()] } diff --git a/util/ringlog/ringlog.go b/util/ringlog/ringlog.go index 62dfbae5b..781e8f5ca 100644 --- a/util/ringlog/ringlog.go +++ b/util/ringlog/ringlog.go @@ -8,9 +8,11 @@ import "tailscale.com/syncs" // New creates a new [RingLog] containing at most max items. func New[T any](max int) *RingLog[T] { - return &RingLog[T]{ + rl := &RingLog[T]{ max: max, } + syncs.RegisterMutex(&rl.mu, "ringlog.RingLog.mu") + return rl } // RingLog is a concurrency-safe fixed size log window containing entries of [T]. diff --git a/util/syspolicy/rsop/change_callbacks.go b/util/syspolicy/rsop/change_callbacks.go index 71135bb2a..fdf51c253 100644 --- a/util/syspolicy/rsop/change_callbacks.go +++ b/util/syspolicy/rsop/change_callbacks.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "tailscale.com/syncs" "tailscale.com/util/set" "tailscale.com/util/syspolicy/internal/loggerx" "tailscale.com/util/syspolicy/pkey" @@ -71,7 +70,7 @@ func (c PolicyChange) HasChangedAnyOf(keys ...pkey.Key) bool { // policyChangeCallbacks are the callbacks to invoke when the effective policy changes. // It is safe for concurrent use. type policyChangeCallbacks struct { - mu syncs.Mutex + mu sync.Mutex cbs set.HandleSet[PolicyChangeCallback] } diff --git a/util/syspolicy/rsop/resultant_policy.go b/util/syspolicy/rsop/resultant_policy.go index bdda90976..67e13ab56 100644 --- a/util/syspolicy/rsop/resultant_policy.go +++ b/util/syspolicy/rsop/resultant_policy.go @@ -96,6 +96,8 @@ func newPolicy(scope setting.PolicyScope, sources ...*source.Source) (_ *Policy, closeCh: make(chan struct{}), doneCh: make(chan struct{}), } + syncs.RegisterMutex(&p.mu, "syspolicy/rsop.Policy.mu") + if _, err := p.reloadNow(false); err != nil { p.Close() return nil, err diff --git a/util/syspolicy/rsop/rsop.go b/util/syspolicy/rsop/rsop.go index 333dca643..d7e50a004 100644 --- a/util/syspolicy/rsop/rsop.go +++ b/util/syspolicy/rsop/rsop.go @@ -32,6 +32,11 @@ var ( effectivePolicyLRU [setting.NumScopes]syncs.AtomicValue[*Policy] ) +var _ = func() bool { + syncs.RegisterMutex(&policyMu, "syspolicy/rsop.policyMu") + return true +}() + // PolicyFor returns the [Policy] for the specified scope, // creating it from the registered [source.Store]s if it doesn't already exist. func PolicyFor(scope setting.PolicyScope) (*Policy, error) { diff --git a/util/syspolicy/setting/setting.go b/util/syspolicy/setting/setting.go index 97362b1dc..d0df2436c 100644 --- a/util/syspolicy/setting/setting.go +++ b/util/syspolicy/setting/setting.go @@ -220,6 +220,11 @@ var ( definitionsUsed bool ) +var _ = func() bool { + syncs.RegisterMutex(&definitionsMu, "syspolicy/setting.definitionsMu") + return true +}() + // Register registers a policy setting with the specified key, scope, value type, // and an optional list of supported platforms. All policy settings must be // registered before any of them can be used. Register panics if called after diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 064838a2d..e4d1d3fbc 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -599,6 +599,11 @@ func newConn(logf logger.Logf) *Conn { discoInfo: make(map[key.DiscoPublic]*discoInfo), cloudInfo: newCloudInfo(logf), } + syncs.RegisterMutex(&c.mu, "magicsock.Conn.mu") + syncs.RegisterMutex(&c.pconn4.mu, "magicsock.Conn.pconn4.mu") + syncs.RegisterMutex(&c.pconn6.mu, "magicsock.Conn.pconn6.mu") + syncs.RegisterMutex(&c.endpointTracker.mu, "magicsock.Conn.endpointTracker.mu") + c.discoAtomic.Set(discoPrivate) c.bind = &connBind{Conn: c, closed: true} c.receiveBatchPool = sync.Pool{New: func() any { @@ -3145,6 +3150,7 @@ func (c *Conn) updateNodes(update NodeViewsUpdate) (peersChanged bool) { heartbeatDisabled: flags.heartbeatDisabled, isWireguardOnly: n.IsWireGuardOnly(), } + syncs.RegisterMutex(&ep.mu, "magicsock.endpoint.mu") switch runtime.GOOS { case "ios", "android": // Omit, to save memory. Prior to 2024-03-20 we used to limit it to diff --git a/wgengine/netlog/netlog.go b/wgengine/netlog/netlog.go index 12fe9c797..ba643944f 100644 --- a/wgengine/netlog/netlog.go +++ b/wgengine/netlog/netlog.go @@ -80,6 +80,10 @@ type Logger struct { routePrefixes []netip.Prefix } +func (nl *Logger) RegisterMutex() { + syncs.RegisterMutex(&nl.mu, "netlog.Logger.mu") +} + // Running reports whether the logger is running. func (nl *Logger) Running() bool { nl.mu.Lock() diff --git a/wgengine/netlog/netlog_omit.go b/wgengine/netlog/netlog_omit.go index 03610a1ef..3cefe978a 100644 --- a/wgengine/netlog/netlog_omit.go +++ b/wgengine/netlog/netlog_omit.go @@ -12,3 +12,4 @@ func (*Logger) Running() bool { return false } func (*Logger) Shutdown(any) error { return nil } func (*Logger) ReconfigNetworkMap(any) {} func (*Logger) ReconfigRoutes(any) {} +func (*Logger) RegisterMutex() {} diff --git a/wgengine/userspace.go b/wgengine/userspace.go index 1b8562d3f..7e48e3967 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -17,7 +17,6 @@ import ( "runtime" "slices" "strings" - "sync" "time" "github.com/tailscale/wireguard-go/device" @@ -130,7 +129,7 @@ type userspaceEngine struct { // is being routed over Tailscale. isDNSIPOverTailscale syncs.AtomicValue[func(netip.Addr) bool] - wgLock sync.Mutex // serializes all wgdev operations; see lock order comment below + wgLock syncs.Mutex // serializes all wgdev operations; see lock order comment below lastCfgFull wgcfg.Config lastNMinPeers int lastRouter *router.Config @@ -145,7 +144,7 @@ type userspaceEngine struct { lastStatusPollTime mono.Time // last time we polled the engine status reconfigureVPN func() error // or nil - mu sync.Mutex // guards following; see lock order comment below + mu syncs.Mutex // guards following; see lock order comment below netMap *netmap.NetworkMap // or nil closing bool // Close was called (even if we're still closing) statusCallback StatusCallback @@ -361,6 +360,9 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) reconfigureVPN: conf.ReconfigureVPN, health: conf.HealthTracker, } + syncs.RegisterMutex(&e.mu, "wgengine.userspaceEngine.mu") + syncs.RegisterMutex(&e.wgLock, "wgengine.userspaceEngine.wgLock") + e.networkLogger.RegisterMutex() if e.birdClient != nil { // Disable the protocol at start time. diff --git a/wgengine/watchdog.go b/wgengine/watchdog.go index 9cc4ed3b5..bad34a15f 100644 --- a/wgengine/watchdog.go +++ b/wgengine/watchdog.go @@ -19,6 +19,7 @@ import ( "tailscale.com/ipn/ipnstate" "tailscale.com/net/dns" "tailscale.com/net/packet" + "tailscale.com/syncs" "tailscale.com/tailcfg" "tailscale.com/types/key" "tailscale.com/types/netmap" @@ -81,9 +82,13 @@ func (e *watchdogEngine) watchdogErr(name string, fn func() error) error { }() errCh := make(chan error) - go func() { - errCh <- fn() - }() + if syncs.MutexDebugging { + syncs.ForkJoinGo(func() { errCh <- fn() }) + } else { + // Don't use ForkJoinGo to avoid the loss of "created by" in + // stack traces. + go func() { errCh <- fn() }() + } t := time.NewTimer(e.maxWait) select { case err := <-errCh: