syncs: start working on mutex debugging, registration

Updates #17852

Change-Id: Ib1b634eedd30cc4006bc1b39aa8d479d37c5f1f2
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
bradfitz/mutex_debug
Brad Fitzpatrick 22 hours ago
parent b8c58ca7c1
commit 381de776c4
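
For context, a minimal sketch of the pattern this commit rolls out across the tree. The example package, type, and field names below are hypothetical; only syncs.Mutex and syncs.RegisterMutex come from this change:

	package example

	import "tailscale.com/syncs"

	// server is a hypothetical type using the debuggable mutex: declare the
	// field as syncs.Mutex (an alias for sync.Mutex in normal builds) and
	// register it under a stable name for the ts_mutex_debug tracking.
	type server struct {
		mu    syncs.Mutex // guards the fields below
		conns int
	}

	func newServer() *server {
		s := &server{}
		// No-op in normal builds; in ts_mutex_debug builds this names the
		// mutex for held-lock and lock-order reporting.
		syncs.RegisterMutex(&s.mu, "example.server.mu")
		return s
	}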

@ -61,7 +61,7 @@ type rxState struct {
// txState is all the Conn state that Write uses.
type txState struct {
sync.Mutex
syncs.Mutex
cipher cipher.AEAD
nonce nonce
err error // records the first partial write error for all future calls

@ -20,6 +20,7 @@ import (
chp "golang.org/x/crypto/chacha20poly1305"
"golang.org/x/crypto/curve25519"
"golang.org/x/crypto/hkdf"
"tailscale.com/syncs"
"tailscale.com/types/key"
)
@ -186,6 +187,8 @@ func continueClientHandshake(ctx context.Context, conn net.Conn, s *symmetricSta
cipher: c2,
},
}
syncs.RegisterMutex(&c.rx.Mutex, "controlbase.Conn.rx.Mutex")
syncs.RegisterMutex(&c.tx.Mutex, "controlbase.Conn.tx.Mutex")
return c, nil
}

@ -8,11 +8,11 @@ import (
"errors"
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"
"tailscale.com/net/sockstats"
"tailscale.com/syncs"
"tailscale.com/tailcfg"
"tailscale.com/tstime"
"tailscale.com/types/key"
@ -122,7 +122,7 @@ type Auto struct {
observerQueue execqueue.ExecQueue
shutdownFn func() // to be called prior to shutdown or nil
mu sync.Mutex // mutex guards the following fields
mu syncs.Mutex // mutex guards the following fields
started bool // whether [Auto.Start] has been called
wantLoggedIn bool // whether the user wants to be logged in per last method call
@ -194,6 +194,7 @@ func newNoStart(opts Options) (_ *Auto, err error) {
observer: opts.Observer,
shutdownFn: opts.Shutdown,
}
syncs.RegisterMutex(&c.mu, "controlclient.Auto.mu")
c.authCtx, c.authCancel = context.WithCancel(context.Background())
c.authCtx = sockstats.WithSockStats(c.authCtx, sockstats.LabelControlClientAuto, opts.Logf)

@ -328,6 +328,7 @@ func NewDirect(opts Options) (*Direct, error) {
dnsCache: dnsCache,
dialPlan: opts.DialPlan,
}
syncs.RegisterMutex(&c.mu, "controlclient.Direct.mu")
c.discoPubKey = opts.DiscoPublicKey
c.closedCtx, c.closeCtx = context.WithCancel(context.Background())

@ -55,6 +55,11 @@ var (
regInt = map[string]*int{}
)
var _ = func() bool {
syncs.RegisterMutex(&mu, "envknob.mu")
return true
}()
func noteEnv(k, v string) {
mu.Lock()
defer mu.Unlock()

@ -74,6 +74,7 @@ func newExtension(logf logger.Logf, sb ipnext.SafeBackend) (ipnext.Extension, er
},
logf: logger.WithPrefix(logf, featureName+": "),
}
syncs.RegisterMutex(&e.mu, "relayserver.extension.mu")
e.ec = sb.Sys().Bus.Get().Client("relayserver.extension")
e.respPub = eventbus.Publish[magicsock.UDPRelayAllocResp](e.ec)
eventbus.SubscribeFunc(e.ec, e.onDERPMapView)

@ -35,6 +35,11 @@ var (
debugHandler map[string]http.Handler
)
var _ = func() bool {
syncs.RegisterMutex(&mu, "health.mu")
return true
}()
// ReceiveFunc is one of the three magicsock Receive funcs (IPv4, IPv6, or
// DERP).
type ReceiveFunc int

@ -515,6 +515,9 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
captiveCancel: nil, // so that we start checkCaptivePortalLoop when Running
needsCaptiveDetection: make(chan bool),
}
syncs.RegisterMutex(&b.mu, "ipnlocal.LocalBackend.mu")
syncs.RegisterMutex(&b.tkaSyncLock, "ipnlocal.LocalBackend.tkaSyncLock")
syncs.RegisterMutex(&b.lastNotifiedDriveSharesMu, "ipnlocal.LocalBackend.lastNotifiedDriveSharesMu")
nb := newNodeBackend(ctx, b.logf, b.sys.Bus.Get())
b.currentNodeAtomic.Store(nb)

@ -117,6 +117,8 @@ func newNodeBackend(ctx context.Context, logf logger.Logf, bus *eventbus.Bus) *n
eventClient: bus.Client("ipnlocal.nodeBackend"),
readyCh: make(chan struct{}),
}
syncs.RegisterMutex(&nb.mu, "ipnlocal.nodeBackend.mu")
// Default filter blocks everything and logs nothing.
noneFilter := filter.NewAllowNone(logger.Discard, &netipx.IPSet{})
nb.filterAtomic.Store(noneFilter)

@ -28,9 +28,11 @@ type Buffer interface {
}
func NewMemoryBuffer(numEntries int) Buffer {
return &memBuffer{
mb := &memBuffer{
pending: make(chan qentry, numEntries),
}
syncs.RegisterMutex(&mb.dropMu, "logtail.memBuffer.dropMu")
return mb
}
type memBuffer struct {

@ -15,7 +15,6 @@ import (
"runtime"
"slices"
"strings"
"sync"
"sync/atomic"
"time"
@ -65,7 +64,7 @@ type Manager struct {
knobs *controlknobs.Knobs // or nil
goos string // if empty, gets set to runtime.GOOS
mu sync.Mutex // guards following
mu syncs.Mutex // guards following
config *Config // Tracks the last viable DNS configuration set by Set. nil on failures other than compilation failures or if set has never been called.
}
@ -95,6 +94,7 @@ func NewManager(logf logger.Logf, oscfg OSConfigurator, health *health.Tracker,
knobs: knobs,
goos: goos,
}
syncs.RegisterMutex(&m.mu, "dns.Manager.mu")
m.ctx, m.ctxCancel = context.WithCancel(context.Background())
m.logf("using %T", m.os)

@ -267,6 +267,7 @@ func newForwarder(logf logger.Logf, netMon *netmon.Monitor, linkSel ForwardLinkS
controlKnobs: knobs,
verboseFwd: verboseDNSForward(),
}
syncs.RegisterMutex(&f.mu, "resolver.forwarder.mu")
f.ctx, f.ctxCancel = context.WithCancel(context.Background())
return f
}

@ -249,6 +249,7 @@ func New(logf logger.Logf, linkSel ForwardLinkSelector, dialer *tsdial.Dialer, h
dialer: dialer,
health: health,
}
syncs.RegisterMutex(&r.mu, "resolver.Resolver.mu")
r.forwarder = newForwarder(r.logf, netMon, linkSel, dialer, health, knobs)
return r
}

@ -98,10 +98,16 @@ type Resolver struct {
sf singleflight.Group[string, ipRes]
registerMutexOnce sync.Once
mu syncs.Mutex
ipCache map[string]ipCacheEntry
}
func (r *Resolver) registerMutex() {
syncs.RegisterMutex(&r.mu, "dnscache.Resolver.mu")
}
// ipRes is the type used by the Resolver.sf singleflight group.
type ipRes struct {
ip, ip6 netip.Addr
@ -193,6 +199,8 @@ func SetDebugLoggingEnabled(v bool) {
// If err is nil, ip will be non-nil. The v6 address may be nil even
// with a nil error.
func (r *Resolver) LookupIP(ctx context.Context, host string) (ip, v6 netip.Addr, allIPs []netip.Addr, err error) {
r.registerMutexOnce.Do(r.registerMutex)
if r.SingleHostStaticResult != nil {
if r.SingleHost != host {
return zaddr, zaddr, nil, fmt.Errorf("dnscache: unexpected hostname %q doesn't match expected %q", host, r.SingleHost)
@ -373,11 +381,13 @@ func (r *Resolver) addIPCache(host string, ip, ip6 netip.Addr, allIPs []netip.Ad
// Dialer returns a wrapped DialContext func that uses the provided dnsCache.
func Dialer(fwd netx.DialFunc, dnsCache *Resolver) netx.DialFunc {
dnsCache.registerMutexOnce.Do(dnsCache.registerMutex)
d := &dialer{
fwd: fwd,
dnsCache: dnsCache,
pastConnect: map[netip.Addr]time.Time{},
}
syncs.RegisterMutex(&d.mu, "dnscache.dialer.mu")
return d.DialContext
}
@ -386,11 +396,12 @@ type dialer struct {
fwd netx.DialFunc
dnsCache *Resolver
mu sync.Mutex
mu syncs.Mutex
pastConnect map[netip.Addr]time.Time
}
func (d *dialer) DialContext(ctx context.Context, network, address string) (retConn net.Conn, ret error) {
host, port, err := net.SplitHostPort(address)
if err != nil {
// Bogus. But just let the real dialer return an error rather than
@ -404,6 +415,7 @@ func (d *dialer) DialContext(ctx context.Context, network, address string) (retC
host: host,
port: port,
}
syncs.RegisterMutex(&dc.mu, "dnscache.dialCall.mu")
defer func() {
// On failure, consider that our DNS might be wrong and ask the DNS fallback mechanism for
// some other IPs to try.

@ -235,6 +235,8 @@ type Client struct {
testEnoughRegions int
testCaptivePortalDelay time.Duration
registerMutexOnce sync.Once
mu syncs.Mutex // guards following
nextFull bool // do a full region scan, even if last != nil
prev map[time.Time]*Report // some previous reports
@ -244,6 +246,10 @@ type Client struct {
resolver *dnscache.Resolver // only set if UseDNSCache is true
}
func (c *Client) registerMutex() {
syncs.RegisterMutex(&c.mu, "netcheck.Client.mu")
}
func (c *Client) enoughRegions() int {
if c.testEnoughRegions > 0 {
return c.testEnoughRegions
@ -281,6 +287,7 @@ func (c *Client) vlogf(format string, a ...any) {
// MakeNextReportFull forces the next GetReport call to be a full
// (non-incremental) probe of all DERP regions.
func (c *Client) MakeNextReportFull() {
c.registerMutexOnce.Do(c.registerMutex)
c.mu.Lock()
defer c.mu.Unlock()
c.nextFull = true
@ -291,6 +298,8 @@ func (c *Client) MakeNextReportFull() {
// the loop started by Standalone, in normal operation in tailscaled incoming
// STUN replies are routed to this method.
func (c *Client) ReceiveSTUNPacket(pkt []byte, src netip.AddrPort) {
c.registerMutexOnce.Do(c.registerMutex)
c.vlogf("received STUN packet from %s", src)
if src.Addr().Is4() {
@ -782,6 +791,7 @@ func (o *GetReportOpts) getLastDERPActivity(region int) time.Time {
}
func (c *Client) SetForcePreferredDERP(region int) {
c.registerMutexOnce.Do(c.registerMutex)
c.mu.Lock()
defer c.mu.Unlock()
c.ForcePreferredDERP = region
@ -797,6 +807,7 @@ var hookStartCaptivePortalDetection feature.Hook[func(ctx context.Context, rs *r
//
// It may not be called concurrently with itself.
func (c *Client) GetReport(ctx context.Context, dm *tailcfg.DERPMap, opts *GetReportOpts) (_ *Report, reterr error) {
c.registerMutexOnce.Do(c.registerMutex)
onlySTUN := false
if opts != nil && opts.OnlySTUN {
if opts.OnlyTCP443 {
@ -839,6 +850,7 @@ func (c *Client) GetReport(ctx context.Context, dm *tailcfg.DERPMap, opts *GetRe
inFlight: map[stun.TxID]func(netip.AddrPort){},
stopProbeCh: make(chan struct{}, 1),
}
syncs.RegisterMutex(&rs.mu, "netcheck.reportState.mu")
c.curState = rs
last := c.last

@ -30,6 +30,11 @@ var ifNames struct {
m map[int]string // ifindex => name
}
var _ = func() bool {
syncs.RegisterMutex(&ifNames.Mutex, "netmon.ifNames.Mutex")
return true
}()
func init() {
interfaceDebugExtras = interfaceDebugExtrasDarwin
}

@ -125,6 +125,7 @@ func New(bus *eventbus.Bus, logf logger.Logf) (*Monitor, error) {
stop: make(chan struct{}),
lastWall: wallTime(),
}
syncs.RegisterMutex(&m.mu, "netmon.Monitor.mu")
m.changed = eventbus.Publish[ChangeDelta](m.b)
st, err := m.interfaceStateUncached()
if err != nil {

@ -81,13 +81,15 @@ func New(ctx context.Context, logf logger.Logf, lp ListenPacketer) *Pinger {
panic("net/ping: New:" + err.Error())
}
return &Pinger{
p := &Pinger{
lp: lp,
Logf: logf,
timeNow: time.Now,
id: binary.LittleEndian.Uint16(id[:]),
pings: make(map[uint16]outstanding),
}
syncs.RegisterMutex(&p.mu, "ping.Pinger.mu")
return p
}
func (p *Pinger) mkconn(ctx context.Context, typ, addr string) (net.PacketConn, error) {

@ -266,6 +266,7 @@ func NewClient(c Config) *Client {
netMon: c.NetMon,
onChange: c.OnChange,
}
syncs.RegisterMutex(&ret.mu, "portmapper.Client.mu")
if buildfeatures.HasPortMapper {
// TODO(bradfitz): move this to method on netMon
ret.ipAndGateway = netmon.LikelyHomeRouterIP

@ -75,6 +75,8 @@ type Dialer struct {
// If nil, it's not used.
NetstackDialUDP func(context.Context, netip.AddrPort) (net.Conn, error)
registerMutexOnce sync.Once
peerClientOnce sync.Once
peerClient *http.Client
@ -142,6 +144,7 @@ func (d *Dialer) SetExitDNSDoH(doh string) {
if !buildfeatures.HasUseExitNode {
return
}
d.registerMutexOnce.Do(d.registerMutex)
d.mu.Lock()
defer d.mu.Unlock()
if d.exitDNSDoHBase == doh {
@ -193,9 +196,15 @@ func (d *Dialer) Close() error {
return nil
}
func (d *Dialer) registerMutex() {
syncs.RegisterMutex(&d.mu, "tsdial.Dialer.mu")
}
// SetNetMon sets d's network monitor to netMon.
// It is a no-op to call SetNetMon with the same netMon as the current one.
func (d *Dialer) SetNetMon(netMon *netmon.Monitor) {
d.registerMutexOnce.Do(d.registerMutex)
d.mu.Lock()
defer d.mu.Unlock()
if d.netMon == netMon {
@ -220,12 +229,14 @@ func (d *Dialer) SetNetMon(netMon *netmon.Monitor) {
// NetMon returns the Dialer's network monitor.
// It returns nil if SetNetMon has not been called.
func (d *Dialer) NetMon() *netmon.Monitor {
d.registerMutexOnce.Do(d.registerMutex)
d.mu.Lock()
defer d.mu.Unlock()
return d.netMon
}
func (d *Dialer) SetBus(bus *eventbus.Bus) {
d.registerMutexOnce.Do(d.registerMutex)
d.mu.Lock()
defer d.mu.Unlock()
if d.bus == bus {

@ -7,6 +7,10 @@ package syncs
import "sync"
// MutexDebugging indicates whether the "ts_mutex_debug" build tag is set
// and mutex debugging is enabled.
const MutexDebugging = false
// Mutex is an alias for sync.Mutex.
//
// It is a distinct type (not sync.Mutex) only when built with the ts_mutex_debug build tag.
@ -20,4 +24,14 @@ type RWMutex = sync.RWMutex
// RequiresMutex declares that the caller assumes it holds the given
// mutex. In non-debug builds, it's a no-op and compiles to
// nothing.
func RequiresMutex(mu *sync.Mutex) {}
func RequiresMutex(mu *Mutex) {}
func RegisterMutex(mu *Mutex, name string) {}
// ForkJoinGo is like go fn() but indicates that the goroutine
// is part of a fork-join parallelism pattern.
//
// This compiles to just "go fn()" in non-debug builds.
func ForkJoinGo(fn func()) {
go fn()
}

@ -5,7 +5,20 @@
package syncs
import "sync"
import (
"bytes"
"fmt"
"log"
"runtime"
"sync"
"unsafe"
"go4.org/mem"
)
// MutexDebugging indicates whether the "ts_mutex_debug" build tag is set
// and mutex debugging is enabled.
const MutexDebugging = true
type Mutex struct {
sync.Mutex
@ -15,8 +28,211 @@ type RWMutex struct {
sync.RWMutex
}
func RequiresMutex(mu *sync.Mutex) {
func RequiresMutex(mu *Mutex) {
// TODO: check
}
// TODO(bradfitz): actually track stuff when in debug mode.
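// bufPool holds scratch buffers used to capture goroutine stacks in Lock.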
var bufPool = &sync.Pool{
New: func() any {
b := make([]byte, 16<<10)
return &b
},
}
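// Lock records m in the calling goroutine's held-lock list, checks the
// acquisition against previously observed lock orderings, and then acquires
// the embedded sync.Mutex on return (via the deferred call).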
func (m *Mutex) Lock() {
defer m.Mutex.Lock()
gid := curGoroutineID()
up := uintptr((unsafe.Pointer)(m))
bufp := bufPool.Get().(*[]byte)
defer bufPool.Put(bufp)
stack := (*bufp)[:runtime.Stack(*bufp, false)]
trackMu.Lock()
defer trackMu.Unlock()
gid = walkToParent(gid)
name, ok := mutexName[up]
if !ok {
name = "unnamed"
log.Printf("XXX unregistered Mutex.Lock %p called from:\n%s", m, stack)
}
switch name {
case "ipnlocal.LocalBackend.mu", "wgengine.userspaceEngine.wgLock", "ipnlocal.nodeBackend.mu":
if bytes.Contains(stack, []byte("wireguard-go/device.(*Device).RoutineReceiveIncoming")) {
log.Printf("XXX mutex Lock from wireguard land: %s, %s", name, stack)
}
}
gi, ok := goroutines[gid]
if !ok {
gi = &goroutineInfo{}
goroutines[gid] = gi
}
gi.holding = append(gi.holding, &heldLock{
mutexAddr: up,
name: name,
})
if len(gi.holding) > 1 {
names := make([]string, 0, len(gi.holding))
for i, hl := range gi.holding {
names = append(names, hl.name)
if i == 0 {
continue
}
lo := lockOrder{
first: gi.holding[i-1].name,
second: hl.name,
}
if lockOrders[lo.reverse()] {
log.Printf("mutex: potential deadlock detected: lock order violation: %q then %q (saw reverse order before); goroutine %d stack:\n%s", lo.first, lo.second, gid, stack)
} else {
if _, ok := lockOrders[lo]; !ok {
log.Printf("XXX learned new lock order: %q then %q", lo.first, lo.second)
lockOrders[lo] = true
}
}
}
log.Printf("XXX goroutine %v holding %q", gid, names)
}
}
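// Unlock pops m from the calling goroutine's held-lock list, logging if m
// isn't the most recently acquired lock, and then releases the embedded
// sync.Mutex.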
func (m *Mutex) Unlock() {
defer m.Mutex.Unlock()
up := uintptr((unsafe.Pointer)(m))
gid := curGoroutineID()
trackMu.Lock()
defer trackMu.Unlock()
gid = walkToParent(gid)
name, ok := mutexName[up]
if !ok {
name = "unnamed"
}
gi, ok := goroutines[gid]
if !ok || len(gi.holding) == 0 {
log.Printf("mutex: unlock of %p (%s) by goroutine %d with no held locks", m, name, gid)
return
}
last := gi.holding[len(gi.holding)-1]
if last.mutexAddr != up {
log.Printf("mutex: unlock of %p (%s) by goroutine %d, but last held lock is %p (%s)", m, name, gid, last.mutexAddr, last.name)
return
}
gi.holding[len(gi.holding)-1] = nil
gi.holding = gi.holding[:len(gi.holding)-1]
if len(gi.holding) == 0 {
delete(goroutines, gid)
}
}
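// Global mutex-debugging state. trackMu guards the maps that follow.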
var (
trackMu sync.Mutex
mutexName = make(map[uintptr]string)
goroutines = make(map[uint64]*goroutineInfo)
parentGID = make(map[uint64]uint64) // child goroutine ID -> parent (for ForkJoinGo)
lockOrders = make(map[lockOrder]bool) // observed lock orderings
)
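// lockOrder records that a goroutine was observed acquiring the mutex named
// first and then, while still holding it, the mutex named second.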
type lockOrder struct {
first string
second string
}
func (lo lockOrder) reverse() lockOrder {
return lockOrder{first: lo.second, second: lo.first}
}
type goroutineInfo struct {
holding []*heldLock
}
type heldLock struct {
mutexAddr uintptr
name string
// TODO: stack? [16]uintptr?
}
// RegisterMutex registers the given mutex with the given name for
// debugging purposes.
func RegisterMutex(mu *Mutex, name string) {
trackMu.Lock()
defer trackMu.Unlock()
up := uintptr((unsafe.Pointer)(mu))
mutexName[up] = name
runtime.AddCleanup(mu, func(up uintptr) {
trackMu.Lock()
defer trackMu.Unlock()
delete(mutexName, up)
}, up)
}
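// littleBuf holds small buffers used by curGoroutineID to read the stack header.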
var littleBuf = sync.Pool{
New: func() interface{} {
buf := make([]byte, 64)
return &buf
},
}
var goroutineSpace = []byte("goroutine ")
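// curGoroutineID returns the current goroutine's ID, parsed from the
// "goroutine N [...]" line of its stack trace.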
func curGoroutineID() uint64 {
bp := littleBuf.Get().(*[]byte)
defer littleBuf.Put(bp)
b := *bp
b = b[:runtime.Stack(b, false)]
// Parse the 4707 out of "goroutine 4707 ["
b = bytes.TrimPrefix(b, goroutineSpace)
i := bytes.IndexByte(b, ' ')
if i < 0 {
panic(fmt.Sprintf("No space found in %q", b))
}
b = b[:i]
n, err := mem.ParseUint(mem.B(b), 10, 64)
if err != nil {
panic(fmt.Sprintf("Failed to parse goroutine ID out of %q: %v", b, err))
}
return n
}
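// trackForkJoinPair records (when add is true) or forgets the parent of a
// goroutine started by ForkJoinGo.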
func trackForkJoinPair(parent, child uint64, add bool) {
trackMu.Lock()
defer trackMu.Unlock()
if add {
parentGID[child] = parent
} else {
delete(parentGID, child)
}
}
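// walkToParent follows ForkJoinGo parent links from gid to its oldest
// ancestor, so locks held by fork-join helper goroutines are attributed to
// the originating goroutine.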
func walkToParent(gid uint64) uint64 {
for {
p, ok := parentGID[gid]
if !ok {
return gid
}
gid = p
}
}
// ForkJoinGo is like go fn() but indicates that the goroutine
// is part of a fork-join parallelism pattern.
//
// This compiles to just "go fn()" in non-debug builds.
func ForkJoinGo(fn func()) {
parentGID := curGoroutineID()
go func() {
childGID := curGoroutineID()
trackForkJoinPair(parentGID, childGID, true)
defer trackForkJoinPair(parentGID, childGID, false)
fn()
}()
}
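
To illustrate what the new tracking reports, here is a minimal sketch (hypothetical main package, built with -tags ts_mutex_debug) that triggers the lock-order check above:

	package main

	import "tailscale.com/syncs"

	func main() {
		var a, b syncs.Mutex
		syncs.RegisterMutex(&a, "example.a")
		syncs.RegisterMutex(&b, "example.b")

		// Acquire a then b: the debug build learns the ordering "a then b".
		a.Lock()
		b.Lock()
		b.Unlock()
		a.Unlock()

		// Acquire b then a: the reverse ordering was seen before, so Lock
		// logs a "potential deadlock detected" lock order violation.
		b.Lock()
		a.Lock()
		a.Unlock()
		b.Unlock()
	}

In a normal build (without the tag) the same program compiles against the no-op stubs: syncs.Mutex is plain sync.Mutex and RegisterMutex does nothing.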

@ -59,6 +59,10 @@ func NewWithOptions(opts BusOptions) *Bus {
clients: set.Set[*Client]{},
logf: opts.logger(),
}
syncs.RegisterMutex(&ret.topicsMu, "eventbus.Bus.topicsMu")
syncs.RegisterMutex(&ret.clientsMu, "eventbus.Bus.clientsMu")
ret.router = runWorker(ret.pump)
return ret
}
@ -92,6 +96,8 @@ func (b *Bus) Client(name string) *Client {
bus: b,
pub: set.Set[publisher]{},
}
syncs.RegisterMutex(&ret.mu, "eventbus.Client.mu")
syncs.RegisterMutex(&ret.stop.mu, "eventbus.Client.stop.mu")
b.clientsMu.Lock()
defer b.clientsMu.Unlock()
b.clients.Add(ret)

@ -11,10 +11,10 @@ import (
"runtime"
"slices"
"strings"
"sync"
"sync/atomic"
"time"
"tailscale.com/syncs"
"tailscale.com/types/logger"
)
@ -147,7 +147,7 @@ func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type {
// A hook collects hook functions that can be run as a group.
type hook[T any] struct {
syncs.Mutex
sync.Mutex
fns []hookFn[T]
}

@ -5,6 +5,8 @@ package eventbus
import (
"reflect"
"tailscale.com/syncs"
)
// publisher is a uniformly typed wrapper around Publisher[T], so that
@ -21,7 +23,9 @@ type Publisher[T any] struct {
}
func newPublisher[T any](c *Client) *Publisher[T] {
return &Publisher[T]{client: c}
p := &Publisher[T]{client: c}
syncs.RegisterMutex(&p.stop.mu, "eventbus.Publisher.stop.mu")
return p
}
// Close closes the publisher.

@ -62,6 +62,7 @@ func newSubscribeState(c *Client) *subscribeState {
snapshot: make(chan chan []DeliveredEvent),
outputs: map[reflect.Type]subscriber{},
}
syncs.RegisterMutex(&ret.outputsMu, "eventbus.subscribeState.outputsMu")
ret.dispatcher = runWorker(ret.pump)
return ret
}
@ -194,18 +195,21 @@ type Subscriber[T any] struct {
func newSubscriber[T any](r *subscribeState, logf logger.Logf) *Subscriber[T] {
slow := time.NewTimer(0)
slow.Stop() // reset in dispatch
return &Subscriber[T]{
s := &Subscriber[T]{
read: make(chan T),
unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
logf: logf,
slow: slow,
}
syncs.RegisterMutex(&s.stop.mu, "eventbus.Subscriber.stop.mu")
return s
}
func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] {
ret := &Subscriber[T]{
read: make(chan T, 100), // arbitrary, large
}
syncs.RegisterMutex(&ret.stop.mu, "eventbus.Subscriber.stop.mu")
ret.unregister = attach(ret.monitor)
return ret
}
@ -286,12 +290,14 @@ type SubscriberFunc[T any] struct {
func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] {
slow := time.NewTimer(0)
slow.Stop() // reset in dispatch
return &SubscriberFunc[T]{
s := &SubscriberFunc[T]{
read: f,
unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
logf: logf,
slow: slow,
}
syncs.RegisterMutex(&s.stop.mu, "eventbus.Subscriber.stop.mu")
return s
}
// Close closes the SubscriberFunc, indicating the caller no longer wishes to

@ -7,11 +7,14 @@ package execqueue
import (
"context"
"errors"
"sync"
"tailscale.com/syncs"
)
type ExecQueue struct {
regMutexOnce sync.Once
mu syncs.Mutex
ctx context.Context // context.Background + closed on Shutdown
cancel context.CancelFunc // closes ctx
@ -21,7 +24,13 @@ type ExecQueue struct {
queue []func()
}
func (q *ExecQueue) registerMutex() {
syncs.RegisterMutex(&q.mu, "execqueue.ExecQueue.mu")
}
func (q *ExecQueue) Add(f func()) {
q.regMutexOnce.Do(q.registerMutex)
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
@ -39,6 +48,8 @@ func (q *ExecQueue) Add(f func()) {
// RunSync waits for the queue to be drained and then synchronously runs f.
// It returns an error if the queue is closed before f is run or ctx expires.
func (q *ExecQueue) RunSync(ctx context.Context, f func()) error {
q.regMutexOnce.Do(q.registerMutex)
q.mu.Lock()
q.initCtxLocked()
shutdownCtx := q.ctx
@ -80,6 +91,8 @@ func (q *ExecQueue) run(f func()) {
// Shutdown asynchronously signals the queue to stop.
func (q *ExecQueue) Shutdown() {
q.regMutexOnce.Do(q.registerMutex)
q.mu.Lock()
defer q.mu.Unlock()
q.closed = true
@ -98,6 +111,8 @@ var errExecQueueShutdown = errors.New("execqueue shut down")
// Wait waits for the queue to be empty or shut down.
func (q *ExecQueue) Wait(ctx context.Context) error {
q.regMutexOnce.Do(q.registerMutex)
q.mu.Lock()
q.initCtxLocked()
waitCh := q.doneWaiter

@ -4,9 +4,9 @@
package goroutines
import (
"sync"
"sync/atomic"
"tailscale.com/syncs"
"tailscale.com/util/set"
)
@ -15,7 +15,7 @@ type Tracker struct {
started atomic.Int64 // counter
running atomic.Int64 // gauge
mu syncs.Mutex
mu sync.Mutex
onDone set.HandleSet[func()]
}

@ -8,9 +8,11 @@ import "tailscale.com/syncs"
// New creates a new [RingLog] containing at most max items.
func New[T any](max int) *RingLog[T] {
return &RingLog[T]{
rl := &RingLog[T]{
max: max,
}
syncs.RegisterMutex(&rl.mu, "ringlog.RingLog.mu")
return rl
}
// RingLog is a concurrency-safe fixed size log window containing entries of [T].

@ -9,7 +9,6 @@ import (
"sync"
"time"
"tailscale.com/syncs"
"tailscale.com/util/set"
"tailscale.com/util/syspolicy/internal/loggerx"
"tailscale.com/util/syspolicy/pkey"
@ -71,7 +70,7 @@ func (c PolicyChange) HasChangedAnyOf(keys ...pkey.Key) bool {
// policyChangeCallbacks are the callbacks to invoke when the effective policy changes.
// It is safe for concurrent use.
type policyChangeCallbacks struct {
mu syncs.Mutex
mu sync.Mutex
cbs set.HandleSet[PolicyChangeCallback]
}

@ -96,6 +96,8 @@ func newPolicy(scope setting.PolicyScope, sources ...*source.Source) (_ *Policy,
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
syncs.RegisterMutex(&p.mu, "syspolicy/rsop.Policy.mu")
if _, err := p.reloadNow(false); err != nil {
p.Close()
return nil, err

@ -32,6 +32,11 @@ var (
effectivePolicyLRU [setting.NumScopes]syncs.AtomicValue[*Policy]
)
var _ = func() bool {
syncs.RegisterMutex(&policyMu, "syspolicy/rsop.policyMu")
return true
}()
// PolicyFor returns the [Policy] for the specified scope,
// creating it from the registered [source.Store]s if it doesn't already exist.
func PolicyFor(scope setting.PolicyScope) (*Policy, error) {

@ -220,6 +220,11 @@ var (
definitionsUsed bool
)
var _ = func() bool {
syncs.RegisterMutex(&definitionsMu, "syspolicy/setting.definitionsMu")
return true
}()
// Register registers a policy setting with the specified key, scope, value type,
// and an optional list of supported platforms. All policy settings must be
// registered before any of them can be used. Register panics if called after

@ -599,6 +599,11 @@ func newConn(logf logger.Logf) *Conn {
discoInfo: make(map[key.DiscoPublic]*discoInfo),
cloudInfo: newCloudInfo(logf),
}
syncs.RegisterMutex(&c.mu, "magicsock.Conn.mu")
syncs.RegisterMutex(&c.pconn4.mu, "magicsock.Conn.pconn4.mu")
syncs.RegisterMutex(&c.pconn6.mu, "magicsock.Conn.pconn6.mu")
syncs.RegisterMutex(&c.endpointTracker.mu, "magicsock.Conn.endpointTracker.mu")
c.discoAtomic.Set(discoPrivate)
c.bind = &connBind{Conn: c, closed: true}
c.receiveBatchPool = sync.Pool{New: func() any {
@ -3145,6 +3150,7 @@ func (c *Conn) updateNodes(update NodeViewsUpdate) (peersChanged bool) {
heartbeatDisabled: flags.heartbeatDisabled,
isWireguardOnly: n.IsWireGuardOnly(),
}
syncs.RegisterMutex(&ep.mu, "magicsock.endpoint.mu")
switch runtime.GOOS {
case "ios", "android":
// Omit, to save memory. Prior to 2024-03-20 we used to limit it to

@ -80,6 +80,10 @@ type Logger struct {
routePrefixes []netip.Prefix
}
func (nl *Logger) RegisterMutex() {
syncs.RegisterMutex(&nl.mu, "netlog.Logger.mu")
}
// Running reports whether the logger is running.
func (nl *Logger) Running() bool {
nl.mu.Lock()

@ -12,3 +12,4 @@ func (*Logger) Running() bool { return false }
func (*Logger) Shutdown(any) error { return nil }
func (*Logger) ReconfigNetworkMap(any) {}
func (*Logger) ReconfigRoutes(any) {}
func (*Logger) RegisterMutex() {}

@ -17,7 +17,6 @@ import (
"runtime"
"slices"
"strings"
"sync"
"time"
"github.com/tailscale/wireguard-go/device"
@ -130,7 +129,7 @@ type userspaceEngine struct {
// is being routed over Tailscale.
isDNSIPOverTailscale syncs.AtomicValue[func(netip.Addr) bool]
wgLock sync.Mutex // serializes all wgdev operations; see lock order comment below
wgLock syncs.Mutex // serializes all wgdev operations; see lock order comment below
lastCfgFull wgcfg.Config
lastNMinPeers int
lastRouter *router.Config
@ -145,7 +144,7 @@ type userspaceEngine struct {
lastStatusPollTime mono.Time // last time we polled the engine status
reconfigureVPN func() error // or nil
mu sync.Mutex // guards following; see lock order comment below
mu syncs.Mutex // guards following; see lock order comment below
netMap *netmap.NetworkMap // or nil
closing bool // Close was called (even if we're still closing)
statusCallback StatusCallback
@ -361,6 +360,9 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
reconfigureVPN: conf.ReconfigureVPN,
health: conf.HealthTracker,
}
syncs.RegisterMutex(&e.mu, "wgengine.userspaceEngine.mu")
syncs.RegisterMutex(&e.wgLock, "wgengine.userspaceEngine.wgLock")
e.networkLogger.RegisterMutex()
if e.birdClient != nil {
// Disable the protocol at start time.

@ -19,6 +19,7 @@ import (
"tailscale.com/ipn/ipnstate"
"tailscale.com/net/dns"
"tailscale.com/net/packet"
"tailscale.com/syncs"
"tailscale.com/tailcfg"
"tailscale.com/types/key"
"tailscale.com/types/netmap"
@ -81,9 +82,13 @@ func (e *watchdogEngine) watchdogErr(name string, fn func() error) error {
}()
errCh := make(chan error)
go func() {
errCh <- fn()
}()
if syncs.MutexDebugging {
syncs.ForkJoinGo(func() { errCh <- fn() })
} else {
// Don't use ForkJoinGo here, so stack traces keep their useful
// "created by" frame.
go func() { errCh <- fn() }()
}
t := time.NewTimer(e.maxWait)
select {
case err := <-errCh:
