diff --git a/util/eventbus/bus_test.go b/util/eventbus/bus_test.go index 1e0cd8abf..4ebefcab8 100644 --- a/util/eventbus/bus_test.go +++ b/util/eventbus/bus_test.go @@ -9,6 +9,7 @@ import ( "fmt" "log" "regexp" + "sync" "testing" "testing/synctest" "time" @@ -546,6 +547,75 @@ func TestRegression(t *testing.T) { }) } +const ( + maxQueuedItems = 16 // same as in queue.go + totalMaxQueuedItems = maxQueuedItems * 2 // both publisher and subscriber sides +) + +func TestPublishWithMutex(t *testing.T) { + t.Run("FewEvents", func(t *testing.T) { + // As of 2025-11-20, publishing up to [totalMaxQueuedItems] is fine. + testPublishWithMutex(t, totalMaxQueuedItems) + }) + t.Run("ManyEvents", func(t *testing.T) { + // As of 2025-11-20, publishing more than [totalMaxQueuedItems] may deadlock. + t.Skip("TODO: fix deadlock in https://github.com/tailscale/tailscale/issues/17973") + + const N = 3 // N larger than one increases the chance of deadlock. + testPublishWithMutex(t, totalMaxQueuedItems+N) + }) +} + +// testPublishWithMutex publishes the specified number of events, +// acquiring and releasing a mutex around each publish and each +// subscriber event receive. +// +// The test fails if it loses any events or times out due to a deadlock. +// Unfortunately, a goroutine waiting on a mutex held by a durably blocked +// goroutine is not itself considered durably blocked, so [synctest] cannot +// detect this deadlock on its own. +func testPublishWithMutex(t *testing.T, n int) { + synctest.Test(t, func(t *testing.T) { + b := eventbus.New() + defer b.Close() + + c := b.Client("TestClient") + + evts := make([]any, n) + for i := range evts { + evts[i] = EventA{Counter: i} + } + exp := expectEvents(t, evts...) + + var mu sync.Mutex + eventbus.SubscribeFunc[EventA](c, func(e EventA) { + // As of 2025-11-20, this can deadlock if n is large enough + // and event queues fill up. + mu.Lock() + mu.Unlock() + + // Mark event as received, so we can check for lost events. + // Not required for the deadlock to occur. + exp.Got(e) + }) + + p := eventbus.Publish[EventA](c) + go func() { + for i := range n { + mu.Lock() + p.Publish(EventA{Counter: i}) + mu.Unlock() + } + }() + + synctest.Wait() + + if !exp.Empty() { + t.Errorf("unexpected extra events: %+v", exp.want) + } + }) +} + type queueChecker struct { t *testing.T want []any